blob: 0adbf26c482cdc770a6b95a3f89b6aa15299f04f [file] [log] [blame]
Yukai Tu74c895d2015-09-21 01:11:51 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Eric Newberryb49313d2017-12-24 20:22:27 -07002/*
3 * Copyright (c) 2014-2018, Regents of the University of California,
Yukai Tu74c895d2015-09-21 01:11:51 -07004 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis.
10 *
11 * This file is part of NFD (Named Data Networking Forwarding Daemon).
12 * See AUTHORS.md for complete list of NFD authors and contributors.
13 *
14 * NFD is free software: you can redistribute it and/or modify it under the terms
15 * of the GNU General Public License as published by the Free Software Foundation,
16 * either version 3 of the License, or (at your option) any later version.
17 *
18 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20 * PURPOSE. See the GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License along with
23 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24 */
25
26#ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28
29#include "transport.hpp"
Eric Newberryb49313d2017-12-24 20:22:27 -070030#include "socket-utils.hpp"
Yukai Tu74c895d2015-09-21 01:11:51 -070031#include "core/global-io.hpp"
32
33#include <queue>
34
35namespace nfd {
36namespace face {
37
38/** \brief Implements Transport for stream-based protocols.
39 *
40 * \tparam Protocol a stream-based protocol in Boost.Asio
41 */
42template<class Protocol>
43class StreamTransport : public Transport
44{
45public:
46 typedef Protocol protocol;
47
48 /** \brief Construct stream transport.
49 *
50 * \param socket Protocol-specific socket for the created transport
51 */
52 explicit
53 StreamTransport(typename protocol::socket&& socket);
54
Eric Newberryb49313d2017-12-24 20:22:27 -070055 ssize_t
56 getSendQueueLength() override;
57
Davide Pesavento8728a252015-11-06 04:01:22 +010058protected:
Davide Pesavento1816d4b2017-07-02 12:20:48 -040059 void
Davide Pesaventob84bd3a2016-04-22 02:21:45 +020060 doClose() override;
Yukai Tu74c895d2015-09-21 01:11:51 -070061
Yukai Tu74c895d2015-09-21 01:11:51 -070062 void
63 deferredClose();
64
Davide Pesavento1816d4b2017-07-02 12:20:48 -040065 void
Davide Pesaventob84bd3a2016-04-22 02:21:45 +020066 doSend(Transport::Packet&& packet) override;
Davide Pesavento8728a252015-11-06 04:01:22 +010067
Yukai Tu74c895d2015-09-21 01:11:51 -070068 void
69 sendFromQueue();
70
71 void
72 handleSend(const boost::system::error_code& error,
73 size_t nBytesSent);
74
75 void
Weiwei Liudcdf6212016-08-31 14:34:22 -070076 startReceive();
77
78 void
Yukai Tu74c895d2015-09-21 01:11:51 -070079 handleReceive(const boost::system::error_code& error,
80 size_t nBytesReceived);
81
82 void
83 processErrorCode(const boost::system::error_code& error);
84
Weiwei Liudcdf6212016-08-31 14:34:22 -070085 virtual void
86 handleError(const boost::system::error_code& error);
87
88 void
89 resetReceiveBuffer();
90
91 void
92 resetSendQueue();
93
Eric Newberryb49313d2017-12-24 20:22:27 -070094 size_t
95 getSendQueueBytes() const;
96
Yukai Tu74c895d2015-09-21 01:11:51 -070097protected:
98 typename protocol::socket m_socket;
99
100 NFD_LOG_INCLASS_DECLARE();
101
102private:
Davide Pesavento8728a252015-11-06 04:01:22 +0100103 uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
104 size_t m_receiveBufferSize;
Yukai Tu74c895d2015-09-21 01:11:51 -0700105 std::queue<Block> m_sendQueue;
Eric Newberryb49313d2017-12-24 20:22:27 -0700106 size_t m_sendQueueBytes;
Yukai Tu74c895d2015-09-21 01:11:51 -0700107};
108
Yukai Tu74c895d2015-09-21 01:11:51 -0700109
110template<class T>
Yukai Tu74c895d2015-09-21 01:11:51 -0700111StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
112 : m_socket(std::move(socket))
Davide Pesavento8728a252015-11-06 04:01:22 +0100113 , m_receiveBufferSize(0)
Eric Newberryb49313d2017-12-24 20:22:27 -0700114 , m_sendQueueBytes(0)
Yukai Tu74c895d2015-09-21 01:11:51 -0700115{
Eric Newberryb49313d2017-12-24 20:22:27 -0700116 // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
117 // Therefore, protecting against send queue overflows is less critical than in other transport
118 // types. Instead, we use the default threshold specified in the GenericLinkService options.
119
Weiwei Liudcdf6212016-08-31 14:34:22 -0700120 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700121}
122
123template<class T>
Eric Newberryb49313d2017-12-24 20:22:27 -0700124ssize_t
125StreamTransport<T>::getSendQueueLength()
126{
127 ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
128 if (queueLength == QUEUE_ERROR) {
129 NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
130 }
131 return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
132}
133
134template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100135void
Yukai Tu74c895d2015-09-21 01:11:51 -0700136StreamTransport<T>::doClose()
137{
138 NFD_LOG_FACE_TRACE(__func__);
139
140 if (m_socket.is_open()) {
141 // Cancel all outstanding operations and shutdown the socket
142 // so that no further sends or receives are possible.
143 // Use the non-throwing variants and ignore errors, if any.
144 boost::system::error_code error;
145 m_socket.cancel(error);
146 m_socket.shutdown(protocol::socket::shutdown_both, error);
147 }
148
149 // Ensure that the Transport stays alive at least until
150 // all pending handlers are dispatched
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400151 getGlobalIoService().post([this] { deferredClose(); });
Yukai Tu74c895d2015-09-21 01:11:51 -0700152
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400153 // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
Yukai Tu74c895d2015-09-21 01:11:51 -0700154 //
155 // When doClose is called from a socket event handler (e.g., from handleReceive),
156 // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
157 // Instead, handleSend is invoked as nothing bad happened.
158 //
159 // In order to prevent the assertion in handleSend from failing, we clear the queue
160 // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
161 // this point have been executed. If more send operations are scheduled after this
162 // point, they will fail because the socket has been shutdown, and their callbacks
163 // will be invoked with error code == asio::error::shut_down.
164}
165
166template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100167void
Yukai Tu74c895d2015-09-21 01:11:51 -0700168StreamTransport<T>::deferredClose()
169{
170 NFD_LOG_FACE_TRACE(__func__);
171
Weiwei Liudcdf6212016-08-31 14:34:22 -0700172 resetSendQueue();
Yukai Tu74c895d2015-09-21 01:11:51 -0700173
174 // use the non-throwing variant and ignore errors, if any
175 boost::system::error_code error;
176 m_socket.close(error);
177
178 this->setState(TransportState::CLOSED);
179}
180
181template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100182void
183StreamTransport<T>::doSend(Transport::Packet&& packet)
184{
185 NFD_LOG_FACE_TRACE(__func__);
186
Weiwei Liudcdf6212016-08-31 14:34:22 -0700187 if (getState() != TransportState::UP)
188 return;
189
Davide Pesavento8728a252015-11-06 04:01:22 +0100190 bool wasQueueEmpty = m_sendQueue.empty();
191 m_sendQueue.push(packet.packet);
Eric Newberryb49313d2017-12-24 20:22:27 -0700192 m_sendQueueBytes += packet.packet.size();
Davide Pesavento8728a252015-11-06 04:01:22 +0100193
194 if (wasQueueEmpty)
195 sendFromQueue();
196}
197
198template<class T>
199void
Yukai Tu74c895d2015-09-21 01:11:51 -0700200StreamTransport<T>::sendFromQueue()
201{
202 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
203 bind(&StreamTransport<T>::handleSend, this,
204 boost::asio::placeholders::error,
205 boost::asio::placeholders::bytes_transferred));
206}
207
208template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100209void
Yukai Tu74c895d2015-09-21 01:11:51 -0700210StreamTransport<T>::handleSend(const boost::system::error_code& error,
211 size_t nBytesSent)
212{
213 if (error)
214 return processErrorCode(error);
215
216 NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
217
218 BOOST_ASSERT(!m_sendQueue.empty());
Eric Newberryb49313d2017-12-24 20:22:27 -0700219 BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
220 m_sendQueueBytes -= nBytesSent;
Yukai Tu74c895d2015-09-21 01:11:51 -0700221 m_sendQueue.pop();
222
223 if (!m_sendQueue.empty())
224 sendFromQueue();
225}
226
227template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100228void
Weiwei Liudcdf6212016-08-31 14:34:22 -0700229StreamTransport<T>::startReceive()
230{
231 BOOST_ASSERT(getState() == TransportState::UP);
232
233 m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
234 ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
235 bind(&StreamTransport<T>::handleReceive, this,
236 boost::asio::placeholders::error,
237 boost::asio::placeholders::bytes_transferred));
238}
239
240template<class T>
241void
Yukai Tu74c895d2015-09-21 01:11:51 -0700242StreamTransport<T>::handleReceive(const boost::system::error_code& error,
243 size_t nBytesReceived)
244{
245 if (error)
246 return processErrorCode(error);
247
248 NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
249
Davide Pesavento8728a252015-11-06 04:01:22 +0100250 m_receiveBufferSize += nBytesReceived;
Yukai Tu74c895d2015-09-21 01:11:51 -0700251 size_t offset = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700252 bool isOk = true;
Davide Pesavento8728a252015-11-06 04:01:22 +0100253 while (m_receiveBufferSize - offset > 0) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400254 Block element;
Davide Pesavento8728a252015-11-06 04:01:22 +0100255 std::tie(isOk, element) = Block::fromBuffer(m_receiveBuffer + offset, m_receiveBufferSize - offset);
Yukai Tu74c895d2015-09-21 01:11:51 -0700256 if (!isOk)
257 break;
258
259 offset += element.size();
Davide Pesavento8728a252015-11-06 04:01:22 +0100260 BOOST_ASSERT(offset <= m_receiveBufferSize);
Yukai Tu74c895d2015-09-21 01:11:51 -0700261
Davide Pesavento8728a252015-11-06 04:01:22 +0100262 this->receive(Transport::Packet(std::move(element)));
Yukai Tu74c895d2015-09-21 01:11:51 -0700263 }
264
Davide Pesavento8728a252015-11-06 04:01:22 +0100265 if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400266 NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
Yukai Tu74c895d2015-09-21 01:11:51 -0700267 this->setState(TransportState::FAILED);
268 doClose();
269 return;
270 }
271
272 if (offset > 0) {
Davide Pesavento8728a252015-11-06 04:01:22 +0100273 if (offset != m_receiveBufferSize) {
274 std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
275 m_receiveBufferSize -= offset;
Yukai Tu74c895d2015-09-21 01:11:51 -0700276 }
277 else {
Davide Pesavento8728a252015-11-06 04:01:22 +0100278 m_receiveBufferSize = 0;
Yukai Tu74c895d2015-09-21 01:11:51 -0700279 }
280 }
281
Weiwei Liudcdf6212016-08-31 14:34:22 -0700282 startReceive();
Yukai Tu74c895d2015-09-21 01:11:51 -0700283}
284
285template<class T>
Davide Pesavento8728a252015-11-06 04:01:22 +0100286void
Yukai Tu74c895d2015-09-21 01:11:51 -0700287StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
288{
289 NFD_LOG_FACE_TRACE(__func__);
290
291 if (getState() == TransportState::CLOSING ||
292 getState() == TransportState::FAILED ||
293 getState() == TransportState::CLOSED ||
294 error == boost::asio::error::operation_aborted || // when cancel() is called
295 error == boost::asio::error::shut_down) // after shutdown() is called
296 // transport is shutting down, ignore any errors
297 return;
298
Weiwei Liudcdf6212016-08-31 14:34:22 -0700299 handleError(error);
300}
301
302template<class T>
303void
304StreamTransport<T>::handleError(const boost::system::error_code& error)
305{
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400306 if (error == boost::asio::error::eof) {
307 this->setState(TransportState::CLOSING);
308 }
309 else {
Davide Pesavento1816d4b2017-07-02 12:20:48 -0400310 NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
Davide Pesavento68ab43d2017-07-02 13:37:35 -0400311 this->setState(TransportState::FAILED);
312 }
Yukai Tu74c895d2015-09-21 01:11:51 -0700313 doClose();
314}
315
Weiwei Liudcdf6212016-08-31 14:34:22 -0700316template<class T>
317void
318StreamTransport<T>::resetReceiveBuffer()
319{
320 m_receiveBufferSize = 0;
321}
322
323template<class T>
324void
325StreamTransport<T>::resetSendQueue()
326{
327 std::queue<Block> emptyQueue;
328 std::swap(emptyQueue, m_sendQueue);
Eric Newberryb49313d2017-12-24 20:22:27 -0700329 m_sendQueueBytes = 0;
330}
331
332template<class T>
333size_t
334StreamTransport<T>::getSendQueueBytes() const
335{
336 return m_sendQueueBytes;
Weiwei Liudcdf6212016-08-31 14:34:22 -0700337}
338
Yukai Tu74c895d2015-09-21 01:11:51 -0700339} // namespace face
340} // namespace nfd
341
342#endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP