blob: cba7c78c12fbbfb8677b6939ba475d3b9fd83415 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2014-2017, Regents of the University of California,
 * Arizona Board of Regents,
 * Colorado State University,
 * University Pierre & Marie Curie, Sorbonne University,
 * Washington University in St. Louis,
 * Beijing Institute of Technology,
 * The University of Memphis.
 *
 * This file is part of NFD (Named Data Networking Forwarding Daemon).
 * See AUTHORS.md for complete list of NFD authors and contributors.
 *
 * NFD is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 */
25
26#ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28
29#include "transport.hpp"
30#include "core/global-io.hpp"
31
32#include <queue>
33
34namespace nfd {
35namespace face {
36
37/** \brief Implements Transport for stream-based protocols.
38 *
39 * \tparam Protocol a stream-based protocol in Boost.Asio
40 */
41template<class Protocol>
42class StreamTransport : public Transport
43{
44public:
45 typedef Protocol protocol;
46
47 /** \brief Construct stream transport.
48 *
49 * \param socket Protocol-specific socket for the created transport
50 */
51 explicit
52 StreamTransport(typename protocol::socket&& socket);
53
Davide Pesavento8728a252015-11-06 04:01:22 +010054protected:
Davide Pesavento1816d4b2017-07-02 12:20:48 -040055 void
Davide Pesaventob84bd3a2016-04-22 02:21:45 +020056 doClose() override;
Yukai Tu74c895d2015-09-21 01:11:51 -070057
Yukai Tu74c895d2015-09-21 01:11:51 -070058 void
59 deferredClose();
60
Davide Pesavento1816d4b2017-07-02 12:20:48 -040061 void
Davide Pesaventob84bd3a2016-04-22 02:21:45 +020062 doSend(Transport::Packet&& packet) override;
Davide Pesavento8728a252015-11-06 04:01:22 +010063
Yukai Tu74c895d2015-09-21 01:11:51 -070064 void
65 sendFromQueue();
66
67 void
68 handleSend(const boost::system::error_code& error,
69 size_t nBytesSent);
70
71 void
Weiwei Liudcdf6212016-08-31 14:34:22 -070072 startReceive();
73
74 void
Yukai Tu74c895d2015-09-21 01:11:51 -070075 handleReceive(const boost::system::error_code& error,
76 size_t nBytesReceived);
77
78 void
79 processErrorCode(const boost::system::error_code& error);
80
Weiwei Liudcdf6212016-08-31 14:34:22 -070081 virtual void
82 handleError(const boost::system::error_code& error);
83
84 void
85 resetReceiveBuffer();
86
87 void
88 resetSendQueue();
89
Yukai Tu74c895d2015-09-21 01:11:51 -070090protected:
91 typename protocol::socket m_socket;
92
93 NFD_LOG_INCLASS_DECLARE();
94
95private:
Davide Pesavento8728a252015-11-06 04:01:22 +010096 uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
97 size_t m_receiveBufferSize;
Yukai Tu74c895d2015-09-21 01:11:51 -070098 std::queue<Block> m_sendQueue;
99};
100
Yukai Tu74c895d2015-09-21 01:11:51 -0700101
102template<class T>
Yukai Tu74c895d2015-09-21 01:11:51 -0700103StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
104 : m_socket(std::move(socket))
Davide Pesavento8728a252015-11-06 04:01:22 +0100105 , m_receiveBufferSize(0)
Yukai Tu74c895d2015-09-21 01:11:51 -0700106{
Weiwei Liudcdf6212016-08-31 14:34:22 -0700107 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700108}
109
110template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100111void
Yukai Tu74c895d2015-09-21 01:11:51 -0700112StreamTransport<T>::doClose()
113{
114 NFD_LOG_FACE_TRACE(__func__);
115
116 if (m_socket.is_open()) {
117 // Cancel all outstanding operations and shutdown the socket
118 // so that no further sends or receives are possible.
119 // Use the non-throwing variants and ignore errors, if any.
120 boost::system::error_code error;
121 m_socket.cancel(error);
122 m_socket.shutdown(protocol::socket::shutdown_both, error);
123 }
124
125 // Ensure that the Transport stays alive at least until
126 // all pending handlers are dispatched
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400127 getGlobalIoService().post([this] { deferredClose(); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700128
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400129 // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
Yukai Tu74c895d2015-09-21 01:11:51 -0700130 //
131 // When doClose is called from a socket event handler (e.g., from handleReceive),
132 // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
133 // Instead, handleSend is invoked as nothing bad happened.
134 //
135 // In order to prevent the assertion in handleSend from failing, we clear the queue
136 // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
137 // this point have been executed. If more send operations are scheduled after this
138 // point, they will fail because the socket has been shutdown, and their callbacks
139 // will be invoked with error code == asio::error::shut_down.
140}
141
142template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100143void
Yukai Tu74c895d2015-09-21 01:11:51 -0700144StreamTransport<T>::deferredClose()
145{
146 NFD_LOG_FACE_TRACE(__func__);
147
Weiwei Liudcdf6212016-08-31 14:34:22 -0700148 resetSendQueue();
Yukai Tu74c895d2015-09-21 01:11:51 -0700149
150 // use the non-throwing variant and ignore errors, if any
151 boost::system::error_code error;
152 m_socket.close(error);
153
154 this->setState(TransportState::CLOSED);
155}
156
157template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100158void
159StreamTransport<T>::doSend(Transport::Packet&& packet)
160{
161 NFD_LOG_FACE_TRACE(__func__);
162
Weiwei Liudcdf6212016-08-31 14:34:22 -0700163 if (getState() != TransportState::UP)
164 return;
165
Davide Pesavento8728a252015-11-06 04:01:22 +0100166 bool wasQueueEmpty = m_sendQueue.empty();
167 m_sendQueue.push(packet.packet);
168
169 if (wasQueueEmpty)
170 sendFromQueue();
171}
172
173template<class T>
174void
Yukai Tu74c895d2015-09-21 01:11:51 -0700175StreamTransport<T>::sendFromQueue()
176{
177 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
178 bind(&StreamTransport<T>::handleSend, this,
179 boost::asio::placeholders::error,
180 boost::asio::placeholders::bytes_transferred));
181}
182
183template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100184void
Yukai Tu74c895d2015-09-21 01:11:51 -0700185StreamTransport<T>::handleSend(const boost::system::error_code& error,
186 size_t nBytesSent)
187{
188 if (error)
189 return processErrorCode(error);
190
191 NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
192
193 BOOST_ASSERT(!m_sendQueue.empty());
194 m_sendQueue.pop();
195
196 if (!m_sendQueue.empty())
197 sendFromQueue();
198}
199
200template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100201void
Weiwei Liudcdf6212016-08-31 14:34:22 -0700202StreamTransport<T>::startReceive()
203{
204 BOOST_ASSERT(getState() == TransportState::UP);
205
206 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
207 ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
208 bind(&StreamTransport<T>::handleReceive, this,
209 boost::asio::placeholders::error,
210 boost::asio::placeholders::bytes_transferred));
211}
212
213template<class T>
214void
Yukai Tu74c895d2015-09-21 01:11:51 -0700215StreamTransport<T>::handleReceive(const boost::system::error_code& error,
216 size_t nBytesReceived)
217{
218 if (error)
219 return processErrorCode(error);
220
221 NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
222
Davide Pesavento8728a252015-11-06 04:01:22 +0100223 m_receiveBufferSize += nBytesReceived;
Yukai Tu74c895d2015-09-21 01:11:51 -0700224 size_t offset = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700225 bool isOk = true;
Davide Pesavento8728a252015-11-06 04:01:22 +0100226 while (m_receiveBufferSize - offset > 0) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400227 Block element;
Davide Pesavento8728a252015-11-06 04:01:22 +0100228 std::tie(isOk, element) = Block::fromBuffer(m_receiveBuffer + offset, m_receiveBufferSize - offset);
Yukai Tu74c895d2015-09-21 01:11:51 -0700229 if (!isOk)
230 break;
231
232 offset += element.size();
Davide Pesavento8728a252015-11-06 04:01:22 +0100233 BOOST_ASSERT(offset <= m_receiveBufferSize);
Yukai Tu74c895d2015-09-21 01:11:51 -0700234
Davide Pesavento8728a252015-11-06 04:01:22 +0100235 this->receive(Transport::Packet(std::move(element)));
Yukai Tu74c895d2015-09-21 01:11:51 -0700236 }
237
Davide Pesavento8728a252015-11-06 04:01:22 +0100238 if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400239 NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
Yukai Tu74c895d2015-09-21 01:11:51 -0700240 this->setState(TransportState::FAILED);
241 doClose();
242 return;
243 }
244
245 if (offset > 0) {
Davide Pesavento8728a252015-11-06 04:01:22 +0100246 if (offset != m_receiveBufferSize) {
247 std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
248 m_receiveBufferSize -= offset;
Yukai Tu74c895d2015-09-21 01:11:51 -0700249 }
250 else {
Davide Pesavento8728a252015-11-06 04:01:22 +0100251 m_receiveBufferSize = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700252 }
253 }
254
Weiwei Liudcdf6212016-08-31 14:34:22 -0700255 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700256}
257
258template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100259void
Yukai Tu74c895d2015-09-21 01:11:51 -0700260StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
261{
262 NFD_LOG_FACE_TRACE(__func__);
263
264 if (getState() == TransportState::CLOSING ||
265 getState() == TransportState::FAILED ||
266 getState() == TransportState::CLOSED ||
267 error == boost::asio::error::operation_aborted || // when cancel() is called
268 error == boost::asio::error::shut_down) // after shutdown() is called
269 // transport is shutting down, ignore any errors
270 return;
271
Weiwei Liudcdf6212016-08-31 14:34:22 -0700272 handleError(error);
273}
274
275template<class T>
276void
277StreamTransport<T>::handleError(const boost::system::error_code& error)
278{
Yukai Tu74c895d2015-09-21 01:11:51 -0700279 if (error != boost::asio::error::eof)
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400280 NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
Yukai Tu74c895d2015-09-21 01:11:51 -0700281
282 this->setState(TransportState::FAILED);
283 doClose();
284}
285
Weiwei Liudcdf6212016-08-31 14:34:22 -0700286template<class T>
287void
288StreamTransport<T>::resetReceiveBuffer()
289{
290 m_receiveBufferSize = 0;
291}
292
293template<class T>
294void
295StreamTransport<T>::resetSendQueue()
296{
297 std::queue<Block> emptyQueue;
298 std::swap(emptyQueue, m_sendQueue);
299}
300
Yukai Tu74c895d2015-09-21 01:11:51 -0700301} // namespace face
302} // namespace nfd
303
304#endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP