blob: bbf01b9e7cbae251aecd4df4062d949debefc2a2 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
/**
 * Copyright (c) 2013-2014, Regents of the University of California.
 * All rights reserved.
 *
 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
 *
 * This file licensed under New BSD License.  See COPYING for detailed information about
 * ndn-cxx library copyright, permissions, and redistribution restrictions.
 */
12
13#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
14#define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
15
16#include "../common.hpp"
17
Alexander Afanasyev258ec2b2014-05-14 16:15:37 -070018#include <list>
19
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080020namespace ndn {
21
/// Capacity, in bytes, of the per-connection receive buffer used by StreamTransportImpl
const size_t MAX_LENGTH = 9000;
23
24template<class BaseTransport, class Protocol>
25class StreamTransportImpl
26{
27public:
28 typedef BaseTransport base_transport;
29 typedef Protocol protocol;
30 typedef StreamTransportImpl<BaseTransport,Protocol> impl;
31
32 StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
33 : m_transport(transport)
34 , m_socket(ioService)
Alexander Afanasyev937aa782014-03-21 13:17:57 -070035 , m_inputBufferSize(0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080036 , m_connectionInProgress(false)
37 , m_connectTimer(ioService)
38 {
39 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070040
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080041 void
42 connectHandler(const boost::system::error_code& error)
43 {
44 m_connectionInProgress = false;
45 m_connectTimer.cancel();
46
47 if (!error)
48 {
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +000049 resume();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080050 m_transport.m_isConnected = true;
51
52 for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
53 m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
54 bind(&impl::handle_async_send, this, _1, *i));
55
56 for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
57 i != m_sendPairQueue.end(); ++i)
58 {
59 std::vector<boost::asio::const_buffer> buffer;
60 buffer.reserve(2);
61 buffer.push_back(boost::asio::buffer(i->first.wire(), i->first.size()));
62 buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
63 m_socket.async_send(buffer,
Alexander Afanasyevb67090a2014-04-29 22:31:01 -070064 bind(&impl::handle_async_send2, this, _1, i->first, i->second));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080065 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070066
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080067 m_sendQueue.clear();
68 m_sendPairQueue.clear();
69 }
70 else
71 {
72 // may need to throw exception
73 m_transport.m_isConnected = false;
74 m_transport.close();
75 throw Transport::Error(error, "error while connecting to the forwarder");
76 }
77 }
78
79 void
80 connectTimeoutHandler(const boost::system::error_code& error)
81 {
82 if (error) // e.g., cancelled timer
83 return;
84
85 m_connectionInProgress = false;
86 m_transport.m_isConnected = false;
Alexander Afanasyev937aa782014-03-21 13:17:57 -070087 m_transport.m_isExpectingData = false;
Alexander Afanasyev6507fb12014-04-28 23:18:56 -070088 m_socket.cancel();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080089 m_socket.close();
90 throw Transport::Error(error, "error while connecting to the forwarder");
91 }
92
93 void
94 connect(const typename protocol::endpoint& endpoint)
95 {
96 if (!m_connectionInProgress) {
97 m_connectionInProgress = true;
98
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -070099 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800100 /// @todo Decide whether this number should be configurable
101 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
102 m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
103
104 m_socket.open();
105 m_socket.async_connect(endpoint,
106 bind(&impl::connectHandler, this, _1));
107 }
108 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700109
110 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800111 close()
112 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700113 boost::system::error_code error; // to silently ignore all errors
114 m_connectTimer.cancel(error);
115 m_socket.cancel(error);
116 m_socket.close(error);
117
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800118 m_transport.m_isConnected = false;
Alexander Afanasyev6e0c5a52014-03-18 16:18:58 -0700119 m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800120 m_sendQueue.clear();
121 m_sendPairQueue.clear();
122 }
123
124 void
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000125 pause()
126 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700127 if (m_connectionInProgress)
128 return;
129
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000130 if (m_transport.m_isExpectingData)
131 {
132 m_transport.m_isExpectingData = false;
133 m_socket.cancel();
134 }
135 }
136
137 void
138 resume()
139 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700140 if (m_connectionInProgress)
141 return;
142
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000143 if (!m_transport.m_isExpectingData)
144 {
145 m_transport.m_isExpectingData = true;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700146 m_inputBufferSize = 0;
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000147 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
148 bind(&impl::handle_async_receive, this, _1, _2));
149 }
150 }
151
152 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800153 send(const Block& wire)
154 {
155 if (!m_transport.m_isConnected)
156 m_sendQueue.push_back(wire);
157 else
158 m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
159 bind(&impl::handle_async_send, this, _1, wire));
160 }
161
162 void
163 send(const Block& header, const Block& payload)
164 {
165 if (!m_transport.m_isConnected)
166 {
167 m_sendPairQueue.push_back(std::make_pair(header, payload));
168 }
169 else
170 {
171 std::vector<boost::asio::const_buffer> buffers;
172 buffers.reserve(2);
173 buffers.push_back(boost::asio::buffer(header.wire(), header.size()));
174 buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700175
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800176 m_socket.async_send(buffers,
Alexander Afanasyevb67090a2014-04-29 22:31:01 -0700177 bind(&impl::handle_async_send2, this, _1, header, payload));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800178 }
179 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700180
181 inline bool
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800182 processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
183 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700184 Block element;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800185 while(offset < availableSize)
186 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700187 bool ok = Block::fromBuffer(buffer + offset, availableSize - offset, element);
188 if (!ok)
189 return false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800190
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700191 m_transport.receive(element);
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800192 offset += element.size();
193 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700194 return true;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800195 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700196
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800197 void
198 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
199 {
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800200 if (error)
201 {
202 if (error == boost::system::errc::operation_canceled) {
203 // async receive has been explicitly cancelled (e.g., socket close)
204 return;
205 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700206
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700207 boost::system::error_code error; // to silently ignore all errors
208 m_socket.cancel(error);
209 m_socket.close(error); // closing at this point may not be that necessary
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800210 m_transport.m_isConnected = true;
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700211 m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800212 throw Transport::Error(error, "error while receiving data from socket");
213 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700214
215 m_inputBufferSize += bytes_recvd;
216 // do magic
217
218 std::size_t offset = 0;
219 bool ok = processAll(m_inputBuffer, offset, m_inputBufferSize);
220 if (!ok && m_inputBufferSize == MAX_LENGTH && offset == 0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800221 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700222 // very bad... should close connection
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700223 boost::system::error_code error; // to silently ignore all errors
224 m_socket.cancel(error);
225 m_socket.close(error);
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700226 m_transport.m_isConnected = false;
227 m_transport.m_isExpectingData = false;
228 throw Transport::Error(boost::system::error_code(),
229 "input buffer full, but a valid TLV cannot be decoded");
230 }
231
232 if (offset > 0)
233 {
234 if (offset != m_inputBufferSize)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800235 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700236 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
237 m_inputBuffer);
238 m_inputBufferSize -= offset;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800239 }
240 else
241 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700242 m_inputBufferSize = 0;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800243 }
244 }
245
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700246 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
247 MAX_LENGTH - m_inputBufferSize), 0,
248 bind(&impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800249 }
250
251 void
252 handle_async_send(const boost::system::error_code& error, const Block& wire)
253 {
254 // pass (needed to keep data block alive during the send)
255 }
256
257 void
Alexander Afanasyevb67090a2014-04-29 22:31:01 -0700258 handle_async_send2(const boost::system::error_code& error,
259 const Block& header, const Block& payload)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800260 {
261 // pass (needed to keep data blocks alive during the send)
262 }
263
264protected:
265 base_transport& m_transport;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700266
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800267 typename protocol::socket m_socket;
268 uint8_t m_inputBuffer[MAX_LENGTH];
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700269 size_t m_inputBufferSize;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800270
271 std::list< Block > m_sendQueue;
272 std::list< std::pair<Block, Block> > m_sendPairQueue;
273 bool m_connectionInProgress;
274
275 boost::asio::deadline_timer m_connectTimer;
276};
277
278
279template<class BaseTransport, class Protocol>
280class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
281{
282public:
283 typedef BaseTransport base_transport;
284 typedef Protocol protocol;
285 typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> impl;
286
287 StreamTransportWithResolverImpl(base_transport& transport, boost::asio::io_service& ioService)
288 : StreamTransportImpl<base_transport, protocol>(transport, ioService)
289 {
290 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700291
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800292 void
293 resolveHandler(const boost::system::error_code& error,
294 typename protocol::resolver::iterator endpoint,
295 const shared_ptr<typename protocol::resolver>&)
296 {
297 if (error)
298 {
299 if (error == boost::system::errc::operation_canceled)
300 return;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700301
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800302 throw Transport::Error(error, "Error during resolution of host or port");
303 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700304
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800305 typename protocol::resolver::iterator end;
306 if (endpoint == end)
307 {
308 this->m_connectionInProgress = false;
309 this->m_transport.m_isConnected = false;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700310 this->m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800311 this->m_socket.close();
312 throw Transport::Error(error, "Unable to resolve because host or port");
313 }
314
315 this->m_socket.async_connect(*endpoint,
316 bind(&impl::connectHandler, this, _1));
317 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700318
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800319 void
320 connect(const typename protocol::resolver::query& query)
321 {
322 if (!this->m_connectionInProgress) {
323 this->m_connectionInProgress = true;
324
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -0700325 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800326 /// @todo Decide whether this number should be configurable
327 this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
328 this->m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
329
330 // typename boost::asio::ip::basic_resolver< protocol > resolver;
331 shared_ptr<typename protocol::resolver> resolver =
Alexander Afanasyevb67090a2014-04-29 22:31:01 -0700332 make_shared<typename protocol::resolver>(ref(this->m_socket.get_io_service()));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800333
334 resolver->async_resolve(query, bind(&impl::resolveHandler, this, _1, _2, resolver));
335 }
336 }
337};
338
339
340} // namespace ndn
341
342#endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP