blob: f9b7cd98c1bc6570318eea29bc35cac56fad992e [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Junxiao Shi08d07a72014-06-09 23:17:57 -07003 * Copyright (c) 2014, Regents of the University of California,
4 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis
Alexander Afanasyev9bcbc7c2014-04-06 19:37:37 -070010 *
11 * This file is part of NFD (Named Data Networking Forwarding Daemon).
12 * See AUTHORS.md for complete list of NFD authors and contributors.
13 *
14 * NFD is free software: you can redistribute it and/or modify it under the terms
15 * of the GNU General Public License as published by the Free Software Foundation,
16 * either version 3 of the License, or (at your option) any later version.
17 *
18 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20 * PURPOSE. See the GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License along with
23 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24 **/
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080025
Alexander Afanasyev613e2a92014-04-15 13:36:58 -070026#ifndef NFD_DAEMON_FACE_STREAM_FACE_HPP
27#define NFD_DAEMON_FACE_STREAM_FACE_HPP
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080028
29#include "face.hpp"
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080030#include "local-face.hpp"
Steve DiBenedettobf6a93d2014-03-21 14:03:02 -060031#include "core/logger.hpp"
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080032
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080033namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080034
// Forward declaration of the helper that performs the actual send;
// StreamFace declares two of its instantiations (Interest, Data) as friends.
template<class Protocol, class FaceBase, class Packet>
struct StreamFaceSenderImpl;
37
38template<class Protocol, class FaceBase = Face>
39class StreamFace : public FaceBase
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080040{
41public:
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080042 typedef Protocol protocol;
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080043
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080044 /**
45 * \brief Create instance of StreamFace
46 */
Junxiao Shi79494162014-04-02 18:25:11 -070047 StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
Alexander Afanasyev355c0662014-03-20 18:08:17 -070048 const shared_ptr<typename protocol::socket>& socket,
49 bool isOnDemand);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080050
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080051 virtual
52 ~StreamFace();
53
54 // from Face
55 virtual void
56 sendInterest(const Interest& interest);
57
58 virtual void
59 sendData(const Data& data);
60
61 virtual void
62 close();
Alexander Afanasyev93ce75e2014-02-18 19:45:34 -080063
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080064protected:
65 void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080066 processErrorCode(const boost::system::error_code& error);
67
68 void
69 handleSend(const boost::system::error_code& error,
70 const Block& header, const Block& payload);
71 void
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080072 handleSend(const boost::system::error_code& error,
73 const Block& wire);
74
75 void
76 handleReceive(const boost::system::error_code& error,
77 std::size_t bytes_recvd);
78
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080079 void
80 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010081
82 void
83 closeSocket();
84
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080085protected:
86 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010087
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080088private:
89 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
90 std::size_t m_inputBufferSize;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080091
92 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
93 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
94
Alexander Afanasyev3958b012014-01-31 15:06:13 -080095 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080096};
97
// All derived classes must use
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800100
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800101
/** \brief Class allowing validation of the StreamFace use
 *
 *  For example, partial specialization based on boost::asio::ip::tcp should check
 *  that local endpoint is loopback
 *
 *  The generic template performs no check at all.
 *
 *  \throws Face::Error if validation failed
 */
template<class Protocol, class U>
struct StreamFaceValidator
{
  static void
  validateSocket(typename Protocol::socket& socket)
  {
    // generic case: every socket is accepted
  }
};
117
118
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000119template<class T, class FaceBase>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800120inline
Junxiao Shi79494162014-04-02 18:25:11 -0700121StreamFace<T, FaceBase>::StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
122 const shared_ptr<typename StreamFace::protocol::socket>& socket,
123 bool isOnDemand)
124 : FaceBase(remoteUri, localUri)
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000125 , m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -0800126 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800127{
Alexander Afanasyev355c0662014-03-20 18:08:17 -0700128 FaceBase::setOnDemand(isOnDemand);
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000129 StreamFaceValidator<T, FaceBase>::validateSocket(*socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800130 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000131 bind(&StreamFace<T, FaceBase>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800132}
133
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800134template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800135inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800136StreamFace<T, U>::~StreamFace()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800137{
138}
139
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800140template<class Protocol, class FaceBase, class Packet>
141struct StreamFaceSenderImpl
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800142{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800143 static void
144 send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
145 {
146 face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
147 packet.wireEncode().size()),
148 bind(&StreamFace<Protocol, FaceBase>::handleSend,
149 &face, _1, packet.wireEncode()));
150 }
151};
152
153// partial specialization (only classes can be partially specialized)
154template<class Protocol, class Packet>
155struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
156{
157 static void
158 send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
159 {
160 using namespace boost::asio;
161
162 if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
163 {
164 const Block& payload = packet.wireEncode();
165 face.m_socket->async_send(buffer(payload.wire(), payload.size()),
166 bind(&StreamFace<Protocol, LocalFace>::handleSend,
167 &face, _1, packet.wireEncode()));
168 }
169 else
170 {
171 Block header = face.filterAndEncodeLocalControlHeader(packet);
172 const Block& payload = packet.wireEncode();
173
174 std::vector<const_buffer> buffers;
175 buffers.reserve(2);
176 buffers.push_back(buffer(header.wire(), header.size()));
177 buffers.push_back(buffer(payload.wire(), payload.size()));
178
179 face.m_socket->async_send(buffers,
180 bind(&StreamFace<Protocol, LocalFace>::handleSend,
181 &face, _1, header, payload));
182 }
183 }
184};
185
186
187template<class T, class U>
188inline void
189StreamFace<T, U>::sendInterest(const Interest& interest)
190{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000191 this->onSendInterest(interest);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800192 StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800193}
194
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800195template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800196inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800197StreamFace<T, U>::sendData(const Data& data)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800198{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000199 this->onSendData(data);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800200 StreamFaceSenderImpl<T, U, Data>::send(*this, data);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800201}
202
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800203template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800204inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800205StreamFace<T, U>::close()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800206{
207 if (!m_socket->is_open())
208 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800209
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800210 NFD_LOG_INFO("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700211 << ",uri:" << this->getRemoteUri()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800212 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800213
Davide Pesaventoba558e72014-02-17 18:38:19 +0100214 closeSocket();
Junxiao Shi08d07a72014-06-09 23:17:57 -0700215 this->fail("Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800216}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800217
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800218template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800219inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800220StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800221{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800222 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800223 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800224
225 if (!m_socket->is_open())
226 {
Junxiao Shi08d07a72014-06-09 23:17:57 -0700227 this->fail("Connection closed");
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800228 return;
229 }
230
231 if (error == boost::asio::error::eof)
232 {
233 NFD_LOG_INFO("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700234 << ",uri:" << this->getRemoteUri()
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800235 << "] Connection closed");
236 }
237 else
238 {
239 NFD_LOG_WARN("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700240 << ",uri:" << this->getRemoteUri()
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800241 << "] Send or receive operation failed, closing socket: "
242 << error.category().message(error.value()));
243 }
244
245 closeSocket();
246
247 if (error == boost::asio::error::eof)
248 {
Junxiao Shi08d07a72014-06-09 23:17:57 -0700249 this->fail("Connection closed");
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800250 }
251 else
252 {
Junxiao Shi08d07a72014-06-09 23:17:57 -0700253 this->fail("Send or receive operation failed, closing socket: " +
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800254 error.category().message(error.value()));
255 }
256}
257
258
259template<class T, class U>
260inline void
261StreamFace<T, U>::handleSend(const boost::system::error_code& error,
262 const Block& wire)
263{
264 if (error)
265 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800266
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800267 NFD_LOG_TRACE("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700268 << ",uri:" << this->getRemoteUri()
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800269 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800270}
271
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800272template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800273inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800274StreamFace<T, U>::handleSend(const boost::system::error_code& error,
275 const Block& header, const Block& payload)
276{
277 if (error)
278 return processErrorCode(error);
279
280 NFD_LOG_TRACE("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700281 << ",uri:" << this->getRemoteUri()
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800282 << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
283}
284
285template<class T, class U>
286inline void
287StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800288 std::size_t bytes_recvd)
289{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800290 if (error)
291 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800292
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800293 NFD_LOG_TRACE("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700294 << ",uri:" << this->getRemoteUri()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800295 << "] Received: " << bytes_recvd << " bytes");
296
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800297 m_inputBufferSize += bytes_recvd;
298 // do magic
299
300 std::size_t offset = 0;
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800301
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700302 bool isOk = true;
303 Block element;
304 while(m_inputBufferSize - offset > 0)
305 {
306 isOk = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset, element);
307 if (!isOk)
308 break;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800309
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700310 offset += element.size();
Davide Pesaventoba558e72014-02-17 18:38:19 +0100311
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700312 BOOST_ASSERT(offset <= m_inputBufferSize);
313
314 if (!this->decodeAndDispatchInput(element))
315 {
316 NFD_LOG_WARN("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700317 << ",uri:" << this->getRemoteUri()
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700318 << "] Received unrecognized block of type ["
319 << element.type() << "]");
320 // ignore unknown packet and proceed
321 }
322 }
323 if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
324 {
325 NFD_LOG_WARN("[id:" << this->getId()
Alexander Afanasyev29d1fab2014-07-07 19:27:16 -0700326 << ",uri:" << this->getRemoteUri()
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700327 << "] Failed to parse incoming packet or it is too large to process, "
328 << "closing down the face");
329
330 closeSocket();
Junxiao Shi08d07a72014-06-09 23:17:57 -0700331 this->fail("Failed to parse incoming packet or it is too large to process, "
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700332 "closing down the face");
333 return;
334 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800335
336 if (offset > 0)
337 {
338 if (offset != m_inputBufferSize)
339 {
340 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
341 m_inputBuffer);
342 m_inputBufferSize -= offset;
343 }
344 else
345 {
346 m_inputBufferSize = 0;
347 }
348 }
349
350 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
351 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800352 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800353}
354
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800355template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800356inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800357StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800358{
359}
360
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800361template<class T, class U>
Davide Pesaventoba558e72014-02-17 18:38:19 +0100362inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800363StreamFace<T, U>::closeSocket()
Davide Pesaventoba558e72014-02-17 18:38:19 +0100364{
365 boost::asio::io_service& io = m_socket->get_io_service();
366
367 // use the non-throwing variants and ignore errors, if any
368 boost::system::error_code error;
369 m_socket->shutdown(protocol::socket::shutdown_both, error);
370 m_socket->close(error);
371 // after this, handlers will be called with an error code
372
373 // ensure that the Face object is alive at least until all pending
374 // handlers are dispatched
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800375 io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
Davide Pesaventoba558e72014-02-17 18:38:19 +0100376 this, this->shared_from_this()));
377}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800378
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800379} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800380
Alexander Afanasyev613e2a92014-04-15 13:36:58 -0700381#endif // NFD_DAEMON_FACE_STREAM_FACE_HPP