blob: ec33b269fa4ab552374d01092498298e75d31e22 [file] [log] [blame]
Yukai Tu74c895d2015-09-21 01:11:51 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Eric Newberryb49313d2017-12-24 20:22:27 -07002/*
Davide Pesaventoa599d2a2022-02-16 18:52:43 -05003 * Copyright (c) 2014-2022, Regents of the University of California,
Yukai Tu74c895d2015-09-21 01:11:51 -07004 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis.
10 *
11 * This file is part of NFD (Named Data Networking Forwarding Daemon).
12 * See AUTHORS.md for complete list of NFD authors and contributors.
13 *
14 * NFD is free software: you can redistribute it and/or modify it under the terms
15 * of the GNU General Public License as published by the Free Software Foundation,
16 * either version 3 of the License, or (at your option) any later version.
17 *
18 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20 * PURPOSE. See the GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License along with
23 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24 */
25
26#ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28
29#include "transport.hpp"
Eric Newberryb49313d2017-12-24 20:22:27 -070030#include "socket-utils.hpp"
Davide Pesavento2cae8ca2019-04-18 20:48:05 -040031#include "common/global.hpp"
Yukai Tu74c895d2015-09-21 01:11:51 -070032
33#include <queue>
34
Davide Pesaventoe422f9e2022-06-03 01:30:23 -040035namespace nfd::face {
Yukai Tu74c895d2015-09-21 01:11:51 -070036
37/** \brief Implements Transport for stream-based protocols.
38 *
39 * \tparam Protocol a stream-based protocol in Boost.Asio
40 */
41template<class Protocol>
42class StreamTransport : public Transport
43{
44public:
Davide Pesaventoa599d2a2022-02-16 18:52:43 -050045 using protocol = Protocol;
Yukai Tu74c895d2015-09-21 01:11:51 -070046
47 /** \brief Construct stream transport.
48 *
49 * \param socket Protocol-specific socket for the created transport
50 */
51 explicit
52 StreamTransport(typename protocol::socket&& socket);
53
Eric Newberryb49313d2017-12-24 20:22:27 -070054 ssize_t
55 getSendQueueLength() override;
56
Davide Pesavento8728a252015-11-06 04:01:22 +010057protected:
Davide Pesavento1816d4b2017-07-02 12:20:48 -040058 void
Davide Pesaventob84bd3a2016-04-22 02:21:45 +020059 doClose() override;
Yukai Tu74c895d2015-09-21 01:11:51 -070060
Yukai Tu74c895d2015-09-21 01:11:51 -070061 void
62 deferredClose();
63
Davide Pesavento1816d4b2017-07-02 12:20:48 -040064 void
Teng Liang13d582a2020-07-21 20:23:11 -070065 doSend(const Block& packet) override;
Davide Pesavento8728a252015-11-06 04:01:22 +010066
Yukai Tu74c895d2015-09-21 01:11:51 -070067 void
68 sendFromQueue();
69
70 void
71 handleSend(const boost::system::error_code& error,
72 size_t nBytesSent);
73
74 void
Weiwei Liudcdf6212016-08-31 14:34:22 -070075 startReceive();
76
77 void
Yukai Tu74c895d2015-09-21 01:11:51 -070078 handleReceive(const boost::system::error_code& error,
79 size_t nBytesReceived);
80
81 void
82 processErrorCode(const boost::system::error_code& error);
83
Weiwei Liudcdf6212016-08-31 14:34:22 -070084 virtual void
85 handleError(const boost::system::error_code& error);
86
87 void
88 resetReceiveBuffer();
89
90 void
91 resetSendQueue();
92
Eric Newberryb49313d2017-12-24 20:22:27 -070093 size_t
94 getSendQueueBytes() const;
95
Yukai Tu74c895d2015-09-21 01:11:51 -070096protected:
97 typename protocol::socket m_socket;
98
Davide Pesaventoa3148082018-04-12 18:21:54 -040099 NFD_LOG_MEMBER_DECL();
Yukai Tu74c895d2015-09-21 01:11:51 -0700100
101private:
Davide Pesavento8728a252015-11-06 04:01:22 +0100102 uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
103 size_t m_receiveBufferSize;
Yukai Tu74c895d2015-09-21 01:11:51 -0700104 std::queue<Block> m_sendQueue;
Eric Newberryb49313d2017-12-24 20:22:27 -0700105 size_t m_sendQueueBytes;
Yukai Tu74c895d2015-09-21 01:11:51 -0700106};
107
Yukai Tu74c895d2015-09-21 01:11:51 -0700108
109template<class T>
Yukai Tu74c895d2015-09-21 01:11:51 -0700110StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
111 : m_socket(std::move(socket))
Davide Pesavento8728a252015-11-06 04:01:22 +0100112 , m_receiveBufferSize(0)
Eric Newberryb49313d2017-12-24 20:22:27 -0700113 , m_sendQueueBytes(0)
Yukai Tu74c895d2015-09-21 01:11:51 -0700114{
Eric Newberryb49313d2017-12-24 20:22:27 -0700115 // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
116 // Therefore, protecting against send queue overflows is less critical than in other transport
117 // types. Instead, we use the default threshold specified in the GenericLinkService options.
118
Weiwei Liudcdf6212016-08-31 14:34:22 -0700119 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700120}
121
122template<class T>
Eric Newberryb49313d2017-12-24 20:22:27 -0700123ssize_t
124StreamTransport<T>::getSendQueueLength()
125{
126 ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
127 if (queueLength == QUEUE_ERROR) {
128 NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
129 }
130 return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
131}
132
133template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100134void
Yukai Tu74c895d2015-09-21 01:11:51 -0700135StreamTransport<T>::doClose()
136{
137 NFD_LOG_FACE_TRACE(__func__);
138
139 if (m_socket.is_open()) {
140 // Cancel all outstanding operations and shutdown the socket
141 // so that no further sends or receives are possible.
142 // Use the non-throwing variants and ignore errors, if any.
143 boost::system::error_code error;
144 m_socket.cancel(error);
145 m_socket.shutdown(protocol::socket::shutdown_both, error);
146 }
147
148 // Ensure that the Transport stays alive at least until
149 // all pending handlers are dispatched
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400150 getGlobalIoService().post([this] { deferredClose(); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700151
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400152 // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
Yukai Tu74c895d2015-09-21 01:11:51 -0700153 //
154 // When doClose is called from a socket event handler (e.g., from handleReceive),
155 // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
156 // Instead, handleSend is invoked as nothing bad happened.
157 //
158 // In order to prevent the assertion in handleSend from failing, we clear the queue
159 // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
160 // this point have been executed. If more send operations are scheduled after this
161 // point, they will fail because the socket has been shutdown, and their callbacks
162 // will be invoked with error code == asio::error::shut_down.
163}
164
165template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100166void
Yukai Tu74c895d2015-09-21 01:11:51 -0700167StreamTransport<T>::deferredClose()
168{
169 NFD_LOG_FACE_TRACE(__func__);
170
Weiwei Liudcdf6212016-08-31 14:34:22 -0700171 resetSendQueue();
Yukai Tu74c895d2015-09-21 01:11:51 -0700172
173 // use the non-throwing variant and ignore errors, if any
174 boost::system::error_code error;
175 m_socket.close(error);
176
177 this->setState(TransportState::CLOSED);
178}
179
180template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100181void
Teng Liang13d582a2020-07-21 20:23:11 -0700182StreamTransport<T>::doSend(const Block& packet)
Davide Pesavento8728a252015-11-06 04:01:22 +0100183{
184 NFD_LOG_FACE_TRACE(__func__);
185
Weiwei Liudcdf6212016-08-31 14:34:22 -0700186 if (getState() != TransportState::UP)
187 return;
188
Davide Pesavento8728a252015-11-06 04:01:22 +0100189 bool wasQueueEmpty = m_sendQueue.empty();
Davide Pesaventob3a23ca2019-05-04 20:40:21 -0400190 m_sendQueue.push(packet);
191 m_sendQueueBytes += packet.size();
Davide Pesavento8728a252015-11-06 04:01:22 +0100192
193 if (wasQueueEmpty)
194 sendFromQueue();
195}
196
197template<class T>
198void
Yukai Tu74c895d2015-09-21 01:11:51 -0700199StreamTransport<T>::sendFromQueue()
200{
201 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
Davide Pesaventoe4b22382018-06-10 14:37:24 -0400202 [this] (auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700203}
204
205template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100206void
Yukai Tu74c895d2015-09-21 01:11:51 -0700207StreamTransport<T>::handleSend(const boost::system::error_code& error,
208 size_t nBytesSent)
209{
210 if (error)
211 return processErrorCode(error);
212
213 NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
214
215 BOOST_ASSERT(!m_sendQueue.empty());
Eric Newberryb49313d2017-12-24 20:22:27 -0700216 BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
217 m_sendQueueBytes -= nBytesSent;
Yukai Tu74c895d2015-09-21 01:11:51 -0700218 m_sendQueue.pop();
219
220 if (!m_sendQueue.empty())
221 sendFromQueue();
222}
223
224template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100225void
Weiwei Liudcdf6212016-08-31 14:34:22 -0700226StreamTransport<T>::startReceive()
227{
228 BOOST_ASSERT(getState() == TransportState::UP);
229
230 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
231 ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
Davide Pesaventoe4b22382018-06-10 14:37:24 -0400232 [this] (auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
Weiwei Liudcdf6212016-08-31 14:34:22 -0700233}
234
235template<class T>
236void
Yukai Tu74c895d2015-09-21 01:11:51 -0700237StreamTransport<T>::handleReceive(const boost::system::error_code& error,
238 size_t nBytesReceived)
239{
240 if (error)
241 return processErrorCode(error);
242
243 NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
244
Davide Pesavento8728a252015-11-06 04:01:22 +0100245 m_receiveBufferSize += nBytesReceived;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500246 auto bufferView = ndn::make_span(m_receiveBuffer, m_receiveBufferSize);
Yukai Tu74c895d2015-09-21 01:11:51 -0700247 size_t offset = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700248 bool isOk = true;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500249 while (offset < bufferView.size()) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400250 Block element;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500251 std::tie(isOk, element) = Block::fromBuffer(bufferView.subspan(offset));
Yukai Tu74c895d2015-09-21 01:11:51 -0700252 if (!isOk)
253 break;
254
255 offset += element.size();
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500256 BOOST_ASSERT(offset <= bufferView.size());
Yukai Tu74c895d2015-09-21 01:11:51 -0700257
Davide Pesaventob3a23ca2019-05-04 20:40:21 -0400258 this->receive(element);
Yukai Tu74c895d2015-09-21 01:11:51 -0700259 }
260
Davide Pesavento8728a252015-11-06 04:01:22 +0100261 if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400262 NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
Yukai Tu74c895d2015-09-21 01:11:51 -0700263 this->setState(TransportState::FAILED);
264 doClose();
265 return;
266 }
267
268 if (offset > 0) {
Davide Pesavento8728a252015-11-06 04:01:22 +0100269 if (offset != m_receiveBufferSize) {
270 std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
271 m_receiveBufferSize -= offset;
Yukai Tu74c895d2015-09-21 01:11:51 -0700272 }
273 else {
Davide Pesavento8728a252015-11-06 04:01:22 +0100274 m_receiveBufferSize = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700275 }
276 }
277
Weiwei Liudcdf6212016-08-31 14:34:22 -0700278 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700279}
280
281template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100282void
Yukai Tu74c895d2015-09-21 01:11:51 -0700283StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
284{
285 NFD_LOG_FACE_TRACE(__func__);
286
287 if (getState() == TransportState::CLOSING ||
288 getState() == TransportState::FAILED ||
289 getState() == TransportState::CLOSED ||
290 error == boost::asio::error::operation_aborted || // when cancel() is called
291 error == boost::asio::error::shut_down) // after shutdown() is called
292 // transport is shutting down, ignore any errors
293 return;
294
Weiwei Liudcdf6212016-08-31 14:34:22 -0700295 handleError(error);
296}
297
298template<class T>
299void
300StreamTransport<T>::handleError(const boost::system::error_code& error)
301{
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400302 if (error == boost::asio::error::eof) {
303 this->setState(TransportState::CLOSING);
304 }
305 else {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400306 NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400307 this->setState(TransportState::FAILED);
308 }
Yukai Tu74c895d2015-09-21 01:11:51 -0700309 doClose();
310}
311
Weiwei Liudcdf6212016-08-31 14:34:22 -0700312template<class T>
313void
314StreamTransport<T>::resetReceiveBuffer()
315{
316 m_receiveBufferSize = 0;
317}
318
319template<class T>
320void
321StreamTransport<T>::resetSendQueue()
322{
323 std::queue<Block> emptyQueue;
324 std::swap(emptyQueue, m_sendQueue);
Eric Newberryb49313d2017-12-24 20:22:27 -0700325 m_sendQueueBytes = 0;
326}
327
328template<class T>
329size_t
330StreamTransport<T>::getSendQueueBytes() const
331{
332 return m_sendQueueBytes;
Weiwei Liudcdf6212016-08-31 14:34:22 -0700333}
334
Davide Pesaventoe422f9e2022-06-03 01:30:23 -0400335} // namespace nfd::face
Yukai Tu74c895d2015-09-21 01:11:51 -0700336
337#endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP