blob: 491f77af9ec2a437061d2ce42bfebf15b6eb3ff0 [file] [log] [blame]
Alexander Afanasyevc169a812014-05-20 20:37:29 -04001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Alexander Afanasyev5964fb72014-02-18 12:42:45 -08002/**
Alexander Afanasyevc169a812014-05-20 20:37:29 -04003 * Copyright (c) 2013-2014 Regents of the University of California.
Alexander Afanasyevdfa52c42014-04-24 21:10:11 -07004 *
5 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
Alexander Afanasyevdfa52c42014-04-24 21:10:11 -07006 *
Alexander Afanasyevc169a812014-05-20 20:37:29 -04007 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License as published by the Free Software
9 * Foundation, either version 3 of the License, or (at your option) any later version.
10 *
11 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14 *
15 * You should have received copies of the GNU General Public License and GNU Lesser
16 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17 * <http://www.gnu.org/licenses/>.
18 *
19 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080020 */
21
22#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
23#define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
24
25#include "../common.hpp"
26
Alexander Afanasyev258ec2b2014-05-14 16:15:37 -070027#include <list>
28
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080029namespace ndn {
30
31const size_t MAX_LENGTH = 9000;
32
33template<class BaseTransport, class Protocol>
34class StreamTransportImpl
35{
36public:
37 typedef BaseTransport base_transport;
38 typedef Protocol protocol;
39 typedef StreamTransportImpl<BaseTransport,Protocol> impl;
40
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -070041 typedef std::list<Block> BlockSequence;
42 typedef std::list<BlockSequence> TransmissionQueue;
43
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080044 StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
45 : m_transport(transport)
46 , m_socket(ioService)
Alexander Afanasyev937aa782014-03-21 13:17:57 -070047 , m_inputBufferSize(0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080048 , m_connectionInProgress(false)
49 , m_connectTimer(ioService)
50 {
51 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070052
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080053 void
54 connectHandler(const boost::system::error_code& error)
55 {
56 m_connectionInProgress = false;
57 m_connectTimer.cancel();
58
59 if (!error)
60 {
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +000061 resume();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080062 m_transport.m_isConnected = true;
63
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -070064 if (!m_transmissionQueue.empty()) {
65 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
66 bind(&impl::handleAsyncWrite, this, _1,
67 m_transmissionQueue.begin()));
68 }
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080069 }
70 else
71 {
72 // may need to throw exception
73 m_transport.m_isConnected = false;
74 m_transport.close();
75 throw Transport::Error(error, "error while connecting to the forwarder");
76 }
77 }
78
79 void
80 connectTimeoutHandler(const boost::system::error_code& error)
81 {
82 if (error) // e.g., cancelled timer
83 return;
84
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -070085 m_transport.close();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080086 throw Transport::Error(error, "error while connecting to the forwarder");
87 }
88
89 void
90 connect(const typename protocol::endpoint& endpoint)
91 {
92 if (!m_connectionInProgress) {
93 m_connectionInProgress = true;
94
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -070095 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080096 /// @todo Decide whether this number should be configurable
97 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
98 m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
99
100 m_socket.open();
101 m_socket.async_connect(endpoint,
102 bind(&impl::connectHandler, this, _1));
103 }
104 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700105
106 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800107 close()
108 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700109 m_connectionInProgress = false;
110
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700111 boost::system::error_code error; // to silently ignore all errors
112 m_connectTimer.cancel(error);
113 m_socket.cancel(error);
114 m_socket.close(error);
115
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800116 m_transport.m_isConnected = false;
Alexander Afanasyev6e0c5a52014-03-18 16:18:58 -0700117 m_transport.m_isExpectingData = false;
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700118 m_transmissionQueue.clear();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800119 }
120
121 void
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000122 pause()
123 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700124 if (m_connectionInProgress)
125 return;
126
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000127 if (m_transport.m_isExpectingData)
128 {
129 m_transport.m_isExpectingData = false;
130 m_socket.cancel();
131 }
132 }
133
134 void
135 resume()
136 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700137 if (m_connectionInProgress)
138 return;
139
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000140 if (!m_transport.m_isExpectingData)
141 {
142 m_transport.m_isExpectingData = true;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700143 m_inputBufferSize = 0;
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000144 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
145 bind(&impl::handle_async_receive, this, _1, _2));
146 }
147 }
148
149 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800150 send(const Block& wire)
151 {
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700152 BlockSequence sequence;
153 sequence.push_back(wire);
154 m_transmissionQueue.push_back(sequence);
155
156 if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
157 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
158 bind(&impl::handleAsyncWrite, this, _1,
159 m_transmissionQueue.begin()));
160 }
161
162 // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
163 // next write will be scheduled either in connectHandler or in asyncWriteHandler
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800164 }
165
166 void
167 send(const Block& header, const Block& payload)
168 {
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700169 BlockSequence sequence;
170 sequence.push_back(header);
171 sequence.push_back(payload);
172 m_transmissionQueue.push_back(sequence);
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700173
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700174 if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
175 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
176 bind(&impl::handleAsyncWrite, this, _1,
177 m_transmissionQueue.begin()));
178 }
179
180 // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
181 // next write will be scheduled either in connectHandler or in asyncWriteHandler
182 }
183
184 void
185 handleAsyncWrite(const boost::system::error_code& error,
186 TransmissionQueue::iterator queueItem)
187 {
188 if (error)
189 {
190 if (error == boost::system::errc::operation_canceled) {
191 // async receive has been explicitly cancelled (e.g., socket close)
192 return;
193 }
194
195 m_transport.close();
196 throw Transport::Error(error, "error while sending data to socket");
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800197 }
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700198
199 m_transmissionQueue.erase(queueItem);
200
201 if (!m_transmissionQueue.empty()) {
202 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
203 bind(&impl::handleAsyncWrite, this, _1,
204 m_transmissionQueue.begin()));
205 }
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800206 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700207
208 inline bool
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800209 processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
210 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700211 Block element;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800212 while(offset < availableSize)
213 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700214 bool ok = Block::fromBuffer(buffer + offset, availableSize - offset, element);
215 if (!ok)
216 return false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800217
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700218 m_transport.receive(element);
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800219 offset += element.size();
220 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700221 return true;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800222 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700223
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800224 void
225 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
226 {
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800227 if (error)
228 {
229 if (error == boost::system::errc::operation_canceled) {
230 // async receive has been explicitly cancelled (e.g., socket close)
231 return;
232 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700233
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700234 m_transport.close();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800235 throw Transport::Error(error, "error while receiving data from socket");
236 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700237
238 m_inputBufferSize += bytes_recvd;
239 // do magic
240
241 std::size_t offset = 0;
242 bool ok = processAll(m_inputBuffer, offset, m_inputBufferSize);
243 if (!ok && m_inputBufferSize == MAX_LENGTH && offset == 0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800244 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700245 m_transport.close();
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700246 throw Transport::Error(boost::system::error_code(),
247 "input buffer full, but a valid TLV cannot be decoded");
248 }
249
250 if (offset > 0)
251 {
252 if (offset != m_inputBufferSize)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800253 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700254 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
255 m_inputBuffer);
256 m_inputBufferSize -= offset;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800257 }
258 else
259 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700260 m_inputBufferSize = 0;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800261 }
262 }
263
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700264 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
265 MAX_LENGTH - m_inputBufferSize), 0,
266 bind(&impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800267 }
268
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800269protected:
270 base_transport& m_transport;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700271
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800272 typename protocol::socket m_socket;
273 uint8_t m_inputBuffer[MAX_LENGTH];
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700274 size_t m_inputBufferSize;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800275
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700276 TransmissionQueue m_transmissionQueue;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800277 bool m_connectionInProgress;
278
279 boost::asio::deadline_timer m_connectTimer;
280};
281
282
283template<class BaseTransport, class Protocol>
284class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
285{
286public:
287 typedef BaseTransport base_transport;
288 typedef Protocol protocol;
289 typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> impl;
290
291 StreamTransportWithResolverImpl(base_transport& transport, boost::asio::io_service& ioService)
292 : StreamTransportImpl<base_transport, protocol>(transport, ioService)
293 {
294 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700295
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800296 void
297 resolveHandler(const boost::system::error_code& error,
298 typename protocol::resolver::iterator endpoint,
299 const shared_ptr<typename protocol::resolver>&)
300 {
301 if (error)
302 {
303 if (error == boost::system::errc::operation_canceled)
304 return;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700305
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800306 throw Transport::Error(error, "Error during resolution of host or port");
307 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700308
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800309 typename protocol::resolver::iterator end;
310 if (endpoint == end)
311 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700312 this->m_transport.close();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800313 throw Transport::Error(error, "Unable to resolve because host or port");
314 }
315
316 this->m_socket.async_connect(*endpoint,
317 bind(&impl::connectHandler, this, _1));
318 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700319
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800320 void
321 connect(const typename protocol::resolver::query& query)
322 {
323 if (!this->m_connectionInProgress) {
324 this->m_connectionInProgress = true;
325
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -0700326 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800327 /// @todo Decide whether this number should be configurable
328 this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
329 this->m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
330
331 // typename boost::asio::ip::basic_resolver< protocol > resolver;
332 shared_ptr<typename protocol::resolver> resolver =
Alexander Afanasyevb67090a2014-04-29 22:31:01 -0700333 make_shared<typename protocol::resolver>(ref(this->m_socket.get_io_service()));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800334
335 resolver->async_resolve(query, bind(&impl::resolveHandler, this, _1, _2, resolver));
336 }
337 }
338};
339
340
341} // namespace ndn
342
343#endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP