blob: d2c232b4e84a2230445d91571a5a116f3410bda6 [file] [log] [blame]
Junxiao Shi446de3c2016-07-25 22:38:16 +00001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Davide Pesavento4d0d0962017-12-19 22:23:14 -05002/*
Davide Pesavento1fd00242018-05-20 00:11:01 -04003 * Copyright (c) 2013-2018 Regents of the University of California.
Junxiao Shi446de3c2016-07-25 22:38:16 +00004 *
5 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
6 *
7 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License as published by the Free Software
9 * Foundation, either version 3 of the License, or (at your option) any later version.
10 *
11 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14 *
15 * You should have received copies of the GNU General Public License and GNU Lesser
16 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17 * <http://www.gnu.org/licenses/>.
18 *
19 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
20 */
21
22#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
23#define NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
24
25#include "transport.hpp"
26
Davide Pesavento4d0d0962017-12-19 22:23:14 -050027#include <boost/asio/deadline_timer.hpp>
28#include <boost/asio/write.hpp>
29
Junxiao Shi446de3c2016-07-25 22:38:16 +000030#include <list>
31
32namespace ndn {
33
Davide Pesavento1fd00242018-05-20 00:11:01 -040034/** \brief Implementation detail of a Boost.Asio-based stream-oriented transport.
Junxiao Shi446de3c2016-07-25 22:38:16 +000035 * \tparam BaseTransport a subclass of Transport
Davide Pesavento1fd00242018-05-20 00:11:01 -040036 * \tparam Protocol a Boost.Asio stream-oriented protocol, e.g. boost::asio::ip::tcp
37 * or boost::asio::local::stream_protocol
Junxiao Shi446de3c2016-07-25 22:38:16 +000038 */
39template<typename BaseTransport, typename Protocol>
Davide Pesavento1fd00242018-05-20 00:11:01 -040040class StreamTransportImpl : public std::enable_shared_from_this<StreamTransportImpl<BaseTransport, Protocol>>
Junxiao Shi446de3c2016-07-25 22:38:16 +000041{
42public:
Davide Pesavento4d0d0962017-12-19 22:23:14 -050043 typedef StreamTransportImpl<BaseTransport, Protocol> Impl;
Junxiao Shi446de3c2016-07-25 22:38:16 +000044 typedef std::list<Block> BlockSequence;
45 typedef std::list<BlockSequence> TransmissionQueue;
46
47 StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
48 : m_transport(transport)
49 , m_socket(ioService)
50 , m_inputBufferSize(0)
51 , m_isConnecting(false)
52 , m_connectTimer(ioService)
53 {
54 }
55
56 void
57 connect(const typename Protocol::endpoint& endpoint)
58 {
59 if (!m_isConnecting) {
60 m_isConnecting = true;
61
62 // Wait at most 4 seconds to connect
63 /// @todo Decide whether this number should be configurable
64 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
Junxiao Shi8c565382016-07-25 23:04:49 +000065 m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this->shared_from_this(), _1));
Junxiao Shi446de3c2016-07-25 22:38:16 +000066
67 m_socket.open();
Junxiao Shi8c565382016-07-25 23:04:49 +000068 m_socket.async_connect(endpoint, bind(&Impl::connectHandler, this->shared_from_this(), _1));
Junxiao Shi446de3c2016-07-25 22:38:16 +000069 }
70 }
71
72 void
73 close()
74 {
75 m_isConnecting = false;
76
77 boost::system::error_code error; // to silently ignore all errors
78 m_connectTimer.cancel(error);
79 m_socket.cancel(error);
80 m_socket.close(error);
81
82 m_transport.m_isConnected = false;
83 m_transport.m_isReceiving = false;
84 m_transmissionQueue.clear();
85 }
86
87 void
88 pause()
89 {
90 if (m_isConnecting)
91 return;
92
93 if (m_transport.m_isReceiving) {
94 m_transport.m_isReceiving = false;
95 m_socket.cancel();
96 }
97 }
98
99 void
100 resume()
101 {
102 if (m_isConnecting)
103 return;
104
105 if (!m_transport.m_isReceiving) {
106 m_transport.m_isReceiving = true;
107 m_inputBufferSize = 0;
Junxiao Shi8c565382016-07-25 23:04:49 +0000108 asyncReceive();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000109 }
110 }
111
112 void
113 send(const Block& wire)
114 {
115 BlockSequence sequence;
116 sequence.push_back(wire);
117 send(std::move(sequence));
118 }
119
120 void
121 send(const Block& header, const Block& payload)
122 {
123 BlockSequence sequence;
124 sequence.push_back(header);
125 sequence.push_back(payload);
126 send(std::move(sequence));
127 }
128
129protected:
130 void
131 connectHandler(const boost::system::error_code& error)
132 {
133 m_isConnecting = false;
134 m_connectTimer.cancel();
135
136 if (!error) {
Junxiao Shi446de3c2016-07-25 22:38:16 +0000137 m_transport.m_isConnected = true;
138
139 if (!m_transmissionQueue.empty()) {
Alexander Afanasyeva54d5a62017-02-11 19:01:34 -0800140 resume();
Junxiao Shi8c565382016-07-25 23:04:49 +0000141 asyncWrite();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000142 }
143 }
144 else {
145 m_transport.m_isConnected = false;
146 m_transport.close();
147 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
148 }
149 }
150
151 void
152 connectTimeoutHandler(const boost::system::error_code& error)
153 {
154 if (error) // e.g., cancelled timer
155 return;
156
157 m_transport.close();
158 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
159 }
160
161 void
162 send(BlockSequence&& sequence)
163 {
164 m_transmissionQueue.emplace_back(sequence);
165
166 if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
Junxiao Shi8c565382016-07-25 23:04:49 +0000167 asyncWrite();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000168 }
169
170 // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
171 // next write will be scheduled either in connectHandler or in asyncWriteHandler
172 }
173
174 void
Junxiao Shi8c565382016-07-25 23:04:49 +0000175 asyncWrite()
176 {
177 BOOST_ASSERT(!m_transmissionQueue.empty());
178 boost::asio::async_write(m_socket, m_transmissionQueue.front(),
179 bind(&Impl::handleAsyncWrite, this->shared_from_this(), _1, m_transmissionQueue.begin()));
180 }
181
182 void
Junxiao Shi446de3c2016-07-25 22:38:16 +0000183 handleAsyncWrite(const boost::system::error_code& error, TransmissionQueue::iterator queueItem)
184 {
185 if (error) {
186 if (error == boost::system::errc::operation_canceled) {
187 // async receive has been explicitly cancelled (e.g., socket close)
188 return;
189 }
190
191 m_transport.close();
192 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while sending data to socket"));
193 }
194
195 if (!m_transport.m_isConnected) {
196 return; // queue has been already cleared
197 }
198
199 m_transmissionQueue.erase(queueItem);
200
201 if (!m_transmissionQueue.empty()) {
Junxiao Shi8c565382016-07-25 23:04:49 +0000202 asyncWrite();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000203 }
204 }
205
206 void
Junxiao Shi8c565382016-07-25 23:04:49 +0000207 asyncReceive()
208 {
209 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
210 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
211 bind(&Impl::handleAsyncReceive, this->shared_from_this(), _1, _2));
212 }
213
214 void
Junxiao Shi446de3c2016-07-25 22:38:16 +0000215 handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
216 {
217 if (error) {
218 if (error == boost::system::errc::operation_canceled) {
219 // async receive has been explicitly cancelled (e.g., socket close)
220 return;
221 }
222
223 m_transport.close();
224 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while receiving data from socket"));
225 }
226
227 m_inputBufferSize += nBytesRecvd;
228 // do magic
229
230 std::size_t offset = 0;
231 bool hasProcessedSome = processAllReceived(m_inputBuffer, offset, m_inputBufferSize);
232 if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
233 m_transport.close();
234 BOOST_THROW_EXCEPTION(Transport::Error(boost::system::error_code(),
235 "input buffer full, but a valid TLV cannot be "
236 "decoded"));
237 }
238
239 if (offset > 0) {
240 if (offset != m_inputBufferSize) {
241 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
242 m_inputBufferSize -= offset;
243 }
244 else {
245 m_inputBufferSize = 0;
246 }
247 }
248
Junxiao Shi8c565382016-07-25 23:04:49 +0000249 asyncReceive();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000250 }
251
252 bool
253 processAllReceived(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
254 {
255 while (offset < nBytesAvailable) {
256 bool isOk = false;
257 Block element;
258 std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
259 if (!isOk)
260 return false;
261
262 m_transport.receive(element);
263 offset += element.size();
264 }
265 return true;
266 }
267
268protected:
269 BaseTransport& m_transport;
270
271 typename Protocol::socket m_socket;
272 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
273 size_t m_inputBufferSize;
274
275 TransmissionQueue m_transmissionQueue;
276 bool m_isConnecting;
277
278 boost::asio::deadline_timer m_connectTimer;
279};
280
281} // namespace ndn
282
283#endif // NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP