blob: c8b0ac371b08a038b91f28e1c7d5e8b717154a6b [file] [log] [blame]
Yukai Tu74c895d2015-09-21 01:11:51 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Eric Newberryb49313d2017-12-24 20:22:27 -07002/*
Davide Pesavento5d642632023-10-03 00:36:08 -04003 * Copyright (c) 2014-2023, Regents of the University of California,
Yukai Tu74c895d2015-09-21 01:11:51 -07004 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis.
10 *
11 * This file is part of NFD (Named Data Networking Forwarding Daemon).
12 * See AUTHORS.md for complete list of NFD authors and contributors.
13 *
14 * NFD is free software: you can redistribute it and/or modify it under the terms
15 * of the GNU General Public License as published by the Free Software Foundation,
16 * either version 3 of the License, or (at your option) any later version.
17 *
18 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20 * PURPOSE. See the GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License along with
23 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24 */
25
26#ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28
29#include "transport.hpp"
Eric Newberryb49313d2017-12-24 20:22:27 -070030#include "socket-utils.hpp"
Davide Pesavento2cae8ca2019-04-18 20:48:05 -040031#include "common/global.hpp"
Yukai Tu74c895d2015-09-21 01:11:51 -070032
33#include <queue>
34
Davide Pesavento5d642632023-10-03 00:36:08 -040035#include <boost/asio/defer.hpp>
Davide Pesaventoa9b09b62022-06-04 14:07:25 -040036#include <boost/asio/write.hpp>
37
Davide Pesaventoe422f9e2022-06-03 01:30:23 -040038namespace nfd::face {
Yukai Tu74c895d2015-09-21 01:11:51 -070039
40/** \brief Implements Transport for stream-based protocols.
41 *
42 * \tparam Protocol a stream-based protocol in Boost.Asio
43 */
44template<class Protocol>
45class StreamTransport : public Transport
46{
47public:
Davide Pesaventoa599d2a2022-02-16 18:52:43 -050048 using protocol = Protocol;
Yukai Tu74c895d2015-09-21 01:11:51 -070049
50 /** \brief Construct stream transport.
51 *
52 * \param socket Protocol-specific socket for the created transport
53 */
54 explicit
55 StreamTransport(typename protocol::socket&& socket);
56
Eric Newberryb49313d2017-12-24 20:22:27 -070057 ssize_t
58 getSendQueueLength() override;
59
Davide Pesavento8728a252015-11-06 04:01:22 +010060protected:
Davide Pesavento1816d4b2017-07-02 12:20:48 -040061 void
Davide Pesaventob84bd3a2016-04-22 02:21:45 +020062 doClose() override;
Yukai Tu74c895d2015-09-21 01:11:51 -070063
Yukai Tu74c895d2015-09-21 01:11:51 -070064 void
65 deferredClose();
66
Davide Pesavento1816d4b2017-07-02 12:20:48 -040067 void
Teng Liang13d582a2020-07-21 20:23:11 -070068 doSend(const Block& packet) override;
Davide Pesavento8728a252015-11-06 04:01:22 +010069
Yukai Tu74c895d2015-09-21 01:11:51 -070070 void
71 sendFromQueue();
72
73 void
74 handleSend(const boost::system::error_code& error,
75 size_t nBytesSent);
76
77 void
Weiwei Liudcdf6212016-08-31 14:34:22 -070078 startReceive();
79
80 void
Yukai Tu74c895d2015-09-21 01:11:51 -070081 handleReceive(const boost::system::error_code& error,
82 size_t nBytesReceived);
83
84 void
85 processErrorCode(const boost::system::error_code& error);
86
Weiwei Liudcdf6212016-08-31 14:34:22 -070087 virtual void
88 handleError(const boost::system::error_code& error);
89
90 void
91 resetReceiveBuffer();
92
93 void
94 resetSendQueue();
95
Eric Newberryb49313d2017-12-24 20:22:27 -070096 size_t
97 getSendQueueBytes() const;
98
Yukai Tu74c895d2015-09-21 01:11:51 -070099protected:
100 typename protocol::socket m_socket;
101
Davide Pesaventoa3148082018-04-12 18:21:54 -0400102 NFD_LOG_MEMBER_DECL();
Yukai Tu74c895d2015-09-21 01:11:51 -0700103
104private:
Davide Pesavento8728a252015-11-06 04:01:22 +0100105 uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
106 size_t m_receiveBufferSize;
Yukai Tu74c895d2015-09-21 01:11:51 -0700107 std::queue<Block> m_sendQueue;
Eric Newberryb49313d2017-12-24 20:22:27 -0700108 size_t m_sendQueueBytes;
Yukai Tu74c895d2015-09-21 01:11:51 -0700109};
110
Yukai Tu74c895d2015-09-21 01:11:51 -0700111
112template<class T>
Yukai Tu74c895d2015-09-21 01:11:51 -0700113StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
114 : m_socket(std::move(socket))
Davide Pesavento8728a252015-11-06 04:01:22 +0100115 , m_receiveBufferSize(0)
Eric Newberryb49313d2017-12-24 20:22:27 -0700116 , m_sendQueueBytes(0)
Yukai Tu74c895d2015-09-21 01:11:51 -0700117{
Eric Newberryb49313d2017-12-24 20:22:27 -0700118 // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
119 // Therefore, protecting against send queue overflows is less critical than in other transport
120 // types. Instead, we use the default threshold specified in the GenericLinkService options.
121
Weiwei Liudcdf6212016-08-31 14:34:22 -0700122 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700123}
124
125template<class T>
Eric Newberryb49313d2017-12-24 20:22:27 -0700126ssize_t
127StreamTransport<T>::getSendQueueLength()
128{
129 ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
130 if (queueLength == QUEUE_ERROR) {
131 NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
132 }
133 return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
134}
135
136template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100137void
Yukai Tu74c895d2015-09-21 01:11:51 -0700138StreamTransport<T>::doClose()
139{
140 NFD_LOG_FACE_TRACE(__func__);
141
142 if (m_socket.is_open()) {
143 // Cancel all outstanding operations and shutdown the socket
144 // so that no further sends or receives are possible.
145 // Use the non-throwing variants and ignore errors, if any.
146 boost::system::error_code error;
147 m_socket.cancel(error);
148 m_socket.shutdown(protocol::socket::shutdown_both, error);
149 }
150
151 // Ensure that the Transport stays alive at least until
152 // all pending handlers are dispatched
Davide Pesavento5d642632023-10-03 00:36:08 -0400153 boost::asio::defer(getGlobalIoService(), [this] { deferredClose(); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700154
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400155 // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
Yukai Tu74c895d2015-09-21 01:11:51 -0700156 //
157 // When doClose is called from a socket event handler (e.g., from handleReceive),
158 // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
159 // Instead, handleSend is invoked as nothing bad happened.
160 //
161 // In order to prevent the assertion in handleSend from failing, we clear the queue
162 // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
163 // this point have been executed. If more send operations are scheduled after this
164 // point, they will fail because the socket has been shutdown, and their callbacks
165 // will be invoked with error code == asio::error::shut_down.
166}
167
168template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100169void
Yukai Tu74c895d2015-09-21 01:11:51 -0700170StreamTransport<T>::deferredClose()
171{
172 NFD_LOG_FACE_TRACE(__func__);
173
Weiwei Liudcdf6212016-08-31 14:34:22 -0700174 resetSendQueue();
Yukai Tu74c895d2015-09-21 01:11:51 -0700175
176 // use the non-throwing variant and ignore errors, if any
177 boost::system::error_code error;
178 m_socket.close(error);
179
180 this->setState(TransportState::CLOSED);
181}
182
183template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100184void
Teng Liang13d582a2020-07-21 20:23:11 -0700185StreamTransport<T>::doSend(const Block& packet)
Davide Pesavento8728a252015-11-06 04:01:22 +0100186{
187 NFD_LOG_FACE_TRACE(__func__);
188
Weiwei Liudcdf6212016-08-31 14:34:22 -0700189 if (getState() != TransportState::UP)
190 return;
191
Davide Pesavento8728a252015-11-06 04:01:22 +0100192 bool wasQueueEmpty = m_sendQueue.empty();
Davide Pesaventob3a23ca2019-05-04 20:40:21 -0400193 m_sendQueue.push(packet);
194 m_sendQueueBytes += packet.size();
Davide Pesavento8728a252015-11-06 04:01:22 +0100195
196 if (wasQueueEmpty)
197 sendFromQueue();
198}
199
200template<class T>
201void
Yukai Tu74c895d2015-09-21 01:11:51 -0700202StreamTransport<T>::sendFromQueue()
203{
204 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
Davide Pesaventoe4b22382018-06-10 14:37:24 -0400205 [this] (auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700206}
207
208template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100209void
Yukai Tu74c895d2015-09-21 01:11:51 -0700210StreamTransport<T>::handleSend(const boost::system::error_code& error,
211 size_t nBytesSent)
212{
213 if (error)
214 return processErrorCode(error);
215
216 NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
217
218 BOOST_ASSERT(!m_sendQueue.empty());
Eric Newberryb49313d2017-12-24 20:22:27 -0700219 BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
220 m_sendQueueBytes -= nBytesSent;
Yukai Tu74c895d2015-09-21 01:11:51 -0700221 m_sendQueue.pop();
222
223 if (!m_sendQueue.empty())
224 sendFromQueue();
225}
226
227template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100228void
Weiwei Liudcdf6212016-08-31 14:34:22 -0700229StreamTransport<T>::startReceive()
230{
231 BOOST_ASSERT(getState() == TransportState::UP);
232
233 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
234 ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
Davide Pesaventoe4b22382018-06-10 14:37:24 -0400235 [this] (auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
Weiwei Liudcdf6212016-08-31 14:34:22 -0700236}
237
238template<class T>
239void
Yukai Tu74c895d2015-09-21 01:11:51 -0700240StreamTransport<T>::handleReceive(const boost::system::error_code& error,
241 size_t nBytesReceived)
242{
243 if (error)
244 return processErrorCode(error);
245
246 NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
247
Davide Pesavento8728a252015-11-06 04:01:22 +0100248 m_receiveBufferSize += nBytesReceived;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500249 auto bufferView = ndn::make_span(m_receiveBuffer, m_receiveBufferSize);
Yukai Tu74c895d2015-09-21 01:11:51 -0700250 size_t offset = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700251 bool isOk = true;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500252 while (offset < bufferView.size()) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400253 Block element;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500254 std::tie(isOk, element) = Block::fromBuffer(bufferView.subspan(offset));
Yukai Tu74c895d2015-09-21 01:11:51 -0700255 if (!isOk)
256 break;
257
258 offset += element.size();
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500259 BOOST_ASSERT(offset <= bufferView.size());
Yukai Tu74c895d2015-09-21 01:11:51 -0700260
Davide Pesaventob3a23ca2019-05-04 20:40:21 -0400261 this->receive(element);
Yukai Tu74c895d2015-09-21 01:11:51 -0700262 }
263
Davide Pesavento8728a252015-11-06 04:01:22 +0100264 if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400265 NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
Yukai Tu74c895d2015-09-21 01:11:51 -0700266 this->setState(TransportState::FAILED);
267 doClose();
268 return;
269 }
270
271 if (offset > 0) {
Davide Pesavento8728a252015-11-06 04:01:22 +0100272 if (offset != m_receiveBufferSize) {
273 std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
274 m_receiveBufferSize -= offset;
Yukai Tu74c895d2015-09-21 01:11:51 -0700275 }
276 else {
Davide Pesavento8728a252015-11-06 04:01:22 +0100277 m_receiveBufferSize = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700278 }
279 }
280
Weiwei Liudcdf6212016-08-31 14:34:22 -0700281 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700282}
283
284template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100285void
Yukai Tu74c895d2015-09-21 01:11:51 -0700286StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
287{
288 NFD_LOG_FACE_TRACE(__func__);
289
290 if (getState() == TransportState::CLOSING ||
291 getState() == TransportState::FAILED ||
292 getState() == TransportState::CLOSED ||
293 error == boost::asio::error::operation_aborted || // when cancel() is called
294 error == boost::asio::error::shut_down) // after shutdown() is called
295 // transport is shutting down, ignore any errors
296 return;
297
Weiwei Liudcdf6212016-08-31 14:34:22 -0700298 handleError(error);
299}
300
301template<class T>
302void
303StreamTransport<T>::handleError(const boost::system::error_code& error)
304{
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400305 if (error == boost::asio::error::eof) {
306 this->setState(TransportState::CLOSING);
307 }
308 else {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400309 NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400310 this->setState(TransportState::FAILED);
311 }
Yukai Tu74c895d2015-09-21 01:11:51 -0700312 doClose();
313}
314
Weiwei Liudcdf6212016-08-31 14:34:22 -0700315template<class T>
316void
317StreamTransport<T>::resetReceiveBuffer()
318{
319 m_receiveBufferSize = 0;
320}
321
322template<class T>
323void
324StreamTransport<T>::resetSendQueue()
325{
326 std::queue<Block> emptyQueue;
327 std::swap(emptyQueue, m_sendQueue);
Eric Newberryb49313d2017-12-24 20:22:27 -0700328 m_sendQueueBytes = 0;
329}
330
331template<class T>
332size_t
333StreamTransport<T>::getSendQueueBytes() const
334{
335 return m_sendQueueBytes;
Weiwei Liudcdf6212016-08-31 14:34:22 -0700336}
337
Davide Pesaventoe422f9e2022-06-03 01:30:23 -0400338} // namespace nfd::face
Yukai Tu74c895d2015-09-21 01:11:51 -0700339
340#endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP