blob: 750432f167df54e8b14d72c7b9663985d318b7ab [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2024, Regents of the University of California,
 * Arizona Board of Regents,
 * Colorado State University,
 * University Pierre & Marie Curie, Sorbonne University,
 * Washington University in St. Louis,
 * Beijing Institute of Technology,
 * The University of Memphis.
 *
 * This file is part of NFD (Named Data Networking Forwarding Daemon).
 * See AUTHORS.md for complete list of NFD authors and contributors.
 *
 * NFD is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 */
25
26#ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28
29#include "transport.hpp"
Eric Newberryb49313d2017-12-24 20:22:27 -070030#include "socket-utils.hpp"
Davide Pesavento2cae8ca2019-04-18 20:48:05 -040031#include "common/global.hpp"
Yukai Tu74c895d2015-09-21 01:11:51 -070032
Davide Pesavento2fae1d02024-06-08 20:02:27 -040033#include <array>
Yukai Tu74c895d2015-09-21 01:11:51 -070034#include <queue>
35
Davide Pesavento5d642632023-10-03 00:36:08 -040036#include <boost/asio/defer.hpp>
Davide Pesaventoa9b09b62022-06-04 14:07:25 -040037#include <boost/asio/write.hpp>
38
Davide Pesaventoe422f9e2022-06-03 01:30:23 -040039namespace nfd::face {
Yukai Tu74c895d2015-09-21 01:11:51 -070040
Davide Pesaventoc0df94e2023-10-08 19:26:55 -040041/**
42 * \brief Implements a Transport for stream-based protocols.
Yukai Tu74c895d2015-09-21 01:11:51 -070043 *
Davide Pesaventoc0df94e2023-10-08 19:26:55 -040044 * \tparam Protocol a stream-based protocol in Boost.Asio
Yukai Tu74c895d2015-09-21 01:11:51 -070045 */
46template<class Protocol>
47class StreamTransport : public Transport
48{
49public:
Davide Pesaventoa599d2a2022-02-16 18:52:43 -050050 using protocol = Protocol;
Yukai Tu74c895d2015-09-21 01:11:51 -070051
Davide Pesaventoc0df94e2023-10-08 19:26:55 -040052 /**
53 * \brief Construct stream transport.
Yukai Tu74c895d2015-09-21 01:11:51 -070054 *
Davide Pesaventoc0df94e2023-10-08 19:26:55 -040055 * \param socket Protocol-specific socket for the created transport
Yukai Tu74c895d2015-09-21 01:11:51 -070056 */
57 explicit
58 StreamTransport(typename protocol::socket&& socket);
59
Eric Newberryb49313d2017-12-24 20:22:27 -070060 ssize_t
61 getSendQueueLength() override;
62
Davide Pesavento8728a252015-11-06 04:01:22 +010063protected:
Davide Pesavento1816d4b2017-07-02 12:20:48 -040064 void
Davide Pesaventob84bd3a2016-04-22 02:21:45 +020065 doClose() override;
Yukai Tu74c895d2015-09-21 01:11:51 -070066
Yukai Tu74c895d2015-09-21 01:11:51 -070067 void
68 deferredClose();
69
Davide Pesavento1816d4b2017-07-02 12:20:48 -040070 void
Teng Liang13d582a2020-07-21 20:23:11 -070071 doSend(const Block& packet) override;
Davide Pesavento8728a252015-11-06 04:01:22 +010072
Yukai Tu74c895d2015-09-21 01:11:51 -070073 void
74 sendFromQueue();
75
76 void
77 handleSend(const boost::system::error_code& error,
78 size_t nBytesSent);
79
80 void
Weiwei Liudcdf6212016-08-31 14:34:22 -070081 startReceive();
82
83 void
Yukai Tu74c895d2015-09-21 01:11:51 -070084 handleReceive(const boost::system::error_code& error,
85 size_t nBytesReceived);
86
87 void
88 processErrorCode(const boost::system::error_code& error);
89
Weiwei Liudcdf6212016-08-31 14:34:22 -070090 virtual void
91 handleError(const boost::system::error_code& error);
92
93 void
94 resetReceiveBuffer();
95
96 void
97 resetSendQueue();
98
Eric Newberryb49313d2017-12-24 20:22:27 -070099 size_t
100 getSendQueueBytes() const;
101
Yukai Tu74c895d2015-09-21 01:11:51 -0700102protected:
103 typename protocol::socket m_socket;
104
Davide Pesaventoa3148082018-04-12 18:21:54 -0400105 NFD_LOG_MEMBER_DECL();
Yukai Tu74c895d2015-09-21 01:11:51 -0700106
107private:
Davide Pesavento91c15c82024-01-15 17:14:23 -0500108 size_t m_sendQueueBytes = 0;
Davide Pesavento2fae1d02024-06-08 20:02:27 -0400109 std::queue<Block> m_sendQueue;
110 size_t m_receiveBufferSize = 0;
111 std::array<uint8_t, ndn::MAX_NDN_PACKET_SIZE> m_receiveBuffer;
Yukai Tu74c895d2015-09-21 01:11:51 -0700112};
113
Yukai Tu74c895d2015-09-21 01:11:51 -0700114
115template<class T>
Yukai Tu74c895d2015-09-21 01:11:51 -0700116StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
117 : m_socket(std::move(socket))
Yukai Tu74c895d2015-09-21 01:11:51 -0700118{
Eric Newberryb49313d2017-12-24 20:22:27 -0700119 // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
120 // Therefore, protecting against send queue overflows is less critical than in other transport
121 // types. Instead, we use the default threshold specified in the GenericLinkService options.
122
Weiwei Liudcdf6212016-08-31 14:34:22 -0700123 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700124}
125
126template<class T>
Eric Newberryb49313d2017-12-24 20:22:27 -0700127ssize_t
128StreamTransport<T>::getSendQueueLength()
129{
130 ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
131 if (queueLength == QUEUE_ERROR) {
132 NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
133 }
134 return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
135}
136
137template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100138void
Yukai Tu74c895d2015-09-21 01:11:51 -0700139StreamTransport<T>::doClose()
140{
141 NFD_LOG_FACE_TRACE(__func__);
142
143 if (m_socket.is_open()) {
144 // Cancel all outstanding operations and shutdown the socket
145 // so that no further sends or receives are possible.
146 // Use the non-throwing variants and ignore errors, if any.
147 boost::system::error_code error;
148 m_socket.cancel(error);
Davide Pesaventod91fe6d2023-10-04 21:40:02 -0400149 m_socket.shutdown(boost::asio::socket_base::shutdown_both, error);
Yukai Tu74c895d2015-09-21 01:11:51 -0700150 }
151
152 // Ensure that the Transport stays alive at least until
153 // all pending handlers are dispatched
Davide Pesavento5d642632023-10-03 00:36:08 -0400154 boost::asio::defer(getGlobalIoService(), [this] { deferredClose(); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700155
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400156 // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
Yukai Tu74c895d2015-09-21 01:11:51 -0700157 //
158 // When doClose is called from a socket event handler (e.g., from handleReceive),
159 // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
160 // Instead, handleSend is invoked as nothing bad happened.
161 //
162 // In order to prevent the assertion in handleSend from failing, we clear the queue
163 // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
164 // this point have been executed. If more send operations are scheduled after this
165 // point, they will fail because the socket has been shutdown, and their callbacks
166 // will be invoked with error code == asio::error::shut_down.
167}
168
169template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100170void
Yukai Tu74c895d2015-09-21 01:11:51 -0700171StreamTransport<T>::deferredClose()
172{
173 NFD_LOG_FACE_TRACE(__func__);
174
Weiwei Liudcdf6212016-08-31 14:34:22 -0700175 resetSendQueue();
Yukai Tu74c895d2015-09-21 01:11:51 -0700176
177 // use the non-throwing variant and ignore errors, if any
178 boost::system::error_code error;
179 m_socket.close(error);
180
181 this->setState(TransportState::CLOSED);
182}
183
184template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100185void
Teng Liang13d582a2020-07-21 20:23:11 -0700186StreamTransport<T>::doSend(const Block& packet)
Davide Pesavento8728a252015-11-06 04:01:22 +0100187{
188 NFD_LOG_FACE_TRACE(__func__);
189
Weiwei Liudcdf6212016-08-31 14:34:22 -0700190 if (getState() != TransportState::UP)
191 return;
192
Davide Pesavento8728a252015-11-06 04:01:22 +0100193 bool wasQueueEmpty = m_sendQueue.empty();
Davide Pesaventob3a23ca2019-05-04 20:40:21 -0400194 m_sendQueue.push(packet);
195 m_sendQueueBytes += packet.size();
Davide Pesavento8728a252015-11-06 04:01:22 +0100196
197 if (wasQueueEmpty)
198 sendFromQueue();
199}
200
201template<class T>
202void
Yukai Tu74c895d2015-09-21 01:11:51 -0700203StreamTransport<T>::sendFromQueue()
204{
205 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
Davide Pesaventoe4b22382018-06-10 14:37:24 -0400206 [this] (auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700207}
208
209template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100210void
Yukai Tu74c895d2015-09-21 01:11:51 -0700211StreamTransport<T>::handleSend(const boost::system::error_code& error,
212 size_t nBytesSent)
213{
214 if (error)
215 return processErrorCode(error);
216
217 NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
218
219 BOOST_ASSERT(!m_sendQueue.empty());
Eric Newberryb49313d2017-12-24 20:22:27 -0700220 BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
221 m_sendQueueBytes -= nBytesSent;
Yukai Tu74c895d2015-09-21 01:11:51 -0700222 m_sendQueue.pop();
223
224 if (!m_sendQueue.empty())
225 sendFromQueue();
226}
227
228template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100229void
Weiwei Liudcdf6212016-08-31 14:34:22 -0700230StreamTransport<T>::startReceive()
231{
232 BOOST_ASSERT(getState() == TransportState::UP);
233
Davide Pesavento2fae1d02024-06-08 20:02:27 -0400234 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer.data() + m_receiveBufferSize,
235 m_receiveBuffer.size() - m_receiveBufferSize),
Davide Pesaventoe4b22382018-06-10 14:37:24 -0400236 [this] (auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
Weiwei Liudcdf6212016-08-31 14:34:22 -0700237}
238
239template<class T>
240void
Yukai Tu74c895d2015-09-21 01:11:51 -0700241StreamTransport<T>::handleReceive(const boost::system::error_code& error,
242 size_t nBytesReceived)
243{
244 if (error)
245 return processErrorCode(error);
246
247 NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
248
Davide Pesavento8728a252015-11-06 04:01:22 +0100249 m_receiveBufferSize += nBytesReceived;
Davide Pesavento2fae1d02024-06-08 20:02:27 -0400250 auto unparsedBytes = ndn::span(m_receiveBuffer).first(m_receiveBufferSize);
251 while (!unparsedBytes.empty()) {
252 auto [isOk, element] = Block::fromBuffer(unparsedBytes);
Yukai Tu74c895d2015-09-21 01:11:51 -0700253 if (!isOk)
254 break;
255
Davide Pesavento2fae1d02024-06-08 20:02:27 -0400256 unparsedBytes = unparsedBytes.subspan(element.size());
Davide Pesaventob3a23ca2019-05-04 20:40:21 -0400257 this->receive(element);
Yukai Tu74c895d2015-09-21 01:11:51 -0700258 }
259
Davide Pesavento2fae1d02024-06-08 20:02:27 -0400260 if (unparsedBytes.empty()) {
261 // nothing left in the receive buffer
262 m_receiveBufferSize = 0;
263 }
264 else if (unparsedBytes.data() != m_receiveBuffer.data()) {
265 // move remaining unparsed bytes to the beginning of the receive buffer
266 std::copy(unparsedBytes.begin(), unparsedBytes.end(), m_receiveBuffer.begin());
267 m_receiveBufferSize = unparsedBytes.size();
268 }
269 else if (unparsedBytes.size() == m_receiveBuffer.size()) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400270 NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
Yukai Tu74c895d2015-09-21 01:11:51 -0700271 this->setState(TransportState::FAILED);
272 doClose();
273 return;
274 }
275
Weiwei Liudcdf6212016-08-31 14:34:22 -0700276 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700277}
278
279template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100280void
Yukai Tu74c895d2015-09-21 01:11:51 -0700281StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
282{
283 NFD_LOG_FACE_TRACE(__func__);
284
285 if (getState() == TransportState::CLOSING ||
286 getState() == TransportState::FAILED ||
287 getState() == TransportState::CLOSED ||
288 error == boost::asio::error::operation_aborted || // when cancel() is called
289 error == boost::asio::error::shut_down) // after shutdown() is called
290 // transport is shutting down, ignore any errors
291 return;
292
Weiwei Liudcdf6212016-08-31 14:34:22 -0700293 handleError(error);
294}
295
296template<class T>
297void
298StreamTransport<T>::handleError(const boost::system::error_code& error)
299{
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400300 if (error == boost::asio::error::eof) {
301 this->setState(TransportState::CLOSING);
302 }
303 else {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400304 NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400305 this->setState(TransportState::FAILED);
306 }
Yukai Tu74c895d2015-09-21 01:11:51 -0700307 doClose();
308}
309
Weiwei Liudcdf6212016-08-31 14:34:22 -0700310template<class T>
311void
312StreamTransport<T>::resetReceiveBuffer()
313{
314 m_receiveBufferSize = 0;
315}
316
317template<class T>
318void
319StreamTransport<T>::resetSendQueue()
320{
321 std::queue<Block> emptyQueue;
322 std::swap(emptyQueue, m_sendQueue);
Eric Newberryb49313d2017-12-24 20:22:27 -0700323 m_sendQueueBytes = 0;
324}
325
326template<class T>
327size_t
328StreamTransport<T>::getSendQueueBytes() const
329{
330 return m_sendQueueBytes;
Weiwei Liudcdf6212016-08-31 14:34:22 -0700331}
332
Davide Pesaventoe422f9e2022-06-03 01:30:23 -0400333} // namespace nfd::face
Yukai Tu74c895d2015-09-21 01:11:51 -0700334
335#endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP