blob: ca1ce9a84aee4b1a3e548f5bb9aaec209794ee3e [file] [log] [blame]
Alexander Afanasyev5964fb72014-02-18 12:42:45 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/**
Alexander Afanasyevdfa52c42014-04-24 21:10:11 -07003 * Copyright (c) 2013-2014, Regents of the University of California.
4 * All rights reserved.
5 *
6 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
7 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
8 *
9 * This file licensed under New BSD License. See COPYING for detailed information about
10 * ndn-cxx library copyright, permissions, and redistribution restrictions.
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080011 */
12
13#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
14#define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
15
16#include "../common.hpp"
17
18namespace ndn {
19
/// Capacity, in bytes, of the receive buffer used by the stream transport implementation
/// (size of the m_inputBuffer array and upper bound passed to async_receive).
static const size_t MAX_LENGTH = 9000;
21
22template<class BaseTransport, class Protocol>
23class StreamTransportImpl
24{
25public:
26 typedef BaseTransport base_transport;
27 typedef Protocol protocol;
28 typedef StreamTransportImpl<BaseTransport,Protocol> impl;
29
30 StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
31 : m_transport(transport)
32 , m_socket(ioService)
Alexander Afanasyev937aa782014-03-21 13:17:57 -070033 , m_inputBufferSize(0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080034 , m_connectionInProgress(false)
35 , m_connectTimer(ioService)
36 {
37 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070038
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080039 void
40 connectHandler(const boost::system::error_code& error)
41 {
42 m_connectionInProgress = false;
43 m_connectTimer.cancel();
44
45 if (!error)
46 {
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +000047 resume();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080048 m_transport.m_isConnected = true;
49
50 for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
51 m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
52 bind(&impl::handle_async_send, this, _1, *i));
53
54 for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
55 i != m_sendPairQueue.end(); ++i)
56 {
57 std::vector<boost::asio::const_buffer> buffer;
58 buffer.reserve(2);
59 buffer.push_back(boost::asio::buffer(i->first.wire(), i->first.size()));
60 buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
61 m_socket.async_send(buffer,
62 bind(&impl::handle_async_send, this, _1, i->first, i->second));
63 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070064
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080065 m_sendQueue.clear();
66 m_sendPairQueue.clear();
67 }
68 else
69 {
70 // may need to throw exception
71 m_transport.m_isConnected = false;
72 m_transport.close();
73 throw Transport::Error(error, "error while connecting to the forwarder");
74 }
75 }
76
77 void
78 connectTimeoutHandler(const boost::system::error_code& error)
79 {
80 if (error) // e.g., cancelled timer
81 return;
82
83 m_connectionInProgress = false;
84 m_transport.m_isConnected = false;
Alexander Afanasyev937aa782014-03-21 13:17:57 -070085 m_transport.m_isExpectingData = false;
Alexander Afanasyev6507fb12014-04-28 23:18:56 -070086 m_socket.cancel();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080087 m_socket.close();
88 throw Transport::Error(error, "error while connecting to the forwarder");
89 }
90
91 void
92 connect(const typename protocol::endpoint& endpoint)
93 {
94 if (!m_connectionInProgress) {
95 m_connectionInProgress = true;
96
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -070097 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080098 /// @todo Decide whether this number should be configurable
99 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
100 m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
101
102 m_socket.open();
103 m_socket.async_connect(endpoint,
104 bind(&impl::connectHandler, this, _1));
105 }
106 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700107
108 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800109 close()
110 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700111 boost::system::error_code error; // to silently ignore all errors
112 m_connectTimer.cancel(error);
113 m_socket.cancel(error);
114 m_socket.close(error);
115
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800116 m_transport.m_isConnected = false;
Alexander Afanasyev6e0c5a52014-03-18 16:18:58 -0700117 m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800118 m_sendQueue.clear();
119 m_sendPairQueue.clear();
120 }
121
122 void
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000123 pause()
124 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700125 if (m_connectionInProgress)
126 return;
127
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000128 if (m_transport.m_isExpectingData)
129 {
130 m_transport.m_isExpectingData = false;
131 m_socket.cancel();
132 }
133 }
134
135 void
136 resume()
137 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700138 if (m_connectionInProgress)
139 return;
140
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000141 if (!m_transport.m_isExpectingData)
142 {
143 m_transport.m_isExpectingData = true;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700144 m_inputBufferSize = 0;
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000145 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
146 bind(&impl::handle_async_receive, this, _1, _2));
147 }
148 }
149
150 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800151 send(const Block& wire)
152 {
153 if (!m_transport.m_isConnected)
154 m_sendQueue.push_back(wire);
155 else
156 m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
157 bind(&impl::handle_async_send, this, _1, wire));
158 }
159
160 void
161 send(const Block& header, const Block& payload)
162 {
163 if (!m_transport.m_isConnected)
164 {
165 m_sendPairQueue.push_back(std::make_pair(header, payload));
166 }
167 else
168 {
169 std::vector<boost::asio::const_buffer> buffers;
170 buffers.reserve(2);
171 buffers.push_back(boost::asio::buffer(header.wire(), header.size()));
172 buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700173
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800174 m_socket.async_send(buffers,
175 bind(&impl::handle_async_send, this, _1, header, payload));
176 }
177 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700178
179 inline bool
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800180 processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
181 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700182 Block element;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800183 while(offset < availableSize)
184 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700185 bool ok = Block::fromBuffer(buffer + offset, availableSize - offset, element);
186 if (!ok)
187 return false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800188
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700189 m_transport.receive(element);
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800190 offset += element.size();
191 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700192 return true;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800193 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700194
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800195 void
196 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
197 {
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800198 if (error)
199 {
200 if (error == boost::system::errc::operation_canceled) {
201 // async receive has been explicitly cancelled (e.g., socket close)
202 return;
203 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700204
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700205 boost::system::error_code error; // to silently ignore all errors
206 m_socket.cancel(error);
207 m_socket.close(error); // closing at this point may not be that necessary
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800208 m_transport.m_isConnected = true;
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700209 m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800210 throw Transport::Error(error, "error while receiving data from socket");
211 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700212
213 m_inputBufferSize += bytes_recvd;
214 // do magic
215
216 std::size_t offset = 0;
217 bool ok = processAll(m_inputBuffer, offset, m_inputBufferSize);
218 if (!ok && m_inputBufferSize == MAX_LENGTH && offset == 0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800219 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700220 // very bad... should close connection
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700221 boost::system::error_code error; // to silently ignore all errors
222 m_socket.cancel(error);
223 m_socket.close(error);
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700224 m_transport.m_isConnected = false;
225 m_transport.m_isExpectingData = false;
226 throw Transport::Error(boost::system::error_code(),
227 "input buffer full, but a valid TLV cannot be decoded");
228 }
229
230 if (offset > 0)
231 {
232 if (offset != m_inputBufferSize)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800233 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700234 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
235 m_inputBuffer);
236 m_inputBufferSize -= offset;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800237 }
238 else
239 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700240 m_inputBufferSize = 0;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800241 }
242 }
243
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700244 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
245 MAX_LENGTH - m_inputBufferSize), 0,
246 bind(&impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800247 }
248
249 void
250 handle_async_send(const boost::system::error_code& error, const Block& wire)
251 {
252 // pass (needed to keep data block alive during the send)
253 }
254
255 void
256 handle_async_send(const boost::system::error_code& error,
257 const Block& header, const Block& payload)
258 {
259 // pass (needed to keep data blocks alive during the send)
260 }
261
262protected:
263 base_transport& m_transport;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700264
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800265 typename protocol::socket m_socket;
266 uint8_t m_inputBuffer[MAX_LENGTH];
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700267 size_t m_inputBufferSize;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800268
269 std::list< Block > m_sendQueue;
270 std::list< std::pair<Block, Block> > m_sendPairQueue;
271 bool m_connectionInProgress;
272
273 boost::asio::deadline_timer m_connectTimer;
274};
275
276
277template<class BaseTransport, class Protocol>
278class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
279{
280public:
281 typedef BaseTransport base_transport;
282 typedef Protocol protocol;
283 typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> impl;
284
285 StreamTransportWithResolverImpl(base_transport& transport, boost::asio::io_service& ioService)
286 : StreamTransportImpl<base_transport, protocol>(transport, ioService)
287 {
288 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700289
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800290 void
291 resolveHandler(const boost::system::error_code& error,
292 typename protocol::resolver::iterator endpoint,
293 const shared_ptr<typename protocol::resolver>&)
294 {
295 if (error)
296 {
297 if (error == boost::system::errc::operation_canceled)
298 return;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700299
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800300 throw Transport::Error(error, "Error during resolution of host or port");
301 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700302
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800303 typename protocol::resolver::iterator end;
304 if (endpoint == end)
305 {
306 this->m_connectionInProgress = false;
307 this->m_transport.m_isConnected = false;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700308 this->m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800309 this->m_socket.close();
310 throw Transport::Error(error, "Unable to resolve because host or port");
311 }
312
313 this->m_socket.async_connect(*endpoint,
314 bind(&impl::connectHandler, this, _1));
315 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700316
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800317 void
318 connect(const typename protocol::resolver::query& query)
319 {
320 if (!this->m_connectionInProgress) {
321 this->m_connectionInProgress = true;
322
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -0700323 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800324 /// @todo Decide whether this number should be configurable
325 this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
326 this->m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
327
328 // typename boost::asio::ip::basic_resolver< protocol > resolver;
329 shared_ptr<typename protocol::resolver> resolver =
330 make_shared<typename protocol::resolver>(boost::ref(this->m_socket.get_io_service()));
331
332 resolver->async_resolve(query, bind(&impl::resolveHandler, this, _1, _2, resolver));
333 }
334 }
335};
336
337
338} // namespace ndn
339
340#endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP