blob: 448b28b3711ce97350f4aebcc5b0a687072d746d [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2023,  Regents of the University of California,
 *                           Arizona Board of Regents,
 *                           Colorado State University,
 *                           University Pierre & Marie Curie, Sorbonne University,
 *                           Washington University in St. Louis,
 *                           Beijing Institute of Technology,
 *                           The University of Memphis.
 *
 * This file is part of NFD (Named Data Networking Forwarding Daemon).
 * See AUTHORS.md for complete list of NFD authors and contributors.
 *
 * NFD is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE.  See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
 */
25
26#ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28
29#include "transport.hpp"
Eric Newberryb49313d2017-12-24 20:22:27 -070030#include "socket-utils.hpp"
Davide Pesavento2cae8ca2019-04-18 20:48:05 -040031#include "common/global.hpp"
Yukai Tu74c895d2015-09-21 01:11:51 -070032
33#include <queue>
34
Davide Pesavento5d642632023-10-03 00:36:08 -040035#include <boost/asio/defer.hpp>
Davide Pesaventoa9b09b62022-06-04 14:07:25 -040036#include <boost/asio/write.hpp>
37
Davide Pesaventoe422f9e2022-06-03 01:30:23 -040038namespace nfd::face {
Yukai Tu74c895d2015-09-21 01:11:51 -070039
Davide Pesaventoc0df94e2023-10-08 19:26:55 -040040/**
41 * \brief Implements a Transport for stream-based protocols.
Yukai Tu74c895d2015-09-21 01:11:51 -070042 *
Davide Pesaventoc0df94e2023-10-08 19:26:55 -040043 * \tparam Protocol a stream-based protocol in Boost.Asio
Yukai Tu74c895d2015-09-21 01:11:51 -070044 */
45template<class Protocol>
46class StreamTransport : public Transport
47{
48public:
Davide Pesaventoa599d2a2022-02-16 18:52:43 -050049 using protocol = Protocol;
Yukai Tu74c895d2015-09-21 01:11:51 -070050
Davide Pesaventoc0df94e2023-10-08 19:26:55 -040051 /**
52 * \brief Construct stream transport.
Yukai Tu74c895d2015-09-21 01:11:51 -070053 *
Davide Pesaventoc0df94e2023-10-08 19:26:55 -040054 * \param socket Protocol-specific socket for the created transport
Yukai Tu74c895d2015-09-21 01:11:51 -070055 */
56 explicit
57 StreamTransport(typename protocol::socket&& socket);
58
Eric Newberryb49313d2017-12-24 20:22:27 -070059 ssize_t
60 getSendQueueLength() override;
61
Davide Pesavento8728a252015-11-06 04:01:22 +010062protected:
Davide Pesavento1816d4b2017-07-02 12:20:48 -040063 void
Davide Pesaventob84bd3a2016-04-22 02:21:45 +020064 doClose() override;
Yukai Tu74c895d2015-09-21 01:11:51 -070065
Yukai Tu74c895d2015-09-21 01:11:51 -070066 void
67 deferredClose();
68
Davide Pesavento1816d4b2017-07-02 12:20:48 -040069 void
Teng Liang13d582a2020-07-21 20:23:11 -070070 doSend(const Block& packet) override;
Davide Pesavento8728a252015-11-06 04:01:22 +010071
Yukai Tu74c895d2015-09-21 01:11:51 -070072 void
73 sendFromQueue();
74
75 void
76 handleSend(const boost::system::error_code& error,
77 size_t nBytesSent);
78
79 void
Weiwei Liudcdf6212016-08-31 14:34:22 -070080 startReceive();
81
82 void
Yukai Tu74c895d2015-09-21 01:11:51 -070083 handleReceive(const boost::system::error_code& error,
84 size_t nBytesReceived);
85
86 void
87 processErrorCode(const boost::system::error_code& error);
88
Weiwei Liudcdf6212016-08-31 14:34:22 -070089 virtual void
90 handleError(const boost::system::error_code& error);
91
92 void
93 resetReceiveBuffer();
94
95 void
96 resetSendQueue();
97
Eric Newberryb49313d2017-12-24 20:22:27 -070098 size_t
99 getSendQueueBytes() const;
100
Yukai Tu74c895d2015-09-21 01:11:51 -0700101protected:
102 typename protocol::socket m_socket;
103
Davide Pesaventoa3148082018-04-12 18:21:54 -0400104 NFD_LOG_MEMBER_DECL();
Yukai Tu74c895d2015-09-21 01:11:51 -0700105
106private:
Davide Pesavento8728a252015-11-06 04:01:22 +0100107 uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
108 size_t m_receiveBufferSize;
Yukai Tu74c895d2015-09-21 01:11:51 -0700109 std::queue<Block> m_sendQueue;
Eric Newberryb49313d2017-12-24 20:22:27 -0700110 size_t m_sendQueueBytes;
Yukai Tu74c895d2015-09-21 01:11:51 -0700111};
112
Yukai Tu74c895d2015-09-21 01:11:51 -0700113
114template<class T>
Yukai Tu74c895d2015-09-21 01:11:51 -0700115StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
116 : m_socket(std::move(socket))
Davide Pesavento8728a252015-11-06 04:01:22 +0100117 , m_receiveBufferSize(0)
Eric Newberryb49313d2017-12-24 20:22:27 -0700118 , m_sendQueueBytes(0)
Yukai Tu74c895d2015-09-21 01:11:51 -0700119{
Eric Newberryb49313d2017-12-24 20:22:27 -0700120 // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
121 // Therefore, protecting against send queue overflows is less critical than in other transport
122 // types. Instead, we use the default threshold specified in the GenericLinkService options.
123
Weiwei Liudcdf6212016-08-31 14:34:22 -0700124 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700125}
126
127template<class T>
Eric Newberryb49313d2017-12-24 20:22:27 -0700128ssize_t
129StreamTransport<T>::getSendQueueLength()
130{
131 ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
132 if (queueLength == QUEUE_ERROR) {
133 NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
134 }
135 return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
136}
137
138template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100139void
Yukai Tu74c895d2015-09-21 01:11:51 -0700140StreamTransport<T>::doClose()
141{
142 NFD_LOG_FACE_TRACE(__func__);
143
144 if (m_socket.is_open()) {
145 // Cancel all outstanding operations and shutdown the socket
146 // so that no further sends or receives are possible.
147 // Use the non-throwing variants and ignore errors, if any.
148 boost::system::error_code error;
149 m_socket.cancel(error);
Davide Pesaventod91fe6d2023-10-04 21:40:02 -0400150 m_socket.shutdown(boost::asio::socket_base::shutdown_both, error);
Yukai Tu74c895d2015-09-21 01:11:51 -0700151 }
152
153 // Ensure that the Transport stays alive at least until
154 // all pending handlers are dispatched
Davide Pesavento5d642632023-10-03 00:36:08 -0400155 boost::asio::defer(getGlobalIoService(), [this] { deferredClose(); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700156
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400157 // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
Yukai Tu74c895d2015-09-21 01:11:51 -0700158 //
159 // When doClose is called from a socket event handler (e.g., from handleReceive),
160 // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
161 // Instead, handleSend is invoked as nothing bad happened.
162 //
163 // In order to prevent the assertion in handleSend from failing, we clear the queue
164 // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
165 // this point have been executed. If more send operations are scheduled after this
166 // point, they will fail because the socket has been shutdown, and their callbacks
167 // will be invoked with error code == asio::error::shut_down.
168}
169
170template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100171void
Yukai Tu74c895d2015-09-21 01:11:51 -0700172StreamTransport<T>::deferredClose()
173{
174 NFD_LOG_FACE_TRACE(__func__);
175
Weiwei Liudcdf6212016-08-31 14:34:22 -0700176 resetSendQueue();
Yukai Tu74c895d2015-09-21 01:11:51 -0700177
178 // use the non-throwing variant and ignore errors, if any
179 boost::system::error_code error;
180 m_socket.close(error);
181
182 this->setState(TransportState::CLOSED);
183}
184
185template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100186void
Teng Liang13d582a2020-07-21 20:23:11 -0700187StreamTransport<T>::doSend(const Block& packet)
Davide Pesavento8728a252015-11-06 04:01:22 +0100188{
189 NFD_LOG_FACE_TRACE(__func__);
190
Weiwei Liudcdf6212016-08-31 14:34:22 -0700191 if (getState() != TransportState::UP)
192 return;
193
Davide Pesavento8728a252015-11-06 04:01:22 +0100194 bool wasQueueEmpty = m_sendQueue.empty();
Davide Pesaventob3a23ca2019-05-04 20:40:21 -0400195 m_sendQueue.push(packet);
196 m_sendQueueBytes += packet.size();
Davide Pesavento8728a252015-11-06 04:01:22 +0100197
198 if (wasQueueEmpty)
199 sendFromQueue();
200}
201
202template<class T>
203void
Yukai Tu74c895d2015-09-21 01:11:51 -0700204StreamTransport<T>::sendFromQueue()
205{
206 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
Davide Pesaventoe4b22382018-06-10 14:37:24 -0400207 [this] (auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700208}
209
210template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100211void
Yukai Tu74c895d2015-09-21 01:11:51 -0700212StreamTransport<T>::handleSend(const boost::system::error_code& error,
213 size_t nBytesSent)
214{
215 if (error)
216 return processErrorCode(error);
217
218 NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
219
220 BOOST_ASSERT(!m_sendQueue.empty());
Eric Newberryb49313d2017-12-24 20:22:27 -0700221 BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
222 m_sendQueueBytes -= nBytesSent;
Yukai Tu74c895d2015-09-21 01:11:51 -0700223 m_sendQueue.pop();
224
225 if (!m_sendQueue.empty())
226 sendFromQueue();
227}
228
229template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100230void
Weiwei Liudcdf6212016-08-31 14:34:22 -0700231StreamTransport<T>::startReceive()
232{
233 BOOST_ASSERT(getState() == TransportState::UP);
234
235 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
236 ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
Davide Pesaventoe4b22382018-06-10 14:37:24 -0400237 [this] (auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
Weiwei Liudcdf6212016-08-31 14:34:22 -0700238}
239
240template<class T>
241void
Yukai Tu74c895d2015-09-21 01:11:51 -0700242StreamTransport<T>::handleReceive(const boost::system::error_code& error,
243 size_t nBytesReceived)
244{
245 if (error)
246 return processErrorCode(error);
247
248 NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
249
Davide Pesavento8728a252015-11-06 04:01:22 +0100250 m_receiveBufferSize += nBytesReceived;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500251 auto bufferView = ndn::make_span(m_receiveBuffer, m_receiveBufferSize);
Yukai Tu74c895d2015-09-21 01:11:51 -0700252 size_t offset = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700253 bool isOk = true;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500254 while (offset < bufferView.size()) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400255 Block element;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500256 std::tie(isOk, element) = Block::fromBuffer(bufferView.subspan(offset));
Yukai Tu74c895d2015-09-21 01:11:51 -0700257 if (!isOk)
258 break;
259
260 offset += element.size();
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500261 BOOST_ASSERT(offset <= bufferView.size());
Yukai Tu74c895d2015-09-21 01:11:51 -0700262
Davide Pesaventob3a23ca2019-05-04 20:40:21 -0400263 this->receive(element);
Yukai Tu74c895d2015-09-21 01:11:51 -0700264 }
265
Davide Pesavento8728a252015-11-06 04:01:22 +0100266 if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400267 NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
Yukai Tu74c895d2015-09-21 01:11:51 -0700268 this->setState(TransportState::FAILED);
269 doClose();
270 return;
271 }
272
273 if (offset > 0) {
Davide Pesavento8728a252015-11-06 04:01:22 +0100274 if (offset != m_receiveBufferSize) {
275 std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
276 m_receiveBufferSize -= offset;
Yukai Tu74c895d2015-09-21 01:11:51 -0700277 }
278 else {
Davide Pesavento8728a252015-11-06 04:01:22 +0100279 m_receiveBufferSize = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700280 }
281 }
282
Weiwei Liudcdf6212016-08-31 14:34:22 -0700283 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700284}
285
286template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100287void
Yukai Tu74c895d2015-09-21 01:11:51 -0700288StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
289{
290 NFD_LOG_FACE_TRACE(__func__);
291
292 if (getState() == TransportState::CLOSING ||
293 getState() == TransportState::FAILED ||
294 getState() == TransportState::CLOSED ||
295 error == boost::asio::error::operation_aborted || // when cancel() is called
296 error == boost::asio::error::shut_down) // after shutdown() is called
297 // transport is shutting down, ignore any errors
298 return;
299
Weiwei Liudcdf6212016-08-31 14:34:22 -0700300 handleError(error);
301}
302
303template<class T>
304void
305StreamTransport<T>::handleError(const boost::system::error_code& error)
306{
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400307 if (error == boost::asio::error::eof) {
308 this->setState(TransportState::CLOSING);
309 }
310 else {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400311 NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400312 this->setState(TransportState::FAILED);
313 }
Yukai Tu74c895d2015-09-21 01:11:51 -0700314 doClose();
315}
316
Weiwei Liudcdf6212016-08-31 14:34:22 -0700317template<class T>
318void
319StreamTransport<T>::resetReceiveBuffer()
320{
321 m_receiveBufferSize = 0;
322}
323
324template<class T>
325void
326StreamTransport<T>::resetSendQueue()
327{
328 std::queue<Block> emptyQueue;
329 std::swap(emptyQueue, m_sendQueue);
Eric Newberryb49313d2017-12-24 20:22:27 -0700330 m_sendQueueBytes = 0;
331}
332
333template<class T>
334size_t
335StreamTransport<T>::getSendQueueBytes() const
336{
337 return m_sendQueueBytes;
Weiwei Liudcdf6212016-08-31 14:34:22 -0700338}
339
Davide Pesaventoe422f9e2022-06-03 01:30:23 -0400340} // namespace nfd::face
Yukai Tu74c895d2015-09-21 01:11:51 -0700341
342#endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP