blob: a0c7ca3b880d3e3cae36f64dc3d6ae9969b72e06 [file] [log] [blame]
Alexander Afanasyevc169a812014-05-20 20:37:29 -04001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Alexander Afanasyev5964fb72014-02-18 12:42:45 -08002/**
Davide Pesavento537dc3a2016-02-18 19:35:26 +01003 * Copyright (c) 2013-2016 Regents of the University of California.
Alexander Afanasyevdfa52c42014-04-24 21:10:11 -07004 *
5 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
Alexander Afanasyevdfa52c42014-04-24 21:10:11 -07006 *
Alexander Afanasyevc169a812014-05-20 20:37:29 -04007 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License as published by the Free Software
9 * Foundation, either version 3 of the License, or (at your option) any later version.
10 *
11 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14 *
15 * You should have received copies of the GNU General Public License and GNU Lesser
16 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17 * <http://www.gnu.org/licenses/>.
18 *
19 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080020 */
21
22#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
23#define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
24
Junxiao Shi71355d52014-12-11 09:20:44 -070025#include "transport.hpp"
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080026
Davide Pesavento537dc3a2016-02-18 19:35:26 +010027#include <boost/asio.hpp>
Alexander Afanasyev258ec2b2014-05-14 16:15:37 -070028#include <list>
29
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080030namespace ndn {
31
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080032template<class BaseTransport, class Protocol>
33class StreamTransportImpl
34{
35public:
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070036 typedef StreamTransportImpl<BaseTransport,Protocol> Impl;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080037
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -070038 typedef std::list<Block> BlockSequence;
39 typedef std::list<BlockSequence> TransmissionQueue;
40
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070041 StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080042 : m_transport(transport)
43 , m_socket(ioService)
Alexander Afanasyev937aa782014-03-21 13:17:57 -070044 , m_inputBufferSize(0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080045 , m_connectionInProgress(false)
46 , m_connectTimer(ioService)
47 {
48 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070049
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080050 void
51 connectHandler(const boost::system::error_code& error)
52 {
53 m_connectionInProgress = false;
54 m_connectTimer.cancel();
55
56 if (!error)
57 {
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +000058 resume();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080059 m_transport.m_isConnected = true;
60
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -070061 if (!m_transmissionQueue.empty()) {
62 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070063 bind(&Impl::handleAsyncWrite, this, _1,
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -070064 m_transmissionQueue.begin()));
65 }
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080066 }
67 else
68 {
69 // may need to throw exception
70 m_transport.m_isConnected = false;
71 m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -070072 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080073 }
74 }
75
76 void
77 connectTimeoutHandler(const boost::system::error_code& error)
78 {
79 if (error) // e.g., cancelled timer
80 return;
81
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -070082 m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -070083 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080084 }
85
86 void
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070087 connect(const typename Protocol::endpoint& endpoint)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080088 {
89 if (!m_connectionInProgress) {
90 m_connectionInProgress = true;
91
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070092 // Wait at most 4 seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080093 /// @todo Decide whether this number should be configurable
94 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070095 m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080096
97 m_socket.open();
98 m_socket.async_connect(endpoint,
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070099 bind(&Impl::connectHandler, this, _1));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800100 }
101 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700102
103 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800104 close()
105 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700106 m_connectionInProgress = false;
107
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700108 boost::system::error_code error; // to silently ignore all errors
109 m_connectTimer.cancel(error);
110 m_socket.cancel(error);
111 m_socket.close(error);
112
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800113 m_transport.m_isConnected = false;
Alexander Afanasyev6e0c5a52014-03-18 16:18:58 -0700114 m_transport.m_isExpectingData = false;
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700115 m_transmissionQueue.clear();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800116 }
117
118 void
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000119 pause()
120 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700121 if (m_connectionInProgress)
122 return;
123
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000124 if (m_transport.m_isExpectingData)
125 {
126 m_transport.m_isExpectingData = false;
127 m_socket.cancel();
128 }
129 }
130
Alexander Afanasyev9d158f02015-02-17 21:30:19 -0800131 /**
132 * @warning Must not be called directly or indirectly from within handleAsyncReceive invocation
133 */
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000134 void
135 resume()
136 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700137 if (m_connectionInProgress)
138 return;
139
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000140 if (!m_transport.m_isExpectingData)
141 {
142 m_transport.m_isExpectingData = true;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700143 m_inputBufferSize = 0;
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700144 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
145 bind(&Impl::handleAsyncReceive, this, _1, _2));
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000146 }
147 }
148
149 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800150 send(const Block& wire)
151 {
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700152 BlockSequence sequence;
153 sequence.push_back(wire);
154 m_transmissionQueue.push_back(sequence);
155
156 if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
157 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700158 bind(&Impl::handleAsyncWrite, this, _1,
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700159 m_transmissionQueue.begin()));
160 }
161
162 // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
163 // next write will be scheduled either in connectHandler or in asyncWriteHandler
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800164 }
165
166 void
167 send(const Block& header, const Block& payload)
168 {
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700169 BlockSequence sequence;
170 sequence.push_back(header);
171 sequence.push_back(payload);
172 m_transmissionQueue.push_back(sequence);
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700173
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700174 if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
175 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700176 bind(&Impl::handleAsyncWrite, this, _1,
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700177 m_transmissionQueue.begin()));
178 }
179
180 // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
181 // next write will be scheduled either in connectHandler or in asyncWriteHandler
182 }
183
184 void
185 handleAsyncWrite(const boost::system::error_code& error,
186 TransmissionQueue::iterator queueItem)
187 {
188 if (error)
189 {
190 if (error == boost::system::errc::operation_canceled) {
191 // async receive has been explicitly cancelled (e.g., socket close)
192 return;
193 }
194
195 m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -0700196 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while sending data to socket"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800197 }
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700198
Alexander Afanasyevfba1ac62015-08-26 15:19:13 -0700199 if (!m_transport.m_isConnected) {
200 return; // queue has been already cleared
201 }
202
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700203 m_transmissionQueue.erase(queueItem);
204
205 if (!m_transmissionQueue.empty()) {
206 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700207 bind(&Impl::handleAsyncWrite, this, _1,
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700208 m_transmissionQueue.begin()));
209 }
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800210 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700211
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700212 bool
213 processAll(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800214 {
Junxiao Shi02a4bf32015-02-21 21:07:46 -0700215 while (offset < nBytesAvailable) {
216 bool isOk = false;
217 Block element;
218 std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
219 if (!isOk)
220 return false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800221
Junxiao Shi02a4bf32015-02-21 21:07:46 -0700222 m_transport.receive(element);
223 offset += element.size();
224 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700225 return true;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800226 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700227
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800228 void
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700229 handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800230 {
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800231 if (error)
232 {
233 if (error == boost::system::errc::operation_canceled) {
234 // async receive has been explicitly cancelled (e.g., socket close)
235 return;
236 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700237
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700238 m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -0700239 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while receiving data from socket"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800240 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700241
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700242 m_inputBufferSize += nBytesRecvd;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700243 // do magic
244
245 std::size_t offset = 0;
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700246 bool hasProcessedSome = processAll(m_inputBuffer, offset, m_inputBufferSize);
247 if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800248 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700249 m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -0700250 BOOST_THROW_EXCEPTION(Transport::Error(boost::system::error_code(),
251 "input buffer full, but a valid TLV cannot be "
252 "decoded"));
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700253 }
254
255 if (offset > 0)
256 {
257 if (offset != m_inputBufferSize)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800258 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700259 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
260 m_inputBuffer);
261 m_inputBufferSize -= offset;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800262 }
263 else
264 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700265 m_inputBufferSize = 0;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800266 }
267 }
268
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700269 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700270 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
271 bind(&Impl::handleAsyncReceive, this, _1, _2));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800272 }
273
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800274protected:
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700275 BaseTransport& m_transport;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700276
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700277 typename Protocol::socket m_socket;
278 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700279 size_t m_inputBufferSize;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800280
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700281 TransmissionQueue m_transmissionQueue;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800282 bool m_connectionInProgress;
283
284 boost::asio::deadline_timer m_connectTimer;
285};
286
287
288template<class BaseTransport, class Protocol>
289class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
290{
291public:
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700292 typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> Impl;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800293
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700294 StreamTransportWithResolverImpl(BaseTransport& transport, boost::asio::io_service& ioService)
295 : StreamTransportImpl<BaseTransport, Protocol>(transport, ioService)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800296 {
297 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700298
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800299 void
300 resolveHandler(const boost::system::error_code& error,
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700301 typename Protocol::resolver::iterator endpoint,
302 const shared_ptr<typename Protocol::resolver>&)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800303 {
304 if (error)
305 {
306 if (error == boost::system::errc::operation_canceled)
307 return;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700308
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -0700309 BOOST_THROW_EXCEPTION(Transport::Error(error, "Error during resolution of host or port"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800310 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700311
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700312 typename Protocol::resolver::iterator end;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800313 if (endpoint == end)
314 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700315 this->m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -0700316 BOOST_THROW_EXCEPTION(Transport::Error(error, "Unable to resolve because host or port"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800317 }
318
319 this->m_socket.async_connect(*endpoint,
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700320 bind(&Impl::connectHandler, this, _1));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800321 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700322
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800323 void
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700324 connect(const typename Protocol::resolver::query& query)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800325 {
326 if (!this->m_connectionInProgress) {
327 this->m_connectionInProgress = true;
328
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700329 // Wait at most 4 seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800330 /// @todo Decide whether this number should be configurable
331 this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700332 this->m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800333
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700334 // typename boost::asio::ip::basic_resolver< Protocol > resolver;
335 shared_ptr<typename Protocol::resolver> resolver =
336 make_shared<typename Protocol::resolver>(ref(this->m_socket.get_io_service()));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800337
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700338 resolver->async_resolve(query, bind(&Impl::resolveHandler, this, _1, _2, resolver));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800339 }
340 }
341};
342
343
344} // namespace ndn
345
346#endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP