blob: 8721ae7c37f23a1515624bbf8066b462f2625dd9 [file] [log] [blame]
Alexander Afanasyev5964fb72014-02-18 12:42:45 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/**
Alexander Afanasyevdfa52c42014-04-24 21:10:11 -07003 * Copyright (c) 2013-2014, Regents of the University of California.
4 * All rights reserved.
5 *
6 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
7 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
8 *
9 * This file licensed under New BSD License. See COPYING for detailed information about
10 * ndn-cxx library copyright, permissions, and redistribution restrictions.
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080011 */
12
13#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
14#define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
15
16#include "../common.hpp"
17
18namespace ndn {
19
/// Capacity, in bytes, of the per-transport receive buffer (StreamTransportImpl::m_inputBuffer).
20const size_t MAX_LENGTH = 9000;
21
22template<class BaseTransport, class Protocol>
23class StreamTransportImpl
24{
25public:
26 typedef BaseTransport base_transport;
27 typedef Protocol protocol;
28 typedef StreamTransportImpl<BaseTransport,Protocol> impl;
29
30 StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
31 : m_transport(transport)
32 , m_socket(ioService)
Alexander Afanasyev937aa782014-03-21 13:17:57 -070033 , m_inputBufferSize(0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080034 , m_connectionInProgress(false)
35 , m_connectTimer(ioService)
36 {
37 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070038
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080039 void
40 connectHandler(const boost::system::error_code& error)
41 {
42 m_connectionInProgress = false;
43 m_connectTimer.cancel();
44
45 if (!error)
46 {
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +000047 resume();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080048 m_transport.m_isConnected = true;
49
50 for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
51 m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
52 bind(&impl::handle_async_send, this, _1, *i));
53
54 for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
55 i != m_sendPairQueue.end(); ++i)
56 {
57 std::vector<boost::asio::const_buffer> buffer;
58 buffer.reserve(2);
59 buffer.push_back(boost::asio::buffer(i->first.wire(), i->first.size()));
60 buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
61 m_socket.async_send(buffer,
62 bind(&impl::handle_async_send, this, _1, i->first, i->second));
63 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070064
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080065 m_sendQueue.clear();
66 m_sendPairQueue.clear();
67 }
68 else
69 {
70 // may need to throw exception
71 m_transport.m_isConnected = false;
72 m_transport.close();
73 throw Transport::Error(error, "error while connecting to the forwarder");
74 }
75 }
76
77 void
78 connectTimeoutHandler(const boost::system::error_code& error)
79 {
80 if (error) // e.g., cancelled timer
81 return;
82
83 m_connectionInProgress = false;
84 m_transport.m_isConnected = false;
Alexander Afanasyev937aa782014-03-21 13:17:57 -070085 m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080086 m_socket.close();
87 throw Transport::Error(error, "error while connecting to the forwarder");
88 }
89
90 void
91 connect(const typename protocol::endpoint& endpoint)
92 {
93 if (!m_connectionInProgress) {
94 m_connectionInProgress = true;
95
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -070096 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080097 /// @todo Decide whether this number should be configurable
98 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
99 m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
100
101 m_socket.open();
102 m_socket.async_connect(endpoint,
103 bind(&impl::connectHandler, this, _1));
104 }
105 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700106
107 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800108 close()
109 {
110 m_connectTimer.cancel();
111 m_socket.close();
112 m_transport.m_isConnected = false;
Alexander Afanasyev6e0c5a52014-03-18 16:18:58 -0700113 m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800114 m_sendQueue.clear();
115 m_sendPairQueue.clear();
116 }
117
118 void
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000119 pause()
120 {
121 if (m_transport.m_isExpectingData)
122 {
123 m_transport.m_isExpectingData = false;
124 m_socket.cancel();
125 }
126 }
127
128 void
129 resume()
130 {
131 if (!m_transport.m_isExpectingData)
132 {
133 m_transport.m_isExpectingData = true;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700134 m_inputBufferSize = 0;
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000135 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
136 bind(&impl::handle_async_receive, this, _1, _2));
137 }
138 }
139
140 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800141 send(const Block& wire)
142 {
143 if (!m_transport.m_isConnected)
144 m_sendQueue.push_back(wire);
145 else
146 m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
147 bind(&impl::handle_async_send, this, _1, wire));
148 }
149
150 void
151 send(const Block& header, const Block& payload)
152 {
153 if (!m_transport.m_isConnected)
154 {
155 m_sendPairQueue.push_back(std::make_pair(header, payload));
156 }
157 else
158 {
159 std::vector<boost::asio::const_buffer> buffers;
160 buffers.reserve(2);
161 buffers.push_back(boost::asio::buffer(header.wire(), header.size()));
162 buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700163
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800164 m_socket.async_send(buffers,
165 bind(&impl::handle_async_send, this, _1, header, payload));
166 }
167 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700168
169 inline bool
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800170 processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
171 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700172 Block element;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800173 while(offset < availableSize)
174 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700175 bool ok = Block::fromBuffer(buffer + offset, availableSize - offset, element);
176 if (!ok)
177 return false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800178
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700179 m_transport.receive(element);
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800180 offset += element.size();
181 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700182 return true;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800183 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700184
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800185 void
186 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
187 {
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800188 if (error)
189 {
190 if (error == boost::system::errc::operation_canceled) {
191 // async receive has been explicitly cancelled (e.g., socket close)
192 return;
193 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700194
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800195 m_socket.close(); // closing at this point may not be that necessary
196 m_transport.m_isConnected = true;
197 throw Transport::Error(error, "error while receiving data from socket");
198 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700199
200 m_inputBufferSize += bytes_recvd;
201 // do magic
202
203 std::size_t offset = 0;
204 bool ok = processAll(m_inputBuffer, offset, m_inputBufferSize);
205 if (!ok && m_inputBufferSize == MAX_LENGTH && offset == 0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800206 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700207 // very bad... should close connection
208 m_socket.close();
209 m_transport.m_isConnected = false;
210 m_transport.m_isExpectingData = false;
211 throw Transport::Error(boost::system::error_code(),
212 "input buffer full, but a valid TLV cannot be decoded");
213 }
214
215 if (offset > 0)
216 {
217 if (offset != m_inputBufferSize)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800218 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700219 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
220 m_inputBuffer);
221 m_inputBufferSize -= offset;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800222 }
223 else
224 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700225 m_inputBufferSize = 0;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800226 }
227 }
228
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700229 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
230 MAX_LENGTH - m_inputBufferSize), 0,
231 bind(&impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800232 }
233
234 void
235 handle_async_send(const boost::system::error_code& error, const Block& wire)
236 {
237 // pass (needed to keep data block alive during the send)
238 }
239
240 void
241 handle_async_send(const boost::system::error_code& error,
242 const Block& header, const Block& payload)
243 {
244 // pass (needed to keep data blocks alive during the send)
245 }
246
247protected:
248 base_transport& m_transport;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700249
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800250 typename protocol::socket m_socket;
251 uint8_t m_inputBuffer[MAX_LENGTH];
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700252 size_t m_inputBufferSize;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800253
254 std::list< Block > m_sendQueue;
255 std::list< std::pair<Block, Block> > m_sendPairQueue;
256 bool m_connectionInProgress;
257
258 boost::asio::deadline_timer m_connectTimer;
259};
260
261
262template<class BaseTransport, class Protocol>
263class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
264{
265public:
266 typedef BaseTransport base_transport;
267 typedef Protocol protocol;
268 typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> impl;
269
270 StreamTransportWithResolverImpl(base_transport& transport, boost::asio::io_service& ioService)
271 : StreamTransportImpl<base_transport, protocol>(transport, ioService)
272 {
273 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700274
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800275 void
276 resolveHandler(const boost::system::error_code& error,
277 typename protocol::resolver::iterator endpoint,
278 const shared_ptr<typename protocol::resolver>&)
279 {
280 if (error)
281 {
282 if (error == boost::system::errc::operation_canceled)
283 return;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700284
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800285 throw Transport::Error(error, "Error during resolution of host or port");
286 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700287
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800288 typename protocol::resolver::iterator end;
289 if (endpoint == end)
290 {
291 this->m_connectionInProgress = false;
292 this->m_transport.m_isConnected = false;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700293 this->m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800294 this->m_socket.close();
295 throw Transport::Error(error, "Unable to resolve because host or port");
296 }
297
298 this->m_socket.async_connect(*endpoint,
299 bind(&impl::connectHandler, this, _1));
300 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700301
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800302 void
303 connect(const typename protocol::resolver::query& query)
304 {
305 if (!this->m_connectionInProgress) {
306 this->m_connectionInProgress = true;
307
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -0700308 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800309 /// @todo Decide whether this number should be configurable
310 this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
311 this->m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
312
313 // typename boost::asio::ip::basic_resolver< protocol > resolver;
314 shared_ptr<typename protocol::resolver> resolver =
315 make_shared<typename protocol::resolver>(boost::ref(this->m_socket.get_io_service()));
316
317 resolver->async_resolve(query, bind(&impl::resolveHandler, this, _1, _2, resolver));
318 }
319 }
320};
321
322
323} // namespace ndn
324
325#endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP