blob: 2ab47523bf6cb45a042b58eff48631258aff7529 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
/**
 * Copyright (C) 2013-2014 Regents of the University of California.
 * See COPYING for copyright and distribution information.
 */
6
7#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
8#define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
9
10#include "../common.hpp"
11
12namespace ndn {
13
/**
 * @brief Size, in bytes, of the fixed buffers the stream transport uses for
 *        receiving data and for holding a not-yet-decodable tail of the stream
 */
const size_t MAX_LENGTH = 9000;
15
16template<class BaseTransport, class Protocol>
17class StreamTransportImpl
18{
19public:
20 typedef BaseTransport base_transport;
21 typedef Protocol protocol;
22 typedef StreamTransportImpl<BaseTransport,Protocol> impl;
23
24 StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
25 : m_transport(transport)
26 , m_socket(ioService)
27 , m_partialDataSize(0)
28 , m_connectionInProgress(false)
29 , m_connectTimer(ioService)
30 {
31 }
32
33 void
34 connectHandler(const boost::system::error_code& error)
35 {
36 m_connectionInProgress = false;
37 m_connectTimer.cancel();
38
39 if (!error)
40 {
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +000041 resume();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080042 m_transport.m_isConnected = true;
43
44 for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
45 m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
46 bind(&impl::handle_async_send, this, _1, *i));
47
48 for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
49 i != m_sendPairQueue.end(); ++i)
50 {
51 std::vector<boost::asio::const_buffer> buffer;
52 buffer.reserve(2);
53 buffer.push_back(boost::asio::buffer(i->first.wire(), i->first.size()));
54 buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
55 m_socket.async_send(buffer,
56 bind(&impl::handle_async_send, this, _1, i->first, i->second));
57 }
58
59 m_sendQueue.clear();
60 m_sendPairQueue.clear();
61 }
62 else
63 {
64 // may need to throw exception
65 m_transport.m_isConnected = false;
66 m_transport.close();
67 throw Transport::Error(error, "error while connecting to the forwarder");
68 }
69 }
70
71 void
72 connectTimeoutHandler(const boost::system::error_code& error)
73 {
74 if (error) // e.g., cancelled timer
75 return;
76
77 m_connectionInProgress = false;
78 m_transport.m_isConnected = false;
79 m_socket.close();
80 throw Transport::Error(error, "error while connecting to the forwarder");
81 }
82
83 void
84 connect(const typename protocol::endpoint& endpoint)
85 {
86 if (!m_connectionInProgress) {
87 m_connectionInProgress = true;
88
89 // Wait at most 4 seconds to connect
90 /// @todo Decide whether this number should be configurable
91 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
92 m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
93
94 m_socket.open();
95 m_socket.async_connect(endpoint,
96 bind(&impl::connectHandler, this, _1));
97 }
98 }
99
100 void
101 close()
102 {
103 m_connectTimer.cancel();
104 m_socket.close();
105 m_transport.m_isConnected = false;
106 m_sendQueue.clear();
107 m_sendPairQueue.clear();
108 }
109
110 void
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000111 pause()
112 {
113 if (m_transport.m_isExpectingData)
114 {
115 m_transport.m_isExpectingData = false;
116 m_socket.cancel();
117 }
118 }
119
120 void
121 resume()
122 {
123 if (!m_transport.m_isExpectingData)
124 {
125 m_transport.m_isExpectingData = true;
126 m_partialDataSize = 0;
127 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
128 bind(&impl::handle_async_receive, this, _1, _2));
129 }
130 }
131
132 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800133 send(const Block& wire)
134 {
135 if (!m_transport.m_isConnected)
136 m_sendQueue.push_back(wire);
137 else
138 m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
139 bind(&impl::handle_async_send, this, _1, wire));
140 }
141
142 void
143 send(const Block& header, const Block& payload)
144 {
145 if (!m_transport.m_isConnected)
146 {
147 m_sendPairQueue.push_back(std::make_pair(header, payload));
148 }
149 else
150 {
151 std::vector<boost::asio::const_buffer> buffers;
152 buffers.reserve(2);
153 buffers.push_back(boost::asio::buffer(header.wire(), header.size()));
154 buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
155
156 m_socket.async_send(buffers,
157 bind(&impl::handle_async_send, this, _1, header, payload));
158 }
159 }
160
161 inline void
162 processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
163 {
164 while(offset < availableSize)
165 {
166 Block element(buffer + offset, availableSize - offset);
167 m_transport.receive(element);
168
169 offset += element.size();
170 }
171 }
172
173 void
174 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
175 {
176 /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
177
178 if (error)
179 {
180 if (error == boost::system::errc::operation_canceled) {
181 // async receive has been explicitly cancelled (e.g., socket close)
182 return;
183 }
184
185 m_socket.close(); // closing at this point may not be that necessary
186 m_transport.m_isConnected = true;
187 throw Transport::Error(error, "error while receiving data from socket");
188 }
189
190 if (!error && bytes_recvd > 0)
191 {
192 // m_inputBuffer has bytes_recvd received bytes of data
193 if (m_partialDataSize > 0)
194 {
195 size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-m_partialDataSize);
196 std::copy(m_inputBuffer, m_inputBuffer + newDataSize, m_partialData + m_partialDataSize);
197
198 m_partialDataSize += newDataSize;
199
200 size_t offset = 0;
201 try
202 {
203 processAll(m_partialData, offset, m_partialDataSize);
204
205 // no exceptions => processed the whole thing
206 if (bytes_recvd - newDataSize > 0)
207 {
208 // there is a little bit more data available
209
210 offset = 0;
211 m_partialDataSize = bytes_recvd - newDataSize;
212 std::copy(m_inputBuffer + newDataSize, m_inputBuffer + newDataSize + m_partialDataSize, m_partialData);
213
214 processAll(m_partialData, offset, m_partialDataSize);
215
216 // no exceptions => processed the whole thing
217 m_partialDataSize = 0;
218 }
219 else
220 {
221 // done processing
222 m_partialDataSize = 0;
223 }
224 }
225 catch(Tlv::Error &)
226 {
227 if (offset > 0)
228 {
229 m_partialDataSize -= offset;
230 std::copy(m_partialData + offset, m_partialData + offset + m_partialDataSize, m_partialData);
231 }
232 else if (offset == 0 && m_partialDataSize == MAX_LENGTH)
233 {
234 // very bad... should close connection
235 m_socket.close();
236 m_transport.m_isConnected = true;
237 throw Transport::Error(boost::system::error_code(),
238 "input buffer full, but a valid TLV cannot be decoded");
239 }
240 }
241 }
242 else
243 {
244 size_t offset = 0;
245 try
246 {
247 processAll(m_inputBuffer, offset, bytes_recvd);
248 }
249 catch(Tlv::Error &error)
250 {
251 if (offset > 0)
252 {
253 m_partialDataSize = bytes_recvd - offset;
254 std::copy(m_inputBuffer + offset, m_inputBuffer + offset + m_partialDataSize, m_partialData);
255 }
256 }
257 }
258 }
259
260 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
261 bind(&impl::handle_async_receive, this, _1, _2));
262 }
263
264 void
265 handle_async_send(const boost::system::error_code& error, const Block& wire)
266 {
267 // pass (needed to keep data block alive during the send)
268 }
269
270 void
271 handle_async_send(const boost::system::error_code& error,
272 const Block& header, const Block& payload)
273 {
274 // pass (needed to keep data blocks alive during the send)
275 }
276
277protected:
278 base_transport& m_transport;
279
280 typename protocol::socket m_socket;
281 uint8_t m_inputBuffer[MAX_LENGTH];
282
283 uint8_t m_partialData[MAX_LENGTH];
284 size_t m_partialDataSize;
285
286 std::list< Block > m_sendQueue;
287 std::list< std::pair<Block, Block> > m_sendPairQueue;
288 bool m_connectionInProgress;
289
290 boost::asio::deadline_timer m_connectTimer;
291};
292
293
294template<class BaseTransport, class Protocol>
295class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
296{
297public:
298 typedef BaseTransport base_transport;
299 typedef Protocol protocol;
300 typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> impl;
301
302 StreamTransportWithResolverImpl(base_transport& transport, boost::asio::io_service& ioService)
303 : StreamTransportImpl<base_transport, protocol>(transport, ioService)
304 {
305 }
306
307 void
308 resolveHandler(const boost::system::error_code& error,
309 typename protocol::resolver::iterator endpoint,
310 const shared_ptr<typename protocol::resolver>&)
311 {
312 if (error)
313 {
314 if (error == boost::system::errc::operation_canceled)
315 return;
316
317 throw Transport::Error(error, "Error during resolution of host or port");
318 }
319
320 typename protocol::resolver::iterator end;
321 if (endpoint == end)
322 {
323 this->m_connectionInProgress = false;
324 this->m_transport.m_isConnected = false;
325 this->m_socket.close();
326 throw Transport::Error(error, "Unable to resolve because host or port");
327 }
328
329 this->m_socket.async_connect(*endpoint,
330 bind(&impl::connectHandler, this, _1));
331 }
332
333 void
334 connect(const typename protocol::resolver::query& query)
335 {
336 if (!this->m_connectionInProgress) {
337 this->m_connectionInProgress = true;
338
339 // Wait at most 4 seconds to connect
340 /// @todo Decide whether this number should be configurable
341 this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
342 this->m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
343
344 // typename boost::asio::ip::basic_resolver< protocol > resolver;
345 shared_ptr<typename protocol::resolver> resolver =
346 make_shared<typename protocol::resolver>(boost::ref(this->m_socket.get_io_service()));
347
348 resolver->async_resolve(query, bind(&impl::resolveHandler, this, _1, _2, resolver));
349 }
350 }
351};
352
353
354} // namespace ndn
355
356#endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP