blob: 963293eafa150812128d004247eafc3b988835bc [file] [log] [blame]
Junxiao Shi446de3c2016-07-25 22:38:16 +00001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2013-2016 Regents of the University of California.
4 *
5 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
6 *
7 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License as published by the Free Software
9 * Foundation, either version 3 of the License, or (at your option) any later version.
10 *
11 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14 *
15 * You should have received copies of the GNU General Public License and GNU Lesser
16 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17 * <http://www.gnu.org/licenses/>.
18 *
19 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
20 */
21
22#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
23#define NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
24
25#include "transport.hpp"
26
27#include <boost/asio.hpp>
28#include <list>
29
30namespace ndn {
31
32/** \brief implementation detail of a Boost.Asio-based stream-oriented transport
33 * \tparam BaseTransport a subclass of Transport
34 * \tparam Protocol a Boost.Asio stream-oriented protocol, including boost::asio::ip::tcp
35 * and boost::asio::local::stream_protocol
36 */
37template<typename BaseTransport, typename Protocol>
38class StreamTransportImpl
39{
40public:
41 typedef StreamTransportImpl<BaseTransport,Protocol> Impl;
42 typedef std::list<Block> BlockSequence;
43 typedef std::list<BlockSequence> TransmissionQueue;
44
45 StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
46 : m_transport(transport)
47 , m_socket(ioService)
48 , m_inputBufferSize(0)
49 , m_isConnecting(false)
50 , m_connectTimer(ioService)
51 {
52 }
53
54 void
55 connect(const typename Protocol::endpoint& endpoint)
56 {
57 if (!m_isConnecting) {
58 m_isConnecting = true;
59
60 // Wait at most 4 seconds to connect
61 /// @todo Decide whether this number should be configurable
62 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
63 m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
64
65 m_socket.open();
66 m_socket.async_connect(endpoint, bind(&Impl::connectHandler, this, _1));
67 }
68 }
69
70 void
71 close()
72 {
73 m_isConnecting = false;
74
75 boost::system::error_code error; // to silently ignore all errors
76 m_connectTimer.cancel(error);
77 m_socket.cancel(error);
78 m_socket.close(error);
79
80 m_transport.m_isConnected = false;
81 m_transport.m_isReceiving = false;
82 m_transmissionQueue.clear();
83 }
84
85 void
86 pause()
87 {
88 if (m_isConnecting)
89 return;
90
91 if (m_transport.m_isReceiving) {
92 m_transport.m_isReceiving = false;
93 m_socket.cancel();
94 }
95 }
96
97 void
98 resume()
99 {
100 if (m_isConnecting)
101 return;
102
103 if (!m_transport.m_isReceiving) {
104 m_transport.m_isReceiving = true;
105 m_inputBufferSize = 0;
106 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
107 bind(&Impl::handleAsyncReceive, this, _1, _2));
108 }
109 }
110
111 void
112 send(const Block& wire)
113 {
114 BlockSequence sequence;
115 sequence.push_back(wire);
116 send(std::move(sequence));
117 }
118
119 void
120 send(const Block& header, const Block& payload)
121 {
122 BlockSequence sequence;
123 sequence.push_back(header);
124 sequence.push_back(payload);
125 send(std::move(sequence));
126 }
127
128protected:
129 void
130 connectHandler(const boost::system::error_code& error)
131 {
132 m_isConnecting = false;
133 m_connectTimer.cancel();
134
135 if (!error) {
136 resume();
137 m_transport.m_isConnected = true;
138
139 if (!m_transmissionQueue.empty()) {
140 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
141 bind(&Impl::handleAsyncWrite, this, _1,
142 m_transmissionQueue.begin()));
143 }
144 }
145 else {
146 m_transport.m_isConnected = false;
147 m_transport.close();
148 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
149 }
150 }
151
152 void
153 connectTimeoutHandler(const boost::system::error_code& error)
154 {
155 if (error) // e.g., cancelled timer
156 return;
157
158 m_transport.close();
159 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
160 }
161
162 void
163 send(BlockSequence&& sequence)
164 {
165 m_transmissionQueue.emplace_back(sequence);
166
167 if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
168 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
169 bind(&Impl::handleAsyncWrite, this, _1,
170 m_transmissionQueue.begin()));
171 }
172
173 // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
174 // next write will be scheduled either in connectHandler or in asyncWriteHandler
175 }
176
177 void
178 handleAsyncWrite(const boost::system::error_code& error, TransmissionQueue::iterator queueItem)
179 {
180 if (error) {
181 if (error == boost::system::errc::operation_canceled) {
182 // async receive has been explicitly cancelled (e.g., socket close)
183 return;
184 }
185
186 m_transport.close();
187 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while sending data to socket"));
188 }
189
190 if (!m_transport.m_isConnected) {
191 return; // queue has been already cleared
192 }
193
194 m_transmissionQueue.erase(queueItem);
195
196 if (!m_transmissionQueue.empty()) {
197 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
198 bind(&Impl::handleAsyncWrite, this, _1,
199 m_transmissionQueue.begin()));
200 }
201 }
202
203 void
204 handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
205 {
206 if (error) {
207 if (error == boost::system::errc::operation_canceled) {
208 // async receive has been explicitly cancelled (e.g., socket close)
209 return;
210 }
211
212 m_transport.close();
213 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while receiving data from socket"));
214 }
215
216 m_inputBufferSize += nBytesRecvd;
217 // do magic
218
219 std::size_t offset = 0;
220 bool hasProcessedSome = processAllReceived(m_inputBuffer, offset, m_inputBufferSize);
221 if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
222 m_transport.close();
223 BOOST_THROW_EXCEPTION(Transport::Error(boost::system::error_code(),
224 "input buffer full, but a valid TLV cannot be "
225 "decoded"));
226 }
227
228 if (offset > 0) {
229 if (offset != m_inputBufferSize) {
230 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
231 m_inputBufferSize -= offset;
232 }
233 else {
234 m_inputBufferSize = 0;
235 }
236 }
237
238 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
239 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
240 bind(&Impl::handleAsyncReceive, this, _1, _2));
241 }
242
243 bool
244 processAllReceived(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
245 {
246 while (offset < nBytesAvailable) {
247 bool isOk = false;
248 Block element;
249 std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
250 if (!isOk)
251 return false;
252
253 m_transport.receive(element);
254 offset += element.size();
255 }
256 return true;
257 }
258
259protected:
260 BaseTransport& m_transport;
261
262 typename Protocol::socket m_socket;
263 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
264 size_t m_inputBufferSize;
265
266 TransmissionQueue m_transmissionQueue;
267 bool m_isConnecting;
268
269 boost::asio::deadline_timer m_connectTimer;
270};
271
272} // namespace ndn
273
274#endif // NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP