blob: c2dab39b00781088f5d39a26a2f0e3f7f98b1c46 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2013-2017 Regents of the University of California.
 *
 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
 *
 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
 * terms of the GNU Lesser General Public License as published by the Free Software
 * Foundation, either version 3 of the License, or (at your option) any later version.
 *
 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
 *
 * You should have received copies of the GNU General Public License and GNU Lesser
 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
 * <http://www.gnu.org/licenses/>.
 *
 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
 */
21
22#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
23#define NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
24
25#include "transport.hpp"
26
27#include <boost/asio.hpp>
28#include <list>
29
30namespace ndn {
31
32/** \brief implementation detail of a Boost.Asio-based stream-oriented transport
33 * \tparam BaseTransport a subclass of Transport
34 * \tparam Protocol a Boost.Asio stream-oriented protocol, including boost::asio::ip::tcp
35 * and boost::asio::local::stream_protocol
36 */
37template<typename BaseTransport, typename Protocol>
Junxiao Shi8c565382016-07-25 23:04:49 +000038class StreamTransportImpl : public enable_shared_from_this<StreamTransportImpl<BaseTransport, Protocol>>
Junxiao Shi446de3c2016-07-25 22:38:16 +000039{
40public:
41 typedef StreamTransportImpl<BaseTransport,Protocol> Impl;
42 typedef std::list<Block> BlockSequence;
43 typedef std::list<BlockSequence> TransmissionQueue;
44
45 StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
46 : m_transport(transport)
47 , m_socket(ioService)
48 , m_inputBufferSize(0)
49 , m_isConnecting(false)
50 , m_connectTimer(ioService)
51 {
52 }
53
54 void
55 connect(const typename Protocol::endpoint& endpoint)
56 {
57 if (!m_isConnecting) {
58 m_isConnecting = true;
59
60 // Wait at most 4 seconds to connect
61 /// @todo Decide whether this number should be configurable
62 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
Junxiao Shi8c565382016-07-25 23:04:49 +000063 m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this->shared_from_this(), _1));
Junxiao Shi446de3c2016-07-25 22:38:16 +000064
65 m_socket.open();
Junxiao Shi8c565382016-07-25 23:04:49 +000066 m_socket.async_connect(endpoint, bind(&Impl::connectHandler, this->shared_from_this(), _1));
Junxiao Shi446de3c2016-07-25 22:38:16 +000067 }
68 }
69
70 void
71 close()
72 {
73 m_isConnecting = false;
74
75 boost::system::error_code error; // to silently ignore all errors
76 m_connectTimer.cancel(error);
77 m_socket.cancel(error);
78 m_socket.close(error);
79
80 m_transport.m_isConnected = false;
81 m_transport.m_isReceiving = false;
82 m_transmissionQueue.clear();
83 }
84
85 void
86 pause()
87 {
88 if (m_isConnecting)
89 return;
90
91 if (m_transport.m_isReceiving) {
92 m_transport.m_isReceiving = false;
93 m_socket.cancel();
94 }
95 }
96
97 void
98 resume()
99 {
100 if (m_isConnecting)
101 return;
102
103 if (!m_transport.m_isReceiving) {
104 m_transport.m_isReceiving = true;
105 m_inputBufferSize = 0;
Junxiao Shi8c565382016-07-25 23:04:49 +0000106 asyncReceive();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000107 }
108 }
109
110 void
111 send(const Block& wire)
112 {
113 BlockSequence sequence;
114 sequence.push_back(wire);
115 send(std::move(sequence));
116 }
117
118 void
119 send(const Block& header, const Block& payload)
120 {
121 BlockSequence sequence;
122 sequence.push_back(header);
123 sequence.push_back(payload);
124 send(std::move(sequence));
125 }
126
127protected:
128 void
129 connectHandler(const boost::system::error_code& error)
130 {
131 m_isConnecting = false;
132 m_connectTimer.cancel();
133
134 if (!error) {
Junxiao Shi446de3c2016-07-25 22:38:16 +0000135 m_transport.m_isConnected = true;
136
137 if (!m_transmissionQueue.empty()) {
Alexander Afanasyeva54d5a62017-02-11 19:01:34 -0800138 resume();
Junxiao Shi8c565382016-07-25 23:04:49 +0000139 asyncWrite();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000140 }
141 }
142 else {
143 m_transport.m_isConnected = false;
144 m_transport.close();
145 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
146 }
147 }
148
149 void
150 connectTimeoutHandler(const boost::system::error_code& error)
151 {
152 if (error) // e.g., cancelled timer
153 return;
154
155 m_transport.close();
156 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
157 }
158
159 void
160 send(BlockSequence&& sequence)
161 {
162 m_transmissionQueue.emplace_back(sequence);
163
164 if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
Junxiao Shi8c565382016-07-25 23:04:49 +0000165 asyncWrite();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000166 }
167
168 // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
169 // next write will be scheduled either in connectHandler or in asyncWriteHandler
170 }
171
172 void
Junxiao Shi8c565382016-07-25 23:04:49 +0000173 asyncWrite()
174 {
175 BOOST_ASSERT(!m_transmissionQueue.empty());
176 boost::asio::async_write(m_socket, m_transmissionQueue.front(),
177 bind(&Impl::handleAsyncWrite, this->shared_from_this(), _1, m_transmissionQueue.begin()));
178 }
179
180 void
Junxiao Shi446de3c2016-07-25 22:38:16 +0000181 handleAsyncWrite(const boost::system::error_code& error, TransmissionQueue::iterator queueItem)
182 {
183 if (error) {
184 if (error == boost::system::errc::operation_canceled) {
185 // async receive has been explicitly cancelled (e.g., socket close)
186 return;
187 }
188
189 m_transport.close();
190 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while sending data to socket"));
191 }
192
193 if (!m_transport.m_isConnected) {
194 return; // queue has been already cleared
195 }
196
197 m_transmissionQueue.erase(queueItem);
198
199 if (!m_transmissionQueue.empty()) {
Junxiao Shi8c565382016-07-25 23:04:49 +0000200 asyncWrite();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000201 }
202 }
203
204 void
Junxiao Shi8c565382016-07-25 23:04:49 +0000205 asyncReceive()
206 {
207 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
208 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
209 bind(&Impl::handleAsyncReceive, this->shared_from_this(), _1, _2));
210 }
211
212 void
Junxiao Shi446de3c2016-07-25 22:38:16 +0000213 handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
214 {
215 if (error) {
216 if (error == boost::system::errc::operation_canceled) {
217 // async receive has been explicitly cancelled (e.g., socket close)
218 return;
219 }
220
221 m_transport.close();
222 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while receiving data from socket"));
223 }
224
225 m_inputBufferSize += nBytesRecvd;
226 // do magic
227
228 std::size_t offset = 0;
229 bool hasProcessedSome = processAllReceived(m_inputBuffer, offset, m_inputBufferSize);
230 if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
231 m_transport.close();
232 BOOST_THROW_EXCEPTION(Transport::Error(boost::system::error_code(),
233 "input buffer full, but a valid TLV cannot be "
234 "decoded"));
235 }
236
237 if (offset > 0) {
238 if (offset != m_inputBufferSize) {
239 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
240 m_inputBufferSize -= offset;
241 }
242 else {
243 m_inputBufferSize = 0;
244 }
245 }
246
Junxiao Shi8c565382016-07-25 23:04:49 +0000247 asyncReceive();
Junxiao Shi446de3c2016-07-25 22:38:16 +0000248 }
249
250 bool
251 processAllReceived(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
252 {
253 while (offset < nBytesAvailable) {
254 bool isOk = false;
255 Block element;
256 std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
257 if (!isOk)
258 return false;
259
260 m_transport.receive(element);
261 offset += element.size();
262 }
263 return true;
264 }
265
266protected:
267 BaseTransport& m_transport;
268
269 typename Protocol::socket m_socket;
270 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
271 size_t m_inputBufferSize;
272
273 TransmissionQueue m_transmissionQueue;
274 bool m_isConnecting;
275
276 boost::asio::deadline_timer m_connectTimer;
277};
278
279} // namespace ndn
280
281#endif // NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP