blob: 04aeac32637f0bbe6afde5a2b5b63dae1202e150 [file] [log] [blame]
Alexander Afanasyevc169a812014-05-20 20:37:29 -04001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Alexander Afanasyev5964fb72014-02-18 12:42:45 -08002/**
Alexander Afanasyevc169a812014-05-20 20:37:29 -04003 * Copyright (c) 2013-2014 Regents of the University of California.
Alexander Afanasyevdfa52c42014-04-24 21:10:11 -07004 *
5 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
Alexander Afanasyevdfa52c42014-04-24 21:10:11 -07006 *
Alexander Afanasyevc169a812014-05-20 20:37:29 -04007 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License as published by the Free Software
9 * Foundation, either version 3 of the License, or (at your option) any later version.
10 *
11 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14 *
15 * You should have received copies of the GNU General Public License and GNU Lesser
16 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17 * <http://www.gnu.org/licenses/>.
18 *
19 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080020 */
21
22#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
23#define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
24
25#include "../common.hpp"
26
Alexander Afanasyev258ec2b2014-05-14 16:15:37 -070027#include <list>
28
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080029namespace ndn {
30
31const size_t MAX_LENGTH = 9000;
32
33template<class BaseTransport, class Protocol>
34class StreamTransportImpl
35{
36public:
37 typedef BaseTransport base_transport;
38 typedef Protocol protocol;
39 typedef StreamTransportImpl<BaseTransport,Protocol> impl;
40
41 StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
42 : m_transport(transport)
43 , m_socket(ioService)
Alexander Afanasyev937aa782014-03-21 13:17:57 -070044 , m_inputBufferSize(0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080045 , m_connectionInProgress(false)
46 , m_connectTimer(ioService)
47 {
48 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070049
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080050 void
51 connectHandler(const boost::system::error_code& error)
52 {
53 m_connectionInProgress = false;
54 m_connectTimer.cancel();
55
56 if (!error)
57 {
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +000058 resume();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080059 m_transport.m_isConnected = true;
60
61 for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
62 m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
63 bind(&impl::handle_async_send, this, _1, *i));
64
65 for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
66 i != m_sendPairQueue.end(); ++i)
67 {
68 std::vector<boost::asio::const_buffer> buffer;
69 buffer.reserve(2);
70 buffer.push_back(boost::asio::buffer(i->first.wire(), i->first.size()));
71 buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
72 m_socket.async_send(buffer,
Alexander Afanasyevb67090a2014-04-29 22:31:01 -070073 bind(&impl::handle_async_send2, this, _1, i->first, i->second));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080074 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070075
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080076 m_sendQueue.clear();
77 m_sendPairQueue.clear();
78 }
79 else
80 {
81 // may need to throw exception
82 m_transport.m_isConnected = false;
83 m_transport.close();
84 throw Transport::Error(error, "error while connecting to the forwarder");
85 }
86 }
87
88 void
89 connectTimeoutHandler(const boost::system::error_code& error)
90 {
91 if (error) // e.g., cancelled timer
92 return;
93
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -070094 m_transport.close();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080095 throw Transport::Error(error, "error while connecting to the forwarder");
96 }
97
98 void
99 connect(const typename protocol::endpoint& endpoint)
100 {
101 if (!m_connectionInProgress) {
102 m_connectionInProgress = true;
103
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -0700104 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800105 /// @todo Decide whether this number should be configurable
106 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
107 m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
108
109 m_socket.open();
110 m_socket.async_connect(endpoint,
111 bind(&impl::connectHandler, this, _1));
112 }
113 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700114
115 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800116 close()
117 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700118 m_connectionInProgress = false;
119
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700120 boost::system::error_code error; // to silently ignore all errors
121 m_connectTimer.cancel(error);
122 m_socket.cancel(error);
123 m_socket.close(error);
124
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800125 m_transport.m_isConnected = false;
Alexander Afanasyev6e0c5a52014-03-18 16:18:58 -0700126 m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800127 m_sendQueue.clear();
128 m_sendPairQueue.clear();
129 }
130
131 void
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000132 pause()
133 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700134 if (m_connectionInProgress)
135 return;
136
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000137 if (m_transport.m_isExpectingData)
138 {
139 m_transport.m_isExpectingData = false;
140 m_socket.cancel();
141 }
142 }
143
144 void
145 resume()
146 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700147 if (m_connectionInProgress)
148 return;
149
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000150 if (!m_transport.m_isExpectingData)
151 {
152 m_transport.m_isExpectingData = true;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700153 m_inputBufferSize = 0;
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000154 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
155 bind(&impl::handle_async_receive, this, _1, _2));
156 }
157 }
158
159 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800160 send(const Block& wire)
161 {
162 if (!m_transport.m_isConnected)
163 m_sendQueue.push_back(wire);
164 else
165 m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
166 bind(&impl::handle_async_send, this, _1, wire));
167 }
168
169 void
170 send(const Block& header, const Block& payload)
171 {
172 if (!m_transport.m_isConnected)
173 {
174 m_sendPairQueue.push_back(std::make_pair(header, payload));
175 }
176 else
177 {
178 std::vector<boost::asio::const_buffer> buffers;
179 buffers.reserve(2);
180 buffers.push_back(boost::asio::buffer(header.wire(), header.size()));
181 buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700182
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800183 m_socket.async_send(buffers,
Alexander Afanasyevb67090a2014-04-29 22:31:01 -0700184 bind(&impl::handle_async_send2, this, _1, header, payload));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800185 }
186 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700187
188 inline bool
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800189 processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
190 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700191 Block element;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800192 while(offset < availableSize)
193 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700194 bool ok = Block::fromBuffer(buffer + offset, availableSize - offset, element);
195 if (!ok)
196 return false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800197
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700198 m_transport.receive(element);
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800199 offset += element.size();
200 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700201 return true;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800202 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700203
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800204 void
205 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
206 {
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800207 if (error)
208 {
209 if (error == boost::system::errc::operation_canceled) {
210 // async receive has been explicitly cancelled (e.g., socket close)
211 return;
212 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700213
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700214 m_transport.close();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800215 throw Transport::Error(error, "error while receiving data from socket");
216 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700217
218 m_inputBufferSize += bytes_recvd;
219 // do magic
220
221 std::size_t offset = 0;
222 bool ok = processAll(m_inputBuffer, offset, m_inputBufferSize);
223 if (!ok && m_inputBufferSize == MAX_LENGTH && offset == 0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800224 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700225 m_transport.close();
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700226 throw Transport::Error(boost::system::error_code(),
227 "input buffer full, but a valid TLV cannot be decoded");
228 }
229
230 if (offset > 0)
231 {
232 if (offset != m_inputBufferSize)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800233 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700234 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
235 m_inputBuffer);
236 m_inputBufferSize -= offset;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800237 }
238 else
239 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700240 m_inputBufferSize = 0;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800241 }
242 }
243
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700244 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
245 MAX_LENGTH - m_inputBufferSize), 0,
246 bind(&impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800247 }
248
249 void
250 handle_async_send(const boost::system::error_code& error, const Block& wire)
251 {
252 // pass (needed to keep data block alive during the send)
253 }
254
255 void
Alexander Afanasyevb67090a2014-04-29 22:31:01 -0700256 handle_async_send2(const boost::system::error_code& error,
257 const Block& header, const Block& payload)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800258 {
259 // pass (needed to keep data blocks alive during the send)
260 }
261
262protected:
263 base_transport& m_transport;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700264
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800265 typename protocol::socket m_socket;
266 uint8_t m_inputBuffer[MAX_LENGTH];
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700267 size_t m_inputBufferSize;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800268
269 std::list< Block > m_sendQueue;
270 std::list< std::pair<Block, Block> > m_sendPairQueue;
271 bool m_connectionInProgress;
272
273 boost::asio::deadline_timer m_connectTimer;
274};
275
276
277template<class BaseTransport, class Protocol>
278class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
279{
280public:
281 typedef BaseTransport base_transport;
282 typedef Protocol protocol;
283 typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> impl;
284
285 StreamTransportWithResolverImpl(base_transport& transport, boost::asio::io_service& ioService)
286 : StreamTransportImpl<base_transport, protocol>(transport, ioService)
287 {
288 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700289
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800290 void
291 resolveHandler(const boost::system::error_code& error,
292 typename protocol::resolver::iterator endpoint,
293 const shared_ptr<typename protocol::resolver>&)
294 {
295 if (error)
296 {
297 if (error == boost::system::errc::operation_canceled)
298 return;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700299
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800300 throw Transport::Error(error, "Error during resolution of host or port");
301 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700302
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800303 typename protocol::resolver::iterator end;
304 if (endpoint == end)
305 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700306 this->m_transport.close();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800307 throw Transport::Error(error, "Unable to resolve because host or port");
308 }
309
310 this->m_socket.async_connect(*endpoint,
311 bind(&impl::connectHandler, this, _1));
312 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700313
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800314 void
315 connect(const typename protocol::resolver::query& query)
316 {
317 if (!this->m_connectionInProgress) {
318 this->m_connectionInProgress = true;
319
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -0700320 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800321 /// @todo Decide whether this number should be configurable
322 this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
323 this->m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
324
325 // typename boost::asio::ip::basic_resolver< protocol > resolver;
326 shared_ptr<typename protocol::resolver> resolver =
Alexander Afanasyevb67090a2014-04-29 22:31:01 -0700327 make_shared<typename protocol::resolver>(ref(this->m_socket.get_io_service()));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800328
329 resolver->async_resolve(query, bind(&impl::resolveHandler, this, _1, _2, resolver));
330 }
331 }
332};
333
334
335} // namespace ndn
336
337#endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP