blob: 410e095dc0ec3ab9552a96487cf03ae682160bfa [file] [log] [blame]
Junxiao Shi446de3c2016-07-25 22:38:16 +00001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Davide Pesavento4d0d0962017-12-19 22:23:14 -05002/*
Davide Pesavento09904412021-03-24 16:40:53 -04003 * Copyright (c) 2013-2021 Regents of the University of California.
Junxiao Shi446de3c2016-07-25 22:38:16 +00004 *
5 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
6 *
7 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License as published by the Free Software
9 * Foundation, either version 3 of the License, or (at your option) any later version.
10 *
11 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14 *
15 * You should have received copies of the GNU General Public License and GNU Lesser
16 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17 * <http://www.gnu.org/licenses/>.
18 *
19 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
20 */
21
Davide Pesavento09904412021-03-24 16:40:53 -040022#ifndef NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
23#define NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
Junxiao Shi446de3c2016-07-25 22:38:16 +000024
Davide Pesavento7e780642018-11-24 15:51:34 -050025#include "ndn-cxx/transport/transport.hpp"
Junxiao Shi446de3c2016-07-25 22:38:16 +000026
Davide Pesavento0c315372019-04-25 14:50:42 -040027#include <boost/asio/steady_timer.hpp>
Davide Pesavento4d0d0962017-12-19 22:23:14 -050028#include <boost/asio/write.hpp>
29
Junxiao Shi446de3c2016-07-25 22:38:16 +000030#include <list>
31
32namespace ndn {
Junxiao Shi70d76eb2018-12-18 13:15:43 -070033namespace detail {
Junxiao Shi446de3c2016-07-25 22:38:16 +000034
Davide Pesavento1fd00242018-05-20 00:11:01 -040035/** \brief Implementation detail of a Boost.Asio-based stream-oriented transport.
Junxiao Shi446de3c2016-07-25 22:38:16 +000036 * \tparam BaseTransport a subclass of Transport
Davide Pesavento1fd00242018-05-20 00:11:01 -040037 * \tparam Protocol a Boost.Asio stream-oriented protocol, e.g. boost::asio::ip::tcp
38 * or boost::asio::local::stream_protocol
Junxiao Shi446de3c2016-07-25 22:38:16 +000039 */
40template<typename BaseTransport, typename Protocol>
Davide Pesavento1fd00242018-05-20 00:11:01 -040041class StreamTransportImpl : public std::enable_shared_from_this<StreamTransportImpl<BaseTransport, Protocol>>
Junxiao Shi446de3c2016-07-25 22:38:16 +000042{
43public:
Davide Pesaventoa421c802019-11-10 21:23:12 -050044 using Impl = StreamTransportImpl<BaseTransport, Protocol>;
45 using BlockSequence = std::list<Block>;
46 using TransmissionQueue = std::list<BlockSequence>;
Junxiao Shi446de3c2016-07-25 22:38:16 +000047
48 StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
49 : m_transport(transport)
50 , m_socket(ioService)
Junxiao Shi446de3c2016-07-25 22:38:16 +000051 , m_connectTimer(ioService)
52 {
53 }
54
55 void
56 connect(const typename Protocol::endpoint& endpoint)
57 {
Davide Pesaventoa421c802019-11-10 21:23:12 -050058 if (m_isConnecting) {
59 return;
Junxiao Shi446de3c2016-07-25 22:38:16 +000060 }
Davide Pesaventoa421c802019-11-10 21:23:12 -050061 m_isConnecting = true;
62
63 // Wait at most 4 seconds to connect
64 /// @todo Decide whether this number should be configurable
65 m_connectTimer.expires_from_now(std::chrono::seconds(4));
66 m_connectTimer.async_wait([self = this->shared_from_this()] (const auto& error) {
67 self->connectTimeoutHandler(error);
68 });
69
70 m_socket.open();
71 m_socket.async_connect(endpoint, [self = this->shared_from_this()] (const auto& error) {
72 self->connectHandler(error);
73 });
Junxiao Shi446de3c2016-07-25 22:38:16 +000074 }
75
76 void
77 close()
78 {
79 m_isConnecting = false;
80
81 boost::system::error_code error; // to silently ignore all errors
82 m_connectTimer.cancel(error);
83 m_socket.cancel(error);
84 m_socket.close(error);
85
86 m_transport.m_isConnected = false;
87 m_transport.m_isReceiving = false;
88 m_transmissionQueue.clear();
89 }
90
91 void
92 pause()
93 {
94 if (m_isConnecting)
95 return;
96
97 if (m_transport.m_isReceiving) {
98 m_transport.m_isReceiving = false;
99 m_socket.cancel();
100 }
101 }
102
103 void
104 resume()
105 {
106 if (m_isConnecting)
107 return;
108
109 if (!m_transport.m_isReceiving) {
110 m_transport.m_isReceiving = true;
111 m_inputBufferSize = 0;
Junxiao Shi8c565382016-07-25 23:04:49 +0000112 asyncReceive();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000113 }
114 }
115
116 void
117 send(const Block& wire)
118 {
119 BlockSequence sequence;
120 sequence.push_back(wire);
121 send(std::move(sequence));
122 }
123
124 void
125 send(const Block& header, const Block& payload)
126 {
127 BlockSequence sequence;
128 sequence.push_back(header);
129 sequence.push_back(payload);
130 send(std::move(sequence));
131 }
132
133protected:
134 void
135 connectHandler(const boost::system::error_code& error)
136 {
137 m_isConnecting = false;
138 m_connectTimer.cancel();
139
Davide Pesaventoa421c802019-11-10 21:23:12 -0500140 if (error) {
Junxiao Shi446de3c2016-07-25 22:38:16 +0000141 m_transport.m_isConnected = false;
142 m_transport.close();
Davide Pesavento923ba442019-02-12 22:00:38 -0500143 NDN_THROW(Transport::Error(error, "error while connecting to the forwarder"));
Junxiao Shi446de3c2016-07-25 22:38:16 +0000144 }
Davide Pesaventoa421c802019-11-10 21:23:12 -0500145
146 m_transport.m_isConnected = true;
147
148 if (!m_transmissionQueue.empty()) {
149 resume();
150 asyncWrite();
151 }
Junxiao Shi446de3c2016-07-25 22:38:16 +0000152 }
153
154 void
155 connectTimeoutHandler(const boost::system::error_code& error)
156 {
157 if (error) // e.g., cancelled timer
158 return;
159
160 m_transport.close();
Davide Pesavento923ba442019-02-12 22:00:38 -0500161 NDN_THROW(Transport::Error(error, "error while connecting to the forwarder"));
Junxiao Shi446de3c2016-07-25 22:38:16 +0000162 }
163
164 void
165 send(BlockSequence&& sequence)
166 {
167 m_transmissionQueue.emplace_back(sequence);
168
169 if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
Junxiao Shi8c565382016-07-25 23:04:49 +0000170 asyncWrite();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000171 }
172
173 // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
174 // next write will be scheduled either in connectHandler or in asyncWriteHandler
175 }
176
177 void
Junxiao Shi8c565382016-07-25 23:04:49 +0000178 asyncWrite()
179 {
180 BOOST_ASSERT(!m_transmissionQueue.empty());
181 boost::asio::async_write(m_socket, m_transmissionQueue.front(),
Davide Pesaventoa421c802019-11-10 21:23:12 -0500182 bind(&Impl::handleAsyncWrite, this->shared_from_this(), _1,
183 m_transmissionQueue.begin()));
Junxiao Shi8c565382016-07-25 23:04:49 +0000184 }
185
186 void
Junxiao Shi446de3c2016-07-25 22:38:16 +0000187 handleAsyncWrite(const boost::system::error_code& error, TransmissionQueue::iterator queueItem)
188 {
189 if (error) {
190 if (error == boost::system::errc::operation_canceled) {
191 // async receive has been explicitly cancelled (e.g., socket close)
192 return;
193 }
Junxiao Shi446de3c2016-07-25 22:38:16 +0000194 m_transport.close();
Davide Pesaventoa421c802019-11-10 21:23:12 -0500195 NDN_THROW(Transport::Error(error, "error while writing data to socket"));
Junxiao Shi446de3c2016-07-25 22:38:16 +0000196 }
197
198 if (!m_transport.m_isConnected) {
199 return; // queue has been already cleared
200 }
201
202 m_transmissionQueue.erase(queueItem);
203
204 if (!m_transmissionQueue.empty()) {
Junxiao Shi8c565382016-07-25 23:04:49 +0000205 asyncWrite();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000206 }
207 }
208
209 void
Junxiao Shi8c565382016-07-25 23:04:49 +0000210 asyncReceive()
211 {
212 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
213 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
214 bind(&Impl::handleAsyncReceive, this->shared_from_this(), _1, _2));
215 }
216
217 void
Junxiao Shi446de3c2016-07-25 22:38:16 +0000218 handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
219 {
220 if (error) {
221 if (error == boost::system::errc::operation_canceled) {
222 // async receive has been explicitly cancelled (e.g., socket close)
223 return;
224 }
Junxiao Shi446de3c2016-07-25 22:38:16 +0000225 m_transport.close();
Davide Pesavento923ba442019-02-12 22:00:38 -0500226 NDN_THROW(Transport::Error(error, "error while receiving data from socket"));
Junxiao Shi446de3c2016-07-25 22:38:16 +0000227 }
228
229 m_inputBufferSize += nBytesRecvd;
230 // do magic
231
232 std::size_t offset = 0;
233 bool hasProcessedSome = processAllReceived(m_inputBuffer, offset, m_inputBufferSize);
234 if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
235 m_transport.close();
Davide Pesaventoa421c802019-11-10 21:23:12 -0500236 NDN_THROW(Transport::Error("input buffer full, but a valid TLV cannot be decoded"));
Junxiao Shi446de3c2016-07-25 22:38:16 +0000237 }
238
239 if (offset > 0) {
240 if (offset != m_inputBufferSize) {
241 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
242 m_inputBufferSize -= offset;
243 }
244 else {
245 m_inputBufferSize = 0;
246 }
247 }
248
Junxiao Shi8c565382016-07-25 23:04:49 +0000249 asyncReceive();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000250 }
251
252 bool
253 processAllReceived(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
254 {
255 while (offset < nBytesAvailable) {
256 bool isOk = false;
257 Block element;
258 std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
259 if (!isOk)
260 return false;
261
Davide Pesaventoa421c802019-11-10 21:23:12 -0500262 m_transport.m_receiveCallback(element);
Junxiao Shi446de3c2016-07-25 22:38:16 +0000263 offset += element.size();
264 }
265 return true;
266 }
267
268protected:
269 BaseTransport& m_transport;
270
271 typename Protocol::socket m_socket;
272 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
Davide Pesaventoa421c802019-11-10 21:23:12 -0500273 size_t m_inputBufferSize = 0;
Junxiao Shi446de3c2016-07-25 22:38:16 +0000274
275 TransmissionQueue m_transmissionQueue;
Davide Pesavento0c315372019-04-25 14:50:42 -0400276 boost::asio::steady_timer m_connectTimer;
Davide Pesaventoa421c802019-11-10 21:23:12 -0500277 bool m_isConnecting = false;
Junxiao Shi446de3c2016-07-25 22:38:16 +0000278};
279
Junxiao Shi70d76eb2018-12-18 13:15:43 -0700280} // namespace detail
Junxiao Shi446de3c2016-07-25 22:38:16 +0000281} // namespace ndn
282
Davide Pesavento09904412021-03-24 16:40:53 -0400283#endif // NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP