blob: 84ae5517552ddd6bcca76c7603f48580d082881b [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyev9bcbc7c2014-04-06 19:37:37 -07003 * Copyright (c) 2014 Regents of the University of California,
4 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology
9 *
10 * This file is part of NFD (Named Data Networking Forwarding Daemon).
11 * See AUTHORS.md for complete list of NFD authors and contributors.
12 *
13 * NFD is free software: you can redistribute it and/or modify it under the terms
14 * of the GNU General Public License as published by the Free Software Foundation,
15 * either version 3 of the License, or (at your option) any later version.
16 *
17 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
18 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
19 * PURPOSE. See the GNU General Public License for more details.
20 *
21 * You should have received a copy of the GNU General Public License along with
22 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
23 **/
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080024
25#ifndef NFD_FACE_STREAM_FACE_HPP
26#define NFD_FACE_STREAM_FACE_HPP
27
28#include "face.hpp"
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080029#include "local-face.hpp"
Steve DiBenedettobf6a93d2014-03-21 14:03:02 -060030#include "core/logger.hpp"
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080031
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080032namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080033
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080034// forward declaration
35template<class T, class U, class V> struct StreamFaceSenderImpl;
36
37template<class Protocol, class FaceBase = Face>
38class StreamFace : public FaceBase
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080039{
40public:
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080041 typedef Protocol protocol;
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080042
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080043 /**
44 * \brief Create instance of StreamFace
45 */
Junxiao Shi79494162014-04-02 18:25:11 -070046 StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
Alexander Afanasyev355c0662014-03-20 18:08:17 -070047 const shared_ptr<typename protocol::socket>& socket,
48 bool isOnDemand);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080049
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080050 virtual
51 ~StreamFace();
52
53 // from Face
54 virtual void
55 sendInterest(const Interest& interest);
56
57 virtual void
58 sendData(const Data& data);
59
60 virtual void
61 close();
Alexander Afanasyev93ce75e2014-02-18 19:45:34 -080062
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080063protected:
64 void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080065 processErrorCode(const boost::system::error_code& error);
66
67 void
68 handleSend(const boost::system::error_code& error,
69 const Block& header, const Block& payload);
70 void
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080071 handleSend(const boost::system::error_code& error,
72 const Block& wire);
73
74 void
75 handleReceive(const boost::system::error_code& error,
76 std::size_t bytes_recvd);
77
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080078 void
79 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010080
81 void
82 closeSocket();
83
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080084protected:
85 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010086
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080087private:
88 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
89 std::size_t m_inputBufferSize;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080090
91 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
92 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
93
Alexander Afanasyev3958b012014-01-31 15:06:13 -080094 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080095};
96
// Every class derived from StreamFace must use
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -080099
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800100
/** \brief Class allowing validation of the socket given to a StreamFace
 *
 *  The primary template defined here accepts every socket without performing any
 *  check.  Protocol-specific checks are added through (partial) specializations; for
 *  example, a partial specialization based on boost::asio::ip::tcp should check that
 *  the local endpoint is loopback.
 *
 *  @throws Face::Error if validation failed (the primary template never throws)
 */
template<class Protocol, class U>
struct StreamFaceValidator
{
  static void
  validateSocket(typename Protocol::socket& /*socket*/)
  {
    // generic case: nothing to validate
  }
};
116
117
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000118template<class T, class FaceBase>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800119inline
Junxiao Shi79494162014-04-02 18:25:11 -0700120StreamFace<T, FaceBase>::StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
121 const shared_ptr<typename StreamFace::protocol::socket>& socket,
122 bool isOnDemand)
123 : FaceBase(remoteUri, localUri)
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000124 , m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -0800125 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800126{
Alexander Afanasyev355c0662014-03-20 18:08:17 -0700127 FaceBase::setOnDemand(isOnDemand);
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000128 StreamFaceValidator<T, FaceBase>::validateSocket(*socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800129 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000130 bind(&StreamFace<T, FaceBase>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800131}
132
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800133template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800134inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800135StreamFace<T, U>::~StreamFace()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800136{
137}
138
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800139template<class Protocol, class FaceBase, class Packet>
140struct StreamFaceSenderImpl
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800141{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800142 static void
143 send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
144 {
145 face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
146 packet.wireEncode().size()),
147 bind(&StreamFace<Protocol, FaceBase>::handleSend,
148 &face, _1, packet.wireEncode()));
149 }
150};
151
152// partial specialization (only classes can be partially specialized)
153template<class Protocol, class Packet>
154struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
155{
156 static void
157 send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
158 {
159 using namespace boost::asio;
160
161 if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
162 {
163 const Block& payload = packet.wireEncode();
164 face.m_socket->async_send(buffer(payload.wire(), payload.size()),
165 bind(&StreamFace<Protocol, LocalFace>::handleSend,
166 &face, _1, packet.wireEncode()));
167 }
168 else
169 {
170 Block header = face.filterAndEncodeLocalControlHeader(packet);
171 const Block& payload = packet.wireEncode();
172
173 std::vector<const_buffer> buffers;
174 buffers.reserve(2);
175 buffers.push_back(buffer(header.wire(), header.size()));
176 buffers.push_back(buffer(payload.wire(), payload.size()));
177
178 face.m_socket->async_send(buffers,
179 bind(&StreamFace<Protocol, LocalFace>::handleSend,
180 &face, _1, header, payload));
181 }
182 }
183};
184
185
186template<class T, class U>
187inline void
188StreamFace<T, U>::sendInterest(const Interest& interest)
189{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000190 this->onSendInterest(interest);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800191 StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800192}
193
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800194template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800195inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800196StreamFace<T, U>::sendData(const Data& data)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800197{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000198 this->onSendData(data);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800199 StreamFaceSenderImpl<T, U, Data>::send(*this, data);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800200}
201
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800202template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800203inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800204StreamFace<T, U>::close()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800205{
206 if (!m_socket->is_open())
207 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800208
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800209 NFD_LOG_INFO("[id:" << this->getId()
210 << ",endpoint:" << m_socket->local_endpoint()
211 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800212
Davide Pesaventoba558e72014-02-17 18:38:19 +0100213 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800214 this->onFail("Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800215}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800216
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800217template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800218inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800219StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800220{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800221 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800222 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800223
224 if (!m_socket->is_open())
225 {
226 this->onFail("Connection closed");
227 return;
228 }
229
230 if (error == boost::asio::error::eof)
231 {
232 NFD_LOG_INFO("[id:" << this->getId()
233 << ",endpoint:" << m_socket->local_endpoint()
234 << "] Connection closed");
235 }
236 else
237 {
238 NFD_LOG_WARN("[id:" << this->getId()
239 << ",endpoint:" << m_socket->local_endpoint()
240 << "] Send or receive operation failed, closing socket: "
241 << error.category().message(error.value()));
242 }
243
244 closeSocket();
245
246 if (error == boost::asio::error::eof)
247 {
248 this->onFail("Connection closed");
249 }
250 else
251 {
252 this->onFail("Send or receive operation failed, closing socket: " +
253 error.category().message(error.value()));
254 }
255}
256
257
258template<class T, class U>
259inline void
260StreamFace<T, U>::handleSend(const boost::system::error_code& error,
261 const Block& wire)
262{
263 if (error)
264 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800265
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800266 NFD_LOG_TRACE("[id:" << this->getId()
267 << ",endpoint:" << m_socket->local_endpoint()
268 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800269}
270
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800271template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800272inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800273StreamFace<T, U>::handleSend(const boost::system::error_code& error,
274 const Block& header, const Block& payload)
275{
276 if (error)
277 return processErrorCode(error);
278
279 NFD_LOG_TRACE("[id:" << this->getId()
280 << ",endpoint:" << m_socket->local_endpoint()
281 << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
282}
283
284template<class T, class U>
285inline void
286StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800287 std::size_t bytes_recvd)
288{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800289 if (error)
290 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800291
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800292 NFD_LOG_TRACE("[id:" << this->getId()
293 << ",endpoint:" << m_socket->local_endpoint()
294 << "] Received: " << bytes_recvd << " bytes");
295
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800296 m_inputBufferSize += bytes_recvd;
297 // do magic
298
299 std::size_t offset = 0;
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800300
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700301 bool isOk = true;
302 Block element;
303 while(m_inputBufferSize - offset > 0)
304 {
305 isOk = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset, element);
306 if (!isOk)
307 break;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800308
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700309 offset += element.size();
Davide Pesaventoba558e72014-02-17 18:38:19 +0100310
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700311 BOOST_ASSERT(offset <= m_inputBufferSize);
312
313 if (!this->decodeAndDispatchInput(element))
314 {
315 NFD_LOG_WARN("[id:" << this->getId()
316 << ",endpoint:" << m_socket->local_endpoint()
317 << "] Received unrecognized block of type ["
318 << element.type() << "]");
319 // ignore unknown packet and proceed
320 }
321 }
322 if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
323 {
324 NFD_LOG_WARN("[id:" << this->getId()
325 << ",endpoint:" << m_socket->local_endpoint()
326 << "] Failed to parse incoming packet or it is too large to process, "
327 << "closing down the face");
328
329 closeSocket();
330 this->onFail("Failed to parse incoming packet or it is too large to process, "
331 "closing down the face");
332 return;
333 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800334
335 if (offset > 0)
336 {
337 if (offset != m_inputBufferSize)
338 {
339 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
340 m_inputBuffer);
341 m_inputBufferSize -= offset;
342 }
343 else
344 {
345 m_inputBufferSize = 0;
346 }
347 }
348
349 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
350 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800351 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800352}
353
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800354template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800355inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800356StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800357{
358}
359
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800360template<class T, class U>
Davide Pesaventoba558e72014-02-17 18:38:19 +0100361inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800362StreamFace<T, U>::closeSocket()
Davide Pesaventoba558e72014-02-17 18:38:19 +0100363{
364 boost::asio::io_service& io = m_socket->get_io_service();
365
366 // use the non-throwing variants and ignore errors, if any
367 boost::system::error_code error;
368 m_socket->shutdown(protocol::socket::shutdown_both, error);
369 m_socket->close(error);
370 // after this, handlers will be called with an error code
371
372 // ensure that the Face object is alive at least until all pending
373 // handlers are dispatched
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800374 io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
Davide Pesaventoba558e72014-02-17 18:38:19 +0100375 this, this->shared_from_this()));
376}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800377
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800378} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800379
380#endif // NFD_FACE_STREAM_FACE_HPP