blob: 8eb7af0717f54c9984a632db100bd91206a190f7 [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Junxiao Shi08d07a72014-06-09 23:17:57 -07003 * Copyright (c) 2014, Regents of the University of California,
4 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis
Alexander Afanasyev9bcbc7c2014-04-06 19:37:37 -070010 *
11 * This file is part of NFD (Named Data Networking Forwarding Daemon).
12 * See AUTHORS.md for complete list of NFD authors and contributors.
13 *
14 * NFD is free software: you can redistribute it and/or modify it under the terms
15 * of the GNU General Public License as published by the Free Software Foundation,
16 * either version 3 of the License, or (at your option) any later version.
17 *
18 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20 * PURPOSE. See the GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License along with
23 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Junxiao Shi5dd26c32014-07-20 23:15:14 -070024 */
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080025
Alexander Afanasyev613e2a92014-04-15 13:36:58 -070026#ifndef NFD_DAEMON_FACE_STREAM_FACE_HPP
27#define NFD_DAEMON_FACE_STREAM_FACE_HPP
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080028
29#include "face.hpp"
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080030#include "local-face.hpp"
Steve DiBenedettobf6a93d2014-03-21 14:03:02 -060031#include "core/logger.hpp"
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080032
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080033namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080034
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080035// forward declaration
36template<class T, class U, class V> struct StreamFaceSenderImpl;
37
38template<class Protocol, class FaceBase = Face>
39class StreamFace : public FaceBase
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080040{
41public:
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080042 typedef Protocol protocol;
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080043
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080044 /**
45 * \brief Create instance of StreamFace
46 */
Junxiao Shi79494162014-04-02 18:25:11 -070047 StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
Alexander Afanasyev355c0662014-03-20 18:08:17 -070048 const shared_ptr<typename protocol::socket>& socket,
49 bool isOnDemand);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080050
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080051 virtual
52 ~StreamFace();
53
54 // from Face
55 virtual void
56 sendInterest(const Interest& interest);
57
58 virtual void
59 sendData(const Data& data);
60
61 virtual void
62 close();
Alexander Afanasyev93ce75e2014-02-18 19:45:34 -080063
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080064protected:
65 void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080066 processErrorCode(const boost::system::error_code& error);
67
68 void
Alexander Afanasyeve5966b72014-07-20 23:39:50 -070069 sendFromQueue();
70
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080071 void
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080072 handleSend(const boost::system::error_code& error,
Junxiao Shi5dd26c32014-07-20 23:15:14 -070073 size_t nBytesSent);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080074
75 void
76 handleReceive(const boost::system::error_code& error,
Junxiao Shi5dd26c32014-07-20 23:15:14 -070077 size_t nBytesReceived);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080078
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080079 void
80 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010081
82 void
83 closeSocket();
84
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080085protected:
86 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010087
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080088private:
89 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
Junxiao Shi5dd26c32014-07-20 23:15:14 -070090 size_t m_inputBufferSize;
Alexander Afanasyeve5966b72014-07-20 23:39:50 -070091 std::queue<Block> m_sendQueue;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080092
93 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
94 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
95
Alexander Afanasyev3958b012014-01-31 15:06:13 -080096 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080097};
98
// All derived classes must use
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800101
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800102
/** \brief Hook that lets a StreamFace instantiation validate its socket
 *
 *  This primary template accepts every socket. A partial specialization can
 *  add real checks; for example, one based on boost::asio::ip::tcp should
 *  check that the local endpoint is loopback.
 *
 *  \throws Face::Error if validation failed
 */
template<class Protocol, class U>
struct StreamFaceValidator
{
  static void
  validateSocket(typename Protocol::socket& /*socket*/)
  {
    // nothing to validate in the generic case
  }
};
118
119
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000120template<class T, class FaceBase>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800121inline
Junxiao Shi79494162014-04-02 18:25:11 -0700122StreamFace<T, FaceBase>::StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
123 const shared_ptr<typename StreamFace::protocol::socket>& socket,
124 bool isOnDemand)
125 : FaceBase(remoteUri, localUri)
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000126 , m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -0800127 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800128{
Alexander Afanasyev355c0662014-03-20 18:08:17 -0700129 FaceBase::setOnDemand(isOnDemand);
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000130 StreamFaceValidator<T, FaceBase>::validateSocket(*socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800131 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000132 bind(&StreamFace<T, FaceBase>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800133}
134
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800135template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800136inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800137StreamFace<T, U>::~StreamFace()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800138{
139}
140
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800141template<class Protocol, class FaceBase, class Packet>
142struct StreamFaceSenderImpl
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800143{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800144 static void
145 send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
146 {
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700147 bool wasQueueEmpty = face.m_sendQueue.empty();
148 face.m_sendQueue.push(packet.wireEncode());
149
150 if (wasQueueEmpty)
151 face.sendFromQueue();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800152 }
153};
154
155// partial specialization (only classes can be partially specialized)
156template<class Protocol, class Packet>
157struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
158{
159 static void
160 send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
161 {
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700162 bool wasQueueEmpty = face.m_sendQueue.empty();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800163
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700164 if (!face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800165 {
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700166 face.m_sendQueue.push(face.filterAndEncodeLocalControlHeader(packet));
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800167 }
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700168 face.m_sendQueue.push(packet.wireEncode());
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800169
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700170 if (wasQueueEmpty)
171 face.sendFromQueue();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800172 }
173};
174
175
176template<class T, class U>
177inline void
178StreamFace<T, U>::sendInterest(const Interest& interest)
179{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000180 this->onSendInterest(interest);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800181 StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800182}
183
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800184template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800185inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800186StreamFace<T, U>::sendData(const Data& data)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800187{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000188 this->onSendData(data);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800189 StreamFaceSenderImpl<T, U, Data>::send(*this, data);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800190}
191
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800192template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800193inline void
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700194StreamFace<T, U>::sendFromQueue()
195{
196 const Block& block = this->m_sendQueue.front();
197 boost::asio::async_write(*this->m_socket, boost::asio::buffer(block),
198 bind(&StreamFace<T, U>::handleSend, this, _1, _2));
199}
200
201template<class T, class U>
202inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800203StreamFace<T, U>::close()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800204{
205 if (!m_socket->is_open())
206 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800207
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800208 NFD_LOG_INFO("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700209 << ",uri:" << this->getRemoteUri()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800210 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800211
Davide Pesaventoba558e72014-02-17 18:38:19 +0100212 closeSocket();
Junxiao Shi08d07a72014-06-09 23:17:57 -0700213 this->fail("Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800214}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800215
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800216template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800217inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800218StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800219{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800220 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800221 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800222
223 if (!m_socket->is_open())
224 {
Junxiao Shi08d07a72014-06-09 23:17:57 -0700225 this->fail("Connection closed");
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800226 return;
227 }
228
229 if (error == boost::asio::error::eof)
230 {
231 NFD_LOG_INFO("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700232 << ",uri:" << this->getRemoteUri()
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800233 << "] Connection closed");
234 }
235 else
236 {
237 NFD_LOG_WARN("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700238 << ",uri:" << this->getRemoteUri()
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800239 << "] Send or receive operation failed, closing socket: "
240 << error.category().message(error.value()));
241 }
242
243 closeSocket();
244
245 if (error == boost::asio::error::eof)
246 {
Junxiao Shi08d07a72014-06-09 23:17:57 -0700247 this->fail("Connection closed");
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800248 }
249 else
250 {
Junxiao Shi08d07a72014-06-09 23:17:57 -0700251 this->fail("Send or receive operation failed, closing socket: " +
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800252 error.category().message(error.value()));
253 }
254}
255
256
257template<class T, class U>
258inline void
259StreamFace<T, U>::handleSend(const boost::system::error_code& error,
Junxiao Shi5dd26c32014-07-20 23:15:14 -0700260 size_t nBytesSent)
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800261{
262 if (error)
263 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800264
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700265 BOOST_ASSERT(!m_sendQueue.empty());
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800266
267 NFD_LOG_TRACE("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700268 << ",uri:" << this->getRemoteUri()
Junxiao Shi5dd26c32014-07-20 23:15:14 -0700269 << "] Successfully sent: " << nBytesSent << " bytes");
270 this->getMutableCounters().getNOutBytes() += nBytesSent;
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700271
272 m_sendQueue.pop();
273 if (!m_sendQueue.empty())
274 sendFromQueue();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800275}
276
277template<class T, class U>
278inline void
279StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
Junxiao Shi5dd26c32014-07-20 23:15:14 -0700280 size_t nBytesReceived)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800281{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800282 if (error)
283 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800284
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800285 NFD_LOG_TRACE("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700286 << ",uri:" << this->getRemoteUri()
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700287 << "] Received: " << nBytesReceived << " bytes");
Junxiao Shi5dd26c32014-07-20 23:15:14 -0700288 this->getMutableCounters().getNInBytes() += nBytesReceived;
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800289
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700290 m_inputBufferSize += nBytesReceived;
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800291
Junxiao Shi5dd26c32014-07-20 23:15:14 -0700292 size_t offset = 0;
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800293
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700294 bool isOk = true;
295 Block element;
Junxiao Shi5dd26c32014-07-20 23:15:14 -0700296 while (m_inputBufferSize - offset > 0)
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700297 {
298 isOk = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset, element);
299 if (!isOk)
300 break;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800301
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700302 offset += element.size();
Davide Pesaventoba558e72014-02-17 18:38:19 +0100303
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700304 BOOST_ASSERT(offset <= m_inputBufferSize);
305
306 if (!this->decodeAndDispatchInput(element))
307 {
308 NFD_LOG_WARN("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700309 << ",uri:" << this->getRemoteUri()
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700310 << "] Received unrecognized block of type ["
311 << element.type() << "]");
312 // ignore unknown packet and proceed
313 }
314 }
315 if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
316 {
317 NFD_LOG_WARN("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700318 << ",uri:" << this->getRemoteUri()
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700319 << "] Failed to parse incoming packet or it is too large to process, "
320 << "closing down the face");
321
322 closeSocket();
Junxiao Shi08d07a72014-06-09 23:17:57 -0700323 this->fail("Failed to parse incoming packet or it is too large to process, "
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700324 "closing down the face");
325 return;
326 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800327
328 if (offset > 0)
329 {
330 if (offset != m_inputBufferSize)
331 {
332 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
333 m_inputBuffer);
334 m_inputBufferSize -= offset;
335 }
336 else
337 {
338 m_inputBufferSize = 0;
339 }
340 }
341
342 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
343 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800344 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800345}
346
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800347template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800348inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800349StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800350{
351}
352
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800353template<class T, class U>
Davide Pesaventoba558e72014-02-17 18:38:19 +0100354inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800355StreamFace<T, U>::closeSocket()
Davide Pesaventoba558e72014-02-17 18:38:19 +0100356{
357 boost::asio::io_service& io = m_socket->get_io_service();
358
359 // use the non-throwing variants and ignore errors, if any
360 boost::system::error_code error;
361 m_socket->shutdown(protocol::socket::shutdown_both, error);
362 m_socket->close(error);
363 // after this, handlers will be called with an error code
364
365 // ensure that the Face object is alive at least until all pending
366 // handlers are dispatched
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800367 io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
Davide Pesaventoba558e72014-02-17 18:38:19 +0100368 this, this->shared_from_this()));
Alexander Afanasyeve5966b72014-07-20 23:39:50 -0700369
370 // clear send queue
371 std::queue<Block> emptyQueue;
372 std::swap(emptyQueue, m_sendQueue);
Davide Pesaventoba558e72014-02-17 18:38:19 +0100373}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800374
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800375} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800376
Alexander Afanasyev613e2a92014-04-15 13:36:58 -0700377#endif // NFD_DAEMON_FACE_STREAM_FACE_HPP