blob: 67aac43923acd9ee4123f3e225cfc77931f3fde0 [file] [log] [blame]
Alexander Afanasyevc169a812014-05-20 20:37:29 -04001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Alexander Afanasyev5964fb72014-02-18 12:42:45 -08002/**
Alexander Afanasyevc169a812014-05-20 20:37:29 -04003 * Copyright (c) 2013-2014 Regents of the University of California.
Alexander Afanasyevdfa52c42014-04-24 21:10:11 -07004 *
5 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
Alexander Afanasyevdfa52c42014-04-24 21:10:11 -07006 *
Alexander Afanasyevc169a812014-05-20 20:37:29 -04007 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License as published by the Free Software
9 * Foundation, either version 3 of the License, or (at your option) any later version.
10 *
11 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14 *
15 * You should have received copies of the GNU General Public License and GNU Lesser
16 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17 * <http://www.gnu.org/licenses/>.
18 *
19 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080020 */
21
22#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
23#define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
24
25#include "../common.hpp"
26
Alexander Afanasyev258ec2b2014-05-14 16:15:37 -070027#include <list>
28
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080029namespace ndn {
30
31const size_t MAX_LENGTH = 9000;
32
33template<class BaseTransport, class Protocol>
34class StreamTransportImpl
35{
36public:
37 typedef BaseTransport base_transport;
38 typedef Protocol protocol;
39 typedef StreamTransportImpl<BaseTransport,Protocol> impl;
40
41 StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
42 : m_transport(transport)
43 , m_socket(ioService)
Alexander Afanasyev937aa782014-03-21 13:17:57 -070044 , m_inputBufferSize(0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080045 , m_connectionInProgress(false)
46 , m_connectTimer(ioService)
47 {
48 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070049
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080050 void
51 connectHandler(const boost::system::error_code& error)
52 {
53 m_connectionInProgress = false;
54 m_connectTimer.cancel();
55
56 if (!error)
57 {
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +000058 resume();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080059 m_transport.m_isConnected = true;
60
61 for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
62 m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
63 bind(&impl::handle_async_send, this, _1, *i));
64
65 for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
66 i != m_sendPairQueue.end(); ++i)
67 {
68 std::vector<boost::asio::const_buffer> buffer;
69 buffer.reserve(2);
70 buffer.push_back(boost::asio::buffer(i->first.wire(), i->first.size()));
71 buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
72 m_socket.async_send(buffer,
Alexander Afanasyevb67090a2014-04-29 22:31:01 -070073 bind(&impl::handle_async_send2, this, _1, i->first, i->second));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080074 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070075
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080076 m_sendQueue.clear();
77 m_sendPairQueue.clear();
78 }
79 else
80 {
81 // may need to throw exception
82 m_transport.m_isConnected = false;
83 m_transport.close();
84 throw Transport::Error(error, "error while connecting to the forwarder");
85 }
86 }
87
88 void
89 connectTimeoutHandler(const boost::system::error_code& error)
90 {
91 if (error) // e.g., cancelled timer
92 return;
93
94 m_connectionInProgress = false;
95 m_transport.m_isConnected = false;
Alexander Afanasyev937aa782014-03-21 13:17:57 -070096 m_transport.m_isExpectingData = false;
Alexander Afanasyev6507fb12014-04-28 23:18:56 -070097 m_socket.cancel();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080098 m_socket.close();
99 throw Transport::Error(error, "error while connecting to the forwarder");
100 }
101
102 void
103 connect(const typename protocol::endpoint& endpoint)
104 {
105 if (!m_connectionInProgress) {
106 m_connectionInProgress = true;
107
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -0700108 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800109 /// @todo Decide whether this number should be configurable
110 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
111 m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
112
113 m_socket.open();
114 m_socket.async_connect(endpoint,
115 bind(&impl::connectHandler, this, _1));
116 }
117 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700118
119 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800120 close()
121 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700122 boost::system::error_code error; // to silently ignore all errors
123 m_connectTimer.cancel(error);
124 m_socket.cancel(error);
125 m_socket.close(error);
126
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800127 m_transport.m_isConnected = false;
Alexander Afanasyev6e0c5a52014-03-18 16:18:58 -0700128 m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800129 m_sendQueue.clear();
130 m_sendPairQueue.clear();
131 }
132
133 void
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000134 pause()
135 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700136 if (m_connectionInProgress)
137 return;
138
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000139 if (m_transport.m_isExpectingData)
140 {
141 m_transport.m_isExpectingData = false;
142 m_socket.cancel();
143 }
144 }
145
146 void
147 resume()
148 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700149 if (m_connectionInProgress)
150 return;
151
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000152 if (!m_transport.m_isExpectingData)
153 {
154 m_transport.m_isExpectingData = true;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700155 m_inputBufferSize = 0;
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000156 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
157 bind(&impl::handle_async_receive, this, _1, _2));
158 }
159 }
160
161 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800162 send(const Block& wire)
163 {
164 if (!m_transport.m_isConnected)
165 m_sendQueue.push_back(wire);
166 else
167 m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
168 bind(&impl::handle_async_send, this, _1, wire));
169 }
170
171 void
172 send(const Block& header, const Block& payload)
173 {
174 if (!m_transport.m_isConnected)
175 {
176 m_sendPairQueue.push_back(std::make_pair(header, payload));
177 }
178 else
179 {
180 std::vector<boost::asio::const_buffer> buffers;
181 buffers.reserve(2);
182 buffers.push_back(boost::asio::buffer(header.wire(), header.size()));
183 buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700184
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800185 m_socket.async_send(buffers,
Alexander Afanasyevb67090a2014-04-29 22:31:01 -0700186 bind(&impl::handle_async_send2, this, _1, header, payload));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800187 }
188 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700189
190 inline bool
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800191 processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
192 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700193 Block element;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800194 while(offset < availableSize)
195 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700196 bool ok = Block::fromBuffer(buffer + offset, availableSize - offset, element);
197 if (!ok)
198 return false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800199
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700200 m_transport.receive(element);
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800201 offset += element.size();
202 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700203 return true;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800204 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700205
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800206 void
207 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
208 {
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800209 if (error)
210 {
211 if (error == boost::system::errc::operation_canceled) {
212 // async receive has been explicitly cancelled (e.g., socket close)
213 return;
214 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700215
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700216 boost::system::error_code error; // to silently ignore all errors
217 m_socket.cancel(error);
218 m_socket.close(error); // closing at this point may not be that necessary
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800219 m_transport.m_isConnected = true;
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700220 m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800221 throw Transport::Error(error, "error while receiving data from socket");
222 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700223
224 m_inputBufferSize += bytes_recvd;
225 // do magic
226
227 std::size_t offset = 0;
228 bool ok = processAll(m_inputBuffer, offset, m_inputBufferSize);
229 if (!ok && m_inputBufferSize == MAX_LENGTH && offset == 0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800230 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700231 // very bad... should close connection
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700232 boost::system::error_code error; // to silently ignore all errors
233 m_socket.cancel(error);
234 m_socket.close(error);
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700235 m_transport.m_isConnected = false;
236 m_transport.m_isExpectingData = false;
237 throw Transport::Error(boost::system::error_code(),
238 "input buffer full, but a valid TLV cannot be decoded");
239 }
240
241 if (offset > 0)
242 {
243 if (offset != m_inputBufferSize)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800244 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700245 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
246 m_inputBuffer);
247 m_inputBufferSize -= offset;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800248 }
249 else
250 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700251 m_inputBufferSize = 0;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800252 }
253 }
254
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700255 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
256 MAX_LENGTH - m_inputBufferSize), 0,
257 bind(&impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800258 }
259
260 void
261 handle_async_send(const boost::system::error_code& error, const Block& wire)
262 {
263 // pass (needed to keep data block alive during the send)
264 }
265
266 void
Alexander Afanasyevb67090a2014-04-29 22:31:01 -0700267 handle_async_send2(const boost::system::error_code& error,
268 const Block& header, const Block& payload)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800269 {
270 // pass (needed to keep data blocks alive during the send)
271 }
272
273protected:
274 base_transport& m_transport;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700275
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800276 typename protocol::socket m_socket;
277 uint8_t m_inputBuffer[MAX_LENGTH];
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700278 size_t m_inputBufferSize;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800279
280 std::list< Block > m_sendQueue;
281 std::list< std::pair<Block, Block> > m_sendPairQueue;
282 bool m_connectionInProgress;
283
284 boost::asio::deadline_timer m_connectTimer;
285};
286
287
288template<class BaseTransport, class Protocol>
289class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
290{
291public:
292 typedef BaseTransport base_transport;
293 typedef Protocol protocol;
294 typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> impl;
295
296 StreamTransportWithResolverImpl(base_transport& transport, boost::asio::io_service& ioService)
297 : StreamTransportImpl<base_transport, protocol>(transport, ioService)
298 {
299 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700300
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800301 void
302 resolveHandler(const boost::system::error_code& error,
303 typename protocol::resolver::iterator endpoint,
304 const shared_ptr<typename protocol::resolver>&)
305 {
306 if (error)
307 {
308 if (error == boost::system::errc::operation_canceled)
309 return;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700310
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800311 throw Transport::Error(error, "Error during resolution of host or port");
312 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700313
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800314 typename protocol::resolver::iterator end;
315 if (endpoint == end)
316 {
317 this->m_connectionInProgress = false;
318 this->m_transport.m_isConnected = false;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700319 this->m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800320 this->m_socket.close();
321 throw Transport::Error(error, "Unable to resolve because host or port");
322 }
323
324 this->m_socket.async_connect(*endpoint,
325 bind(&impl::connectHandler, this, _1));
326 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700327
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800328 void
329 connect(const typename protocol::resolver::query& query)
330 {
331 if (!this->m_connectionInProgress) {
332 this->m_connectionInProgress = true;
333
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -0700334 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800335 /// @todo Decide whether this number should be configurable
336 this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
337 this->m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
338
339 // typename boost::asio::ip::basic_resolver< protocol > resolver;
340 shared_ptr<typename protocol::resolver> resolver =
Alexander Afanasyevb67090a2014-04-29 22:31:01 -0700341 make_shared<typename protocol::resolver>(ref(this->m_socket.get_io_service()));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800342
343 resolver->async_resolve(query, bind(&impl::resolveHandler, this, _1, _2, resolver));
344 }
345 }
346};
347
348
349} // namespace ndn
350
351#endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP