blob: b6ad18b447ef4640f1adf1d3f9c1fa5cb0a5a0c3 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2022, Regents of the University of California,
 * Arizona Board of Regents,
 * Colorado State University,
 * University Pierre & Marie Curie, Sorbonne University,
 * Washington University in St. Louis,
 * Beijing Institute of Technology,
 * The University of Memphis.
 *
 * This file is part of NFD (Named Data Networking Forwarding Daemon).
 * See AUTHORS.md for complete list of NFD authors and contributors.
 *
 * NFD is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 */
25
26#ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28
#include "transport.hpp"
#include "socket-utils.hpp"
#include "common/global.hpp"

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <queue>
#include <tuple>
#include <utility>

#include <boost/asio/write.hpp>
36
Davide Pesaventoe422f9e2022-06-03 01:30:23 -040037namespace nfd::face {
Yukai Tu74c895d2015-09-21 01:11:51 -070038
39/** \brief Implements Transport for stream-based protocols.
40 *
41 * \tparam Protocol a stream-based protocol in Boost.Asio
42 */
43template<class Protocol>
44class StreamTransport : public Transport
45{
46public:
Davide Pesaventoa599d2a2022-02-16 18:52:43 -050047 using protocol = Protocol;
Yukai Tu74c895d2015-09-21 01:11:51 -070048
49 /** \brief Construct stream transport.
50 *
51 * \param socket Protocol-specific socket for the created transport
52 */
53 explicit
54 StreamTransport(typename protocol::socket&& socket);
55
Eric Newberryb49313d2017-12-24 20:22:27 -070056 ssize_t
57 getSendQueueLength() override;
58
Davide Pesavento8728a252015-11-06 04:01:22 +010059protected:
Davide Pesavento1816d4b2017-07-02 12:20:48 -040060 void
Davide Pesaventob84bd3a2016-04-22 02:21:45 +020061 doClose() override;
Yukai Tu74c895d2015-09-21 01:11:51 -070062
Yukai Tu74c895d2015-09-21 01:11:51 -070063 void
64 deferredClose();
65
Davide Pesavento1816d4b2017-07-02 12:20:48 -040066 void
Teng Liang13d582a2020-07-21 20:23:11 -070067 doSend(const Block& packet) override;
Davide Pesavento8728a252015-11-06 04:01:22 +010068
Yukai Tu74c895d2015-09-21 01:11:51 -070069 void
70 sendFromQueue();
71
72 void
73 handleSend(const boost::system::error_code& error,
74 size_t nBytesSent);
75
76 void
Weiwei Liudcdf6212016-08-31 14:34:22 -070077 startReceive();
78
79 void
Yukai Tu74c895d2015-09-21 01:11:51 -070080 handleReceive(const boost::system::error_code& error,
81 size_t nBytesReceived);
82
83 void
84 processErrorCode(const boost::system::error_code& error);
85
Weiwei Liudcdf6212016-08-31 14:34:22 -070086 virtual void
87 handleError(const boost::system::error_code& error);
88
89 void
90 resetReceiveBuffer();
91
92 void
93 resetSendQueue();
94
Eric Newberryb49313d2017-12-24 20:22:27 -070095 size_t
96 getSendQueueBytes() const;
97
Yukai Tu74c895d2015-09-21 01:11:51 -070098protected:
99 typename protocol::socket m_socket;
100
Davide Pesaventoa3148082018-04-12 18:21:54 -0400101 NFD_LOG_MEMBER_DECL();
Yukai Tu74c895d2015-09-21 01:11:51 -0700102
103private:
Davide Pesavento8728a252015-11-06 04:01:22 +0100104 uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
105 size_t m_receiveBufferSize;
Yukai Tu74c895d2015-09-21 01:11:51 -0700106 std::queue<Block> m_sendQueue;
Eric Newberryb49313d2017-12-24 20:22:27 -0700107 size_t m_sendQueueBytes;
Yukai Tu74c895d2015-09-21 01:11:51 -0700108};
109
Yukai Tu74c895d2015-09-21 01:11:51 -0700110
111template<class T>
Yukai Tu74c895d2015-09-21 01:11:51 -0700112StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
113 : m_socket(std::move(socket))
Davide Pesavento8728a252015-11-06 04:01:22 +0100114 , m_receiveBufferSize(0)
Eric Newberryb49313d2017-12-24 20:22:27 -0700115 , m_sendQueueBytes(0)
Yukai Tu74c895d2015-09-21 01:11:51 -0700116{
Eric Newberryb49313d2017-12-24 20:22:27 -0700117 // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
118 // Therefore, protecting against send queue overflows is less critical than in other transport
119 // types. Instead, we use the default threshold specified in the GenericLinkService options.
120
Weiwei Liudcdf6212016-08-31 14:34:22 -0700121 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700122}
123
124template<class T>
Eric Newberryb49313d2017-12-24 20:22:27 -0700125ssize_t
126StreamTransport<T>::getSendQueueLength()
127{
128 ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
129 if (queueLength == QUEUE_ERROR) {
130 NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
131 }
132 return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
133}
134
135template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100136void
Yukai Tu74c895d2015-09-21 01:11:51 -0700137StreamTransport<T>::doClose()
138{
139 NFD_LOG_FACE_TRACE(__func__);
140
141 if (m_socket.is_open()) {
142 // Cancel all outstanding operations and shutdown the socket
143 // so that no further sends or receives are possible.
144 // Use the non-throwing variants and ignore errors, if any.
145 boost::system::error_code error;
146 m_socket.cancel(error);
147 m_socket.shutdown(protocol::socket::shutdown_both, error);
148 }
149
150 // Ensure that the Transport stays alive at least until
151 // all pending handlers are dispatched
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400152 getGlobalIoService().post([this] { deferredClose(); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700153
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400154 // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
Yukai Tu74c895d2015-09-21 01:11:51 -0700155 //
156 // When doClose is called from a socket event handler (e.g., from handleReceive),
157 // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
158 // Instead, handleSend is invoked as nothing bad happened.
159 //
160 // In order to prevent the assertion in handleSend from failing, we clear the queue
161 // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
162 // this point have been executed. If more send operations are scheduled after this
163 // point, they will fail because the socket has been shutdown, and their callbacks
164 // will be invoked with error code == asio::error::shut_down.
165}
166
167template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100168void
Yukai Tu74c895d2015-09-21 01:11:51 -0700169StreamTransport<T>::deferredClose()
170{
171 NFD_LOG_FACE_TRACE(__func__);
172
Weiwei Liudcdf6212016-08-31 14:34:22 -0700173 resetSendQueue();
Yukai Tu74c895d2015-09-21 01:11:51 -0700174
175 // use the non-throwing variant and ignore errors, if any
176 boost::system::error_code error;
177 m_socket.close(error);
178
179 this->setState(TransportState::CLOSED);
180}
181
182template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100183void
Teng Liang13d582a2020-07-21 20:23:11 -0700184StreamTransport<T>::doSend(const Block& packet)
Davide Pesavento8728a252015-11-06 04:01:22 +0100185{
186 NFD_LOG_FACE_TRACE(__func__);
187
Weiwei Liudcdf6212016-08-31 14:34:22 -0700188 if (getState() != TransportState::UP)
189 return;
190
Davide Pesavento8728a252015-11-06 04:01:22 +0100191 bool wasQueueEmpty = m_sendQueue.empty();
Davide Pesaventob3a23ca2019-05-04 20:40:21 -0400192 m_sendQueue.push(packet);
193 m_sendQueueBytes += packet.size();
Davide Pesavento8728a252015-11-06 04:01:22 +0100194
195 if (wasQueueEmpty)
196 sendFromQueue();
197}
198
199template<class T>
200void
Yukai Tu74c895d2015-09-21 01:11:51 -0700201StreamTransport<T>::sendFromQueue()
202{
203 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
Davide Pesaventoe4b22382018-06-10 14:37:24 -0400204 [this] (auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700205}
206
207template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100208void
Yukai Tu74c895d2015-09-21 01:11:51 -0700209StreamTransport<T>::handleSend(const boost::system::error_code& error,
210 size_t nBytesSent)
211{
212 if (error)
213 return processErrorCode(error);
214
215 NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
216
217 BOOST_ASSERT(!m_sendQueue.empty());
Eric Newberryb49313d2017-12-24 20:22:27 -0700218 BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
219 m_sendQueueBytes -= nBytesSent;
Yukai Tu74c895d2015-09-21 01:11:51 -0700220 m_sendQueue.pop();
221
222 if (!m_sendQueue.empty())
223 sendFromQueue();
224}
225
226template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100227void
Weiwei Liudcdf6212016-08-31 14:34:22 -0700228StreamTransport<T>::startReceive()
229{
230 BOOST_ASSERT(getState() == TransportState::UP);
231
232 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
233 ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
Davide Pesaventoe4b22382018-06-10 14:37:24 -0400234 [this] (auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
Weiwei Liudcdf6212016-08-31 14:34:22 -0700235}
236
237template<class T>
238void
Yukai Tu74c895d2015-09-21 01:11:51 -0700239StreamTransport<T>::handleReceive(const boost::system::error_code& error,
240 size_t nBytesReceived)
241{
242 if (error)
243 return processErrorCode(error);
244
245 NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
246
Davide Pesavento8728a252015-11-06 04:01:22 +0100247 m_receiveBufferSize += nBytesReceived;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500248 auto bufferView = ndn::make_span(m_receiveBuffer, m_receiveBufferSize);
Yukai Tu74c895d2015-09-21 01:11:51 -0700249 size_t offset = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700250 bool isOk = true;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500251 while (offset < bufferView.size()) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400252 Block element;
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500253 std::tie(isOk, element) = Block::fromBuffer(bufferView.subspan(offset));
Yukai Tu74c895d2015-09-21 01:11:51 -0700254 if (!isOk)
255 break;
256
257 offset += element.size();
Davide Pesaventoa599d2a2022-02-16 18:52:43 -0500258 BOOST_ASSERT(offset <= bufferView.size());
Yukai Tu74c895d2015-09-21 01:11:51 -0700259
Davide Pesaventob3a23ca2019-05-04 20:40:21 -0400260 this->receive(element);
Yukai Tu74c895d2015-09-21 01:11:51 -0700261 }
262
Davide Pesavento8728a252015-11-06 04:01:22 +0100263 if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400264 NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
Yukai Tu74c895d2015-09-21 01:11:51 -0700265 this->setState(TransportState::FAILED);
266 doClose();
267 return;
268 }
269
270 if (offset > 0) {
Davide Pesavento8728a252015-11-06 04:01:22 +0100271 if (offset != m_receiveBufferSize) {
272 std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
273 m_receiveBufferSize -= offset;
Yukai Tu74c895d2015-09-21 01:11:51 -0700274 }
275 else {
Davide Pesavento8728a252015-11-06 04:01:22 +0100276 m_receiveBufferSize = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700277 }
278 }
279
Weiwei Liudcdf6212016-08-31 14:34:22 -0700280 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700281}
282
283template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100284void
Yukai Tu74c895d2015-09-21 01:11:51 -0700285StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
286{
287 NFD_LOG_FACE_TRACE(__func__);
288
289 if (getState() == TransportState::CLOSING ||
290 getState() == TransportState::FAILED ||
291 getState() == TransportState::CLOSED ||
292 error == boost::asio::error::operation_aborted || // when cancel() is called
293 error == boost::asio::error::shut_down) // after shutdown() is called
294 // transport is shutting down, ignore any errors
295 return;
296
Weiwei Liudcdf6212016-08-31 14:34:22 -0700297 handleError(error);
298}
299
300template<class T>
301void
302StreamTransport<T>::handleError(const boost::system::error_code& error)
303{
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400304 if (error == boost::asio::error::eof) {
305 this->setState(TransportState::CLOSING);
306 }
307 else {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400308 NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400309 this->setState(TransportState::FAILED);
310 }
Yukai Tu74c895d2015-09-21 01:11:51 -0700311 doClose();
312}
313
Weiwei Liudcdf6212016-08-31 14:34:22 -0700314template<class T>
315void
316StreamTransport<T>::resetReceiveBuffer()
317{
318 m_receiveBufferSize = 0;
319}
320
321template<class T>
322void
323StreamTransport<T>::resetSendQueue()
324{
325 std::queue<Block> emptyQueue;
326 std::swap(emptyQueue, m_sendQueue);
Eric Newberryb49313d2017-12-24 20:22:27 -0700327 m_sendQueueBytes = 0;
328}
329
330template<class T>
331size_t
332StreamTransport<T>::getSendQueueBytes() const
333{
334 return m_sendQueueBytes;
Weiwei Liudcdf6212016-08-31 14:34:22 -0700335}
336
Davide Pesaventoe422f9e2022-06-03 01:30:23 -0400337} // namespace nfd::face
Yukai Tu74c895d2015-09-21 01:11:51 -0700338
339#endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP