/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2014-2015,  Regents of the University of California,
 *                           Arizona Board of Regents,
 *                           Colorado State University,
 *                           University Pierre & Marie Curie, Sorbonne University,
 *                           Washington University in St. Louis,
 *                           Beijing Institute of Technology,
 *                           The University of Memphis.
 *
 * This file is part of NFD (Named Data Networking Forwarding Daemon).
 * See AUTHORS.md for complete list of NFD authors and contributors.
 *
 * NFD is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE.  See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
 */
25
26#ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28
29#include "transport.hpp"
30#include "core/global-io.hpp"
31
32#include <queue>
33
34namespace nfd {
35namespace face {
36
37/** \brief Implements Transport for stream-based protocols.
38 *
39 * \tparam Protocol a stream-based protocol in Boost.Asio
40 */
41template<class Protocol>
42class StreamTransport : public Transport
43{
44public:
45 typedef Protocol protocol;
46
47 /** \brief Construct stream transport.
48 *
49 * \param socket Protocol-specific socket for the created transport
50 */
51 explicit
52 StreamTransport(typename protocol::socket&& socket);
53
54 virtual void
55 doSend(Transport::Packet&& packet) DECL_OVERRIDE;
56
57 virtual void
58 doClose() DECL_OVERRIDE;
59
60protected:
61 void
62 deferredClose();
63
64 void
65 sendFromQueue();
66
67 void
68 handleSend(const boost::system::error_code& error,
69 size_t nBytesSent);
70
71 void
72 handleReceive(const boost::system::error_code& error,
73 size_t nBytesReceived);
74
75 void
76 processErrorCode(const boost::system::error_code& error);
77
78protected:
79 typename protocol::socket m_socket;
80
81 NFD_LOG_INCLASS_DECLARE();
82
83private:
84 uint8_t m_inputBuffer[ndn::MAX_NDN_PACKET_SIZE];
85 size_t m_inputBufferSize;
86 std::queue<Block> m_sendQueue;
87};
88
// Every class derived from a StreamTransport specialization must define its logger with
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamTransport, <specialization-parameter>, "Name");
91
92
93template<class T>
94inline
95StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
96 : m_socket(std::move(socket))
97 , m_inputBufferSize(0)
98{
99 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, ndn::MAX_NDN_PACKET_SIZE),
100 bind(&StreamTransport<T>::handleReceive, this,
101 boost::asio::placeholders::error,
102 boost::asio::placeholders::bytes_transferred));
103}
104
105template<class T>
106inline void
107StreamTransport<T>::doSend(Transport::Packet&& packet)
108{
109 NFD_LOG_FACE_TRACE(__func__);
110
111 bool wasQueueEmpty = m_sendQueue.empty();
112 m_sendQueue.push(packet.packet);
113
114 if (wasQueueEmpty)
115 sendFromQueue();
116}
117
118template<class T>
119inline void
120StreamTransport<T>::doClose()
121{
122 NFD_LOG_FACE_TRACE(__func__);
123
124 if (m_socket.is_open()) {
125 // Cancel all outstanding operations and shutdown the socket
126 // so that no further sends or receives are possible.
127 // Use the non-throwing variants and ignore errors, if any.
128 boost::system::error_code error;
129 m_socket.cancel(error);
130 m_socket.shutdown(protocol::socket::shutdown_both, error);
131 }
132
133 // Ensure that the Transport stays alive at least until
134 // all pending handlers are dispatched
135 getGlobalIoService().post(bind(&StreamTransport<T>::deferredClose, this));
136
137 // Some bug or feature of Boost.Asio (see http://redmine.named-data.net/issues/1856):
138 //
139 // When doClose is called from a socket event handler (e.g., from handleReceive),
140 // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
141 // Instead, handleSend is invoked as nothing bad happened.
142 //
143 // In order to prevent the assertion in handleSend from failing, we clear the queue
144 // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
145 // this point have been executed. If more send operations are scheduled after this
146 // point, they will fail because the socket has been shutdown, and their callbacks
147 // will be invoked with error code == asio::error::shut_down.
148}
149
150template<class T>
151inline void
152StreamTransport<T>::deferredClose()
153{
154 NFD_LOG_FACE_TRACE(__func__);
155
156 // clear send queue
157 std::queue<Block> emptyQueue;
158 std::swap(emptyQueue, m_sendQueue);
159
160 // use the non-throwing variant and ignore errors, if any
161 boost::system::error_code error;
162 m_socket.close(error);
163
164 this->setState(TransportState::CLOSED);
165}
166
167template<class T>
168inline void
169StreamTransport<T>::sendFromQueue()
170{
171 boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
172 bind(&StreamTransport<T>::handleSend, this,
173 boost::asio::placeholders::error,
174 boost::asio::placeholders::bytes_transferred));
175}
176
177template<class T>
178inline void
179StreamTransport<T>::handleSend(const boost::system::error_code& error,
180 size_t nBytesSent)
181{
182 if (error)
183 return processErrorCode(error);
184
185 NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
186
187 BOOST_ASSERT(!m_sendQueue.empty());
188 m_sendQueue.pop();
189
190 if (!m_sendQueue.empty())
191 sendFromQueue();
192}
193
194template<class T>
195inline void
196StreamTransport<T>::handleReceive(const boost::system::error_code& error,
197 size_t nBytesReceived)
198{
199 if (error)
200 return processErrorCode(error);
201
202 NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
203
204 m_inputBufferSize += nBytesReceived;
205
206 size_t offset = 0;
207
208 bool isOk = true;
209 Block element;
210 while (m_inputBufferSize - offset > 0) {
211 std::tie(isOk, element) = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset);
212 if (!isOk)
213 break;
214
215 offset += element.size();
216
217 BOOST_ASSERT(offset <= m_inputBufferSize);
218
219 Transport::Packet packet(std::move(element));
220 this->receive(std::move(packet));
221 }
222
223 if (!isOk && m_inputBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
224 NFD_LOG_FACE_WARN("Failed to parse incoming packet or packet too large to process");
225 this->setState(TransportState::FAILED);
226 doClose();
227 return;
228 }
229
230 if (offset > 0) {
231 if (offset != m_inputBufferSize) {
232 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
233 m_inputBufferSize -= offset;
234 }
235 else {
236 m_inputBufferSize = 0;
237 }
238 }
239
240 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
241 ndn::MAX_NDN_PACKET_SIZE - m_inputBufferSize),
242 bind(&StreamTransport<T>::handleReceive, this,
243 boost::asio::placeholders::error,
244 boost::asio::placeholders::bytes_transferred));
245}
246
247template<class T>
248inline void
249StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
250{
251 NFD_LOG_FACE_TRACE(__func__);
252
253 if (getState() == TransportState::CLOSING ||
254 getState() == TransportState::FAILED ||
255 getState() == TransportState::CLOSED ||
256 error == boost::asio::error::operation_aborted || // when cancel() is called
257 error == boost::asio::error::shut_down) // after shutdown() is called
258 // transport is shutting down, ignore any errors
259 return;
260
261 if (error != boost::asio::error::eof)
262 NFD_LOG_FACE_WARN("Send or receive operation failed: " << error.message());
263
264 this->setState(TransportState::FAILED);
265 doClose();
266}
267
268} // namespace face
269} // namespace nfd
270
271#endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP