blob: ef28c253bca9132ca991ee7c8781d741f6100485 [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Junxiao Shi08d07a72014-06-09 23:17:57 -07003 * Copyright (c) 2014, Regents of the University of California,
4 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis
Alexander Afanasyev9bcbc7c2014-04-06 19:37:37 -070010 *
11 * This file is part of NFD (Named Data Networking Forwarding Daemon).
12 * See AUTHORS.md for complete list of NFD authors and contributors.
13 *
14 * NFD is free software: you can redistribute it and/or modify it under the terms
15 * of the GNU General Public License as published by the Free Software Foundation,
16 * either version 3 of the License, or (at your option) any later version.
17 *
18 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20 * PURPOSE. See the GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License along with
23 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24 **/
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080025
Alexander Afanasyev613e2a92014-04-15 13:36:58 -070026#ifndef NFD_DAEMON_FACE_STREAM_FACE_HPP
27#define NFD_DAEMON_FACE_STREAM_FACE_HPP
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080028
29#include "face.hpp"
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080030#include "local-face.hpp"
Steve DiBenedettobf6a93d2014-03-21 14:03:02 -060031#include "core/logger.hpp"
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080032
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080033namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080034
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080035// forward declaration
36template<class T, class U, class V> struct StreamFaceSenderImpl;
37
38template<class Protocol, class FaceBase = Face>
39class StreamFace : public FaceBase
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080040{
41public:
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080042 typedef Protocol protocol;
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080043
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080044 /**
45 * \brief Create instance of StreamFace
46 */
Junxiao Shi79494162014-04-02 18:25:11 -070047 StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
Alexander Afanasyev355c0662014-03-20 18:08:17 -070048 const shared_ptr<typename protocol::socket>& socket,
49 bool isOnDemand);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080050
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080051 virtual
52 ~StreamFace();
53
54 // from Face
55 virtual void
56 sendInterest(const Interest& interest);
57
58 virtual void
59 sendData(const Data& data);
60
61 virtual void
62 close();
Alexander Afanasyev93ce75e2014-02-18 19:45:34 -080063
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080064protected:
65 void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080066 processErrorCode(const boost::system::error_code& error);
67
68 void
Alexander Afanasyeve5966b72014-07-20 23:39:50 -070069 sendFromQueue();
70
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080071 void
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080072 handleSend(const boost::system::error_code& error,
Alexander Afanasyeve5966b72014-07-20 23:39:50 -070073 std::size_t nBytesTransferred);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080074
75 void
76 handleReceive(const boost::system::error_code& error,
Alexander Afanasyeve5966b72014-07-20 23:39:50 -070077 std::size_t nBytesReceived);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080078
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080079 void
80 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010081
82 void
83 closeSocket();
84
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080085protected:
86 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010087
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080088private:
89 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
90 std::size_t m_inputBufferSize;
Alexander Afanasyeve5966b72014-07-20 23:39:50 -070091 std::queue<Block> m_sendQueue;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080092
93 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
94 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
95
Alexander Afanasyev3958b012014-01-31 15:06:13 -080096 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080097};
98
// Every class derived from StreamFace must define its logger with
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800101
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800102
/** \brief Policy class that validates the socket handed to a StreamFace
 *
 *  The primary template accepts every socket.  A partial specialization can
 *  add real checks; for example, one based on boost::asio::ip::tcp should
 *  check that the local endpoint is loopback.
 *
 *  @throws Face::Error if validation failed
 */
template<class Protocol, class U>
struct StreamFaceValidator
{
  /// Default validation: nothing is inspected and nothing is thrown
  static void
  validateSocket(typename Protocol::socket& socket)
  {
    // intentionally empty
  }
};
118
119
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000120template<class T, class FaceBase>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800121inline
Junxiao Shi79494162014-04-02 18:25:11 -0700122StreamFace<T, FaceBase>::StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
123 const shared_ptr<typename StreamFace::protocol::socket>& socket,
124 bool isOnDemand)
125 : FaceBase(remoteUri, localUri)
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000126 , m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -0800127 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800128{
Alexander Afanasyev355c0662014-03-20 18:08:17 -0700129 FaceBase::setOnDemand(isOnDemand);
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000130 StreamFaceValidator<T, FaceBase>::validateSocket(*socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800131 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000132 bind(&StreamFace<T, FaceBase>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800133}
134
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800135template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800136inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800137StreamFace<T, U>::~StreamFace()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800138{
139}
140
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800141template<class Protocol, class FaceBase, class Packet>
142struct StreamFaceSenderImpl
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800143{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800144 static void
145 send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
146 {
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700147 bool wasQueueEmpty = face.m_sendQueue.empty();
148 face.m_sendQueue.push(packet.wireEncode());
149
150 if (wasQueueEmpty)
151 face.sendFromQueue();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800152 }
153};
154
155// partial specialization (only classes can be partially specialized)
156template<class Protocol, class Packet>
157struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
158{
159 static void
160 send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
161 {
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700162 bool wasQueueEmpty = face.m_sendQueue.empty();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800163
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700164 if (!face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800165 {
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700166 face.m_sendQueue.push(face.filterAndEncodeLocalControlHeader(packet));
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800167 }
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700168 face.m_sendQueue.push(packet.wireEncode());
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800169
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700170 if (wasQueueEmpty)
171 face.sendFromQueue();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800172 }
173};
174
175
176template<class T, class U>
177inline void
178StreamFace<T, U>::sendInterest(const Interest& interest)
179{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000180 this->onSendInterest(interest);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800181 StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800182}
183
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800184template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800185inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800186StreamFace<T, U>::sendData(const Data& data)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800187{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000188 this->onSendData(data);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800189 StreamFaceSenderImpl<T, U, Data>::send(*this, data);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800190}
191
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800192template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800193inline void
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700194StreamFace<T, U>::sendFromQueue()
195{
196 const Block& block = this->m_sendQueue.front();
197 boost::asio::async_write(*this->m_socket, boost::asio::buffer(block),
198 bind(&StreamFace<T, U>::handleSend, this, _1, _2));
199}
200
201template<class T, class U>
202inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800203StreamFace<T, U>::close()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800204{
205 if (!m_socket->is_open())
206 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800207
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800208 NFD_LOG_INFO("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700209 << ",uri:" << this->getRemoteUri()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800210 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800211
Davide Pesaventoba558e72014-02-17 18:38:19 +0100212 closeSocket();
Junxiao Shi08d07a72014-06-09 23:17:57 -0700213 this->fail("Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800214}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800215
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800216template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800217inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800218StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800219{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800220 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800221 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800222
223 if (!m_socket->is_open())
224 {
Junxiao Shi08d07a72014-06-09 23:17:57 -0700225 this->fail("Connection closed");
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800226 return;
227 }
228
229 if (error == boost::asio::error::eof)
230 {
231 NFD_LOG_INFO("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700232 << ",uri:" << this->getRemoteUri()
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800233 << "] Connection closed");
234 }
235 else
236 {
237 NFD_LOG_WARN("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700238 << ",uri:" << this->getRemoteUri()
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800239 << "] Send or receive operation failed, closing socket: "
240 << error.category().message(error.value()));
241 }
242
243 closeSocket();
244
245 if (error == boost::asio::error::eof)
246 {
Junxiao Shi08d07a72014-06-09 23:17:57 -0700247 this->fail("Connection closed");
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800248 }
249 else
250 {
Junxiao Shi08d07a72014-06-09 23:17:57 -0700251 this->fail("Send or receive operation failed, closing socket: " +
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800252 error.category().message(error.value()));
253 }
254}
255
256
257template<class T, class U>
258inline void
259StreamFace<T, U>::handleSend(const boost::system::error_code& error,
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700260 size_t nBytesTransferred)
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800261{
262 if (error)
263 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800264
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700265 BOOST_ASSERT(!m_sendQueue.empty());
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800266
267 NFD_LOG_TRACE("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700268 << ",uri:" << this->getRemoteUri()
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700269 << "] Successfully sent: " << nBytesTransferred << " bytes");
270
271 m_sendQueue.pop();
272 if (!m_sendQueue.empty())
273 sendFromQueue();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800274}
275
276template<class T, class U>
277inline void
278StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700279 std::size_t nBytesReceived)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800280{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800281 if (error)
282 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800283
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800284 NFD_LOG_TRACE("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700285 << ",uri:" << this->getRemoteUri()
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700286 << "] Received: " << nBytesReceived << " bytes");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800287
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700288 m_inputBufferSize += nBytesReceived;
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800289 // do magic
290
291 std::size_t offset = 0;
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800292
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700293 bool isOk = true;
294 Block element;
295 while(m_inputBufferSize - offset > 0)
296 {
297 isOk = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset, element);
298 if (!isOk)
299 break;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800300
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700301 offset += element.size();
Davide Pesaventoba558e72014-02-17 18:38:19 +0100302
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700303 BOOST_ASSERT(offset <= m_inputBufferSize);
304
305 if (!this->decodeAndDispatchInput(element))
306 {
307 NFD_LOG_WARN("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700308 << ",uri:" << this->getRemoteUri()
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700309 << "] Received unrecognized block of type ["
310 << element.type() << "]");
311 // ignore unknown packet and proceed
312 }
313 }
314 if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
315 {
316 NFD_LOG_WARN("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700317 << ",uri:" << this->getRemoteUri()
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700318 << "] Failed to parse incoming packet or it is too large to process, "
319 << "closing down the face");
320
321 closeSocket();
Junxiao Shi08d07a72014-06-09 23:17:57 -0700322 this->fail("Failed to parse incoming packet or it is too large to process, "
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700323 "closing down the face");
324 return;
325 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800326
327 if (offset > 0)
328 {
329 if (offset != m_inputBufferSize)
330 {
331 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
332 m_inputBuffer);
333 m_inputBufferSize -= offset;
334 }
335 else
336 {
337 m_inputBufferSize = 0;
338 }
339 }
340
341 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
342 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800343 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800344}
345
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800346template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800347inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800348StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800349{
350}
351
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800352template<class T, class U>
Davide Pesaventoba558e72014-02-17 18:38:19 +0100353inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800354StreamFace<T, U>::closeSocket()
Davide Pesaventoba558e72014-02-17 18:38:19 +0100355{
356 boost::asio::io_service& io = m_socket->get_io_service();
357
358 // use the non-throwing variants and ignore errors, if any
359 boost::system::error_code error;
360 m_socket->shutdown(protocol::socket::shutdown_both, error);
361 m_socket->close(error);
362 // after this, handlers will be called with an error code
363
364 // ensure that the Face object is alive at least until all pending
365 // handlers are dispatched
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800366 io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
Davide Pesaventoba558e72014-02-17 18:38:19 +0100367 this, this->shared_from_this()));
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700368
369 // clear send queue
370 std::queue<Block> emptyQueue;
371 std::swap(emptyQueue, m_sendQueue);
Davide Pesaventoba558e72014-02-17 18:38:19 +0100372}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800373
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800374} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800375
Alexander Afanasyev613e2a92014-04-15 13:36:58 -0700376#endif // NFD_DAEMON_FACE_STREAM_FACE_HPP