blob: a90143a379f30fb574a367742a69b97a6ce839c5 [file] [log] [blame]
Junxiao Shi446de3c2016-07-25 22:38:16 +00001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Davide Pesavento4d0d0962017-12-19 22:23:14 -05002/*
Davide Pesavento923ba442019-02-12 22:00:38 -05003 * Copyright (c) 2013-2019 Regents of the University of California.
Junxiao Shi446de3c2016-07-25 22:38:16 +00004 *
5 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
6 *
7 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License as published by the Free Software
9 * Foundation, either version 3 of the License, or (at your option) any later version.
10 *
11 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14 *
15 * You should have received copies of the GNU General Public License and GNU Lesser
16 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17 * <http://www.gnu.org/licenses/>.
18 *
19 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
20 */
21
Junxiao Shi70d76eb2018-12-18 13:15:43 -070022#ifndef NDN_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
23#define NDN_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
Junxiao Shi446de3c2016-07-25 22:38:16 +000024
Davide Pesavento7e780642018-11-24 15:51:34 -050025#include "ndn-cxx/transport/transport.hpp"
Junxiao Shi446de3c2016-07-25 22:38:16 +000026
Davide Pesavento0c315372019-04-25 14:50:42 -040027#include <boost/asio/steady_timer.hpp>
Davide Pesavento4d0d0962017-12-19 22:23:14 -050028#include <boost/asio/write.hpp>
29
Junxiao Shi446de3c2016-07-25 22:38:16 +000030#include <list>
31
32namespace ndn {
Junxiao Shi70d76eb2018-12-18 13:15:43 -070033namespace detail {
Junxiao Shi446de3c2016-07-25 22:38:16 +000034
Davide Pesavento1fd00242018-05-20 00:11:01 -040035/** \brief Implementation detail of a Boost.Asio-based stream-oriented transport.
Junxiao Shi446de3c2016-07-25 22:38:16 +000036 * \tparam BaseTransport a subclass of Transport
Davide Pesavento1fd00242018-05-20 00:11:01 -040037 * \tparam Protocol a Boost.Asio stream-oriented protocol, e.g. boost::asio::ip::tcp
38 * or boost::asio::local::stream_protocol
Junxiao Shi446de3c2016-07-25 22:38:16 +000039 */
40template<typename BaseTransport, typename Protocol>
Davide Pesavento1fd00242018-05-20 00:11:01 -040041class StreamTransportImpl : public std::enable_shared_from_this<StreamTransportImpl<BaseTransport, Protocol>>
Junxiao Shi446de3c2016-07-25 22:38:16 +000042{
43public:
Davide Pesavento4d0d0962017-12-19 22:23:14 -050044 typedef StreamTransportImpl<BaseTransport, Protocol> Impl;
Junxiao Shi446de3c2016-07-25 22:38:16 +000045 typedef std::list<Block> BlockSequence;
46 typedef std::list<BlockSequence> TransmissionQueue;
47
48 StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
49 : m_transport(transport)
50 , m_socket(ioService)
51 , m_inputBufferSize(0)
52 , m_isConnecting(false)
53 , m_connectTimer(ioService)
54 {
55 }
56
57 void
58 connect(const typename Protocol::endpoint& endpoint)
59 {
60 if (!m_isConnecting) {
61 m_isConnecting = true;
62
63 // Wait at most 4 seconds to connect
64 /// @todo Decide whether this number should be configurable
Davide Pesavento0c315372019-04-25 14:50:42 -040065 m_connectTimer.expires_from_now(std::chrono::seconds(4));
66 m_connectTimer.async_wait([self = this->shared_from_this()] (const auto& error) {
67 self->connectTimeoutHandler(error);
68 });
Junxiao Shi446de3c2016-07-25 22:38:16 +000069
70 m_socket.open();
Davide Pesavento0c315372019-04-25 14:50:42 -040071 m_socket.async_connect(endpoint, [self = this->shared_from_this()] (const auto& error) {
72 self->connectHandler(error);
73 });
Junxiao Shi446de3c2016-07-25 22:38:16 +000074 }
75 }
76
77 void
78 close()
79 {
80 m_isConnecting = false;
81
82 boost::system::error_code error; // to silently ignore all errors
83 m_connectTimer.cancel(error);
84 m_socket.cancel(error);
85 m_socket.close(error);
86
87 m_transport.m_isConnected = false;
88 m_transport.m_isReceiving = false;
89 m_transmissionQueue.clear();
90 }
91
92 void
93 pause()
94 {
95 if (m_isConnecting)
96 return;
97
98 if (m_transport.m_isReceiving) {
99 m_transport.m_isReceiving = false;
100 m_socket.cancel();
101 }
102 }
103
104 void
105 resume()
106 {
107 if (m_isConnecting)
108 return;
109
110 if (!m_transport.m_isReceiving) {
111 m_transport.m_isReceiving = true;
112 m_inputBufferSize = 0;
Junxiao Shi8c565382016-07-25 23:04:49 +0000113 asyncReceive();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000114 }
115 }
116
117 void
118 send(const Block& wire)
119 {
120 BlockSequence sequence;
121 sequence.push_back(wire);
122 send(std::move(sequence));
123 }
124
125 void
126 send(const Block& header, const Block& payload)
127 {
128 BlockSequence sequence;
129 sequence.push_back(header);
130 sequence.push_back(payload);
131 send(std::move(sequence));
132 }
133
134protected:
135 void
136 connectHandler(const boost::system::error_code& error)
137 {
138 m_isConnecting = false;
139 m_connectTimer.cancel();
140
141 if (!error) {
Junxiao Shi446de3c2016-07-25 22:38:16 +0000142 m_transport.m_isConnected = true;
143
144 if (!m_transmissionQueue.empty()) {
Alexander Afanasyeva54d5a62017-02-11 19:01:34 -0800145 resume();
Junxiao Shi8c565382016-07-25 23:04:49 +0000146 asyncWrite();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000147 }
148 }
149 else {
150 m_transport.m_isConnected = false;
151 m_transport.close();
Davide Pesavento923ba442019-02-12 22:00:38 -0500152 NDN_THROW(Transport::Error(error, "error while connecting to the forwarder"));
Junxiao Shi446de3c2016-07-25 22:38:16 +0000153 }
154 }
155
156 void
157 connectTimeoutHandler(const boost::system::error_code& error)
158 {
159 if (error) // e.g., cancelled timer
160 return;
161
162 m_transport.close();
Davide Pesavento923ba442019-02-12 22:00:38 -0500163 NDN_THROW(Transport::Error(error, "error while connecting to the forwarder"));
Junxiao Shi446de3c2016-07-25 22:38:16 +0000164 }
165
166 void
167 send(BlockSequence&& sequence)
168 {
169 m_transmissionQueue.emplace_back(sequence);
170
171 if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
Junxiao Shi8c565382016-07-25 23:04:49 +0000172 asyncWrite();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000173 }
174
175 // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
176 // next write will be scheduled either in connectHandler or in asyncWriteHandler
177 }
178
179 void
Junxiao Shi8c565382016-07-25 23:04:49 +0000180 asyncWrite()
181 {
182 BOOST_ASSERT(!m_transmissionQueue.empty());
183 boost::asio::async_write(m_socket, m_transmissionQueue.front(),
184 bind(&Impl::handleAsyncWrite, this->shared_from_this(), _1, m_transmissionQueue.begin()));
185 }
186
187 void
Junxiao Shi446de3c2016-07-25 22:38:16 +0000188 handleAsyncWrite(const boost::system::error_code& error, TransmissionQueue::iterator queueItem)
189 {
190 if (error) {
191 if (error == boost::system::errc::operation_canceled) {
192 // async receive has been explicitly cancelled (e.g., socket close)
193 return;
194 }
195
196 m_transport.close();
Davide Pesavento923ba442019-02-12 22:00:38 -0500197 NDN_THROW(Transport::Error(error, "error while sending data to socket"));
Junxiao Shi446de3c2016-07-25 22:38:16 +0000198 }
199
200 if (!m_transport.m_isConnected) {
201 return; // queue has been already cleared
202 }
203
204 m_transmissionQueue.erase(queueItem);
205
206 if (!m_transmissionQueue.empty()) {
Junxiao Shi8c565382016-07-25 23:04:49 +0000207 asyncWrite();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000208 }
209 }
210
211 void
Junxiao Shi8c565382016-07-25 23:04:49 +0000212 asyncReceive()
213 {
214 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
215 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
216 bind(&Impl::handleAsyncReceive, this->shared_from_this(), _1, _2));
217 }
218
219 void
Junxiao Shi446de3c2016-07-25 22:38:16 +0000220 handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
221 {
222 if (error) {
223 if (error == boost::system::errc::operation_canceled) {
224 // async receive has been explicitly cancelled (e.g., socket close)
225 return;
226 }
227
228 m_transport.close();
Davide Pesavento923ba442019-02-12 22:00:38 -0500229 NDN_THROW(Transport::Error(error, "error while receiving data from socket"));
Junxiao Shi446de3c2016-07-25 22:38:16 +0000230 }
231
232 m_inputBufferSize += nBytesRecvd;
233 // do magic
234
235 std::size_t offset = 0;
236 bool hasProcessedSome = processAllReceived(m_inputBuffer, offset, m_inputBufferSize);
237 if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
238 m_transport.close();
Davide Pesavento923ba442019-02-12 22:00:38 -0500239 NDN_THROW(Transport::Error(boost::system::error_code(),
240 "input buffer full, but a valid TLV cannot be decoded"));
Junxiao Shi446de3c2016-07-25 22:38:16 +0000241 }
242
243 if (offset > 0) {
244 if (offset != m_inputBufferSize) {
245 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
246 m_inputBufferSize -= offset;
247 }
248 else {
249 m_inputBufferSize = 0;
250 }
251 }
252
Junxiao Shi8c565382016-07-25 23:04:49 +0000253 asyncReceive();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000254 }
255
256 bool
257 processAllReceived(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
258 {
259 while (offset < nBytesAvailable) {
260 bool isOk = false;
261 Block element;
262 std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
263 if (!isOk)
264 return false;
265
266 m_transport.receive(element);
267 offset += element.size();
268 }
269 return true;
270 }
271
272protected:
273 BaseTransport& m_transport;
274
275 typename Protocol::socket m_socket;
276 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
277 size_t m_inputBufferSize;
278
279 TransmissionQueue m_transmissionQueue;
280 bool m_isConnecting;
281
Davide Pesavento0c315372019-04-25 14:50:42 -0400282 boost::asio::steady_timer m_connectTimer;
Junxiao Shi446de3c2016-07-25 22:38:16 +0000283};
284
Junxiao Shi70d76eb2018-12-18 13:15:43 -0700285} // namespace detail
Junxiao Shi446de3c2016-07-25 22:38:16 +0000286} // namespace ndn
287
Junxiao Shi70d76eb2018-12-18 13:15:43 -0700288#endif // NDN_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP