blob: 1b5b1246ef975a97a792c7ee7791f59eb24f1432 [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2014 Named Data Networking Project
4 * See COPYING for copyright and distribution information.
5 */
6
7#ifndef NFD_FACE_STREAM_FACE_HPP
8#define NFD_FACE_STREAM_FACE_HPP
9
10#include "face.hpp"
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080011#include "local-face.hpp"
Steve DiBenedettobf6a93d2014-03-21 14:03:02 -060012#include "core/logger.hpp"
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080013
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080014namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080015
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080016// forward declaration
17template<class T, class U, class V> struct StreamFaceSenderImpl;
18
19template<class Protocol, class FaceBase = Face>
20class StreamFace : public FaceBase
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080021{
22public:
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080023 typedef Protocol protocol;
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080024
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080025 /**
26 * \brief Create instance of StreamFace
27 */
Davide Pesavento0ff10db2014-02-28 03:12:27 +010028 explicit
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +000029 StreamFace(const FaceUri& uri,
Alexander Afanasyev355c0662014-03-20 18:08:17 -070030 const shared_ptr<typename protocol::socket>& socket,
31 bool isOnDemand);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080032
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080033 virtual
34 ~StreamFace();
35
36 // from Face
37 virtual void
38 sendInterest(const Interest& interest);
39
40 virtual void
41 sendData(const Data& data);
42
43 virtual void
44 close();
Alexander Afanasyev93ce75e2014-02-18 19:45:34 -080045
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080046protected:
47 void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080048 processErrorCode(const boost::system::error_code& error);
49
50 void
51 handleSend(const boost::system::error_code& error,
52 const Block& header, const Block& payload);
53 void
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080054 handleSend(const boost::system::error_code& error,
55 const Block& wire);
56
57 void
58 handleReceive(const boost::system::error_code& error,
59 std::size_t bytes_recvd);
60
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080061 void
62 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010063
64 void
65 closeSocket();
66
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080067protected:
68 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010069
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080070private:
71 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
72 std::size_t m_inputBufferSize;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080073
74 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
75 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
76
Alexander Afanasyev3958b012014-01-31 15:06:13 -080077 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080078};
79
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080080// All inherited classes must use
81// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -080082
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080083
84/** \brief Class allowing validation of the StreamFace use
85 *
86 * For example, partial specialization based on boost::asio::ip::tcp should check
87 * that local endpoint is loopback
88 *
89 * @throws Face::Error if validation failed
90 */
91template<class Protocol, class U>
92struct StreamFaceValidator
93{
94 static void
95 validateSocket(typename Protocol::socket& socket)
96 {
97 }
98};
99
100
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000101template<class T, class FaceBase>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800102inline
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000103StreamFace<T, FaceBase>::StreamFace(const FaceUri& uri,
Alexander Afanasyev355c0662014-03-20 18:08:17 -0700104 const shared_ptr<typename StreamFace::protocol::socket>& socket,
105 bool isOnDemand)
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000106 : FaceBase(uri)
107 , m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -0800108 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800109{
Alexander Afanasyev355c0662014-03-20 18:08:17 -0700110 FaceBase::setOnDemand(isOnDemand);
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000111 StreamFaceValidator<T, FaceBase>::validateSocket(*socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800112 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000113 bind(&StreamFace<T, FaceBase>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800114}
115
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800116template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800117inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800118StreamFace<T, U>::~StreamFace()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800119{
120}
121
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800122template<class Protocol, class FaceBase, class Packet>
123struct StreamFaceSenderImpl
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800124{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800125 static void
126 send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
127 {
128 face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
129 packet.wireEncode().size()),
130 bind(&StreamFace<Protocol, FaceBase>::handleSend,
131 &face, _1, packet.wireEncode()));
132 }
133};
134
135// partial specialization (only classes can be partially specialized)
136template<class Protocol, class Packet>
137struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
138{
139 static void
140 send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
141 {
142 using namespace boost::asio;
143
144 if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
145 {
146 const Block& payload = packet.wireEncode();
147 face.m_socket->async_send(buffer(payload.wire(), payload.size()),
148 bind(&StreamFace<Protocol, LocalFace>::handleSend,
149 &face, _1, packet.wireEncode()));
150 }
151 else
152 {
153 Block header = face.filterAndEncodeLocalControlHeader(packet);
154 const Block& payload = packet.wireEncode();
155
156 std::vector<const_buffer> buffers;
157 buffers.reserve(2);
158 buffers.push_back(buffer(header.wire(), header.size()));
159 buffers.push_back(buffer(payload.wire(), payload.size()));
160
161 face.m_socket->async_send(buffers,
162 bind(&StreamFace<Protocol, LocalFace>::handleSend,
163 &face, _1, header, payload));
164 }
165 }
166};
167
168
169template<class T, class U>
170inline void
171StreamFace<T, U>::sendInterest(const Interest& interest)
172{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000173 this->onSendInterest(interest);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800174 StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800175}
176
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800177template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800178inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800179StreamFace<T, U>::sendData(const Data& data)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800180{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000181 this->onSendData(data);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800182 StreamFaceSenderImpl<T, U, Data>::send(*this, data);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800183}
184
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800185template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800186inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800187StreamFace<T, U>::close()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800188{
189 if (!m_socket->is_open())
190 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800191
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800192 NFD_LOG_INFO("[id:" << this->getId()
193 << ",endpoint:" << m_socket->local_endpoint()
194 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800195
Davide Pesaventoba558e72014-02-17 18:38:19 +0100196 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800197 this->onFail("Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800198}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800199
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800200template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800201inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800202StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800203{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800204 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800205 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800206
207 if (!m_socket->is_open())
208 {
209 this->onFail("Connection closed");
210 return;
211 }
212
213 if (error == boost::asio::error::eof)
214 {
215 NFD_LOG_INFO("[id:" << this->getId()
216 << ",endpoint:" << m_socket->local_endpoint()
217 << "] Connection closed");
218 }
219 else
220 {
221 NFD_LOG_WARN("[id:" << this->getId()
222 << ",endpoint:" << m_socket->local_endpoint()
223 << "] Send or receive operation failed, closing socket: "
224 << error.category().message(error.value()));
225 }
226
227 closeSocket();
228
229 if (error == boost::asio::error::eof)
230 {
231 this->onFail("Connection closed");
232 }
233 else
234 {
235 this->onFail("Send or receive operation failed, closing socket: " +
236 error.category().message(error.value()));
237 }
238}
239
240
241template<class T, class U>
242inline void
243StreamFace<T, U>::handleSend(const boost::system::error_code& error,
244 const Block& wire)
245{
246 if (error)
247 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800248
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800249 NFD_LOG_TRACE("[id:" << this->getId()
250 << ",endpoint:" << m_socket->local_endpoint()
251 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800252}
253
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800254template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800255inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800256StreamFace<T, U>::handleSend(const boost::system::error_code& error,
257 const Block& header, const Block& payload)
258{
259 if (error)
260 return processErrorCode(error);
261
262 NFD_LOG_TRACE("[id:" << this->getId()
263 << ",endpoint:" << m_socket->local_endpoint()
264 << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
265}
266
267template<class T, class U>
268inline void
269StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800270 std::size_t bytes_recvd)
271{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800272 if (error)
273 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800274
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800275 NFD_LOG_TRACE("[id:" << this->getId()
276 << ",endpoint:" << m_socket->local_endpoint()
277 << "] Received: " << bytes_recvd << " bytes");
278
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800279 m_inputBufferSize += bytes_recvd;
280 // do magic
281
282 std::size_t offset = 0;
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800283
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700284 bool isOk = true;
285 Block element;
286 while(m_inputBufferSize - offset > 0)
287 {
288 isOk = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset, element);
289 if (!isOk)
290 break;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800291
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700292 offset += element.size();
Davide Pesaventoba558e72014-02-17 18:38:19 +0100293
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700294 BOOST_ASSERT(offset <= m_inputBufferSize);
295
296 if (!this->decodeAndDispatchInput(element))
297 {
298 NFD_LOG_WARN("[id:" << this->getId()
299 << ",endpoint:" << m_socket->local_endpoint()
300 << "] Received unrecognized block of type ["
301 << element.type() << "]");
302 // ignore unknown packet and proceed
303 }
304 }
305 if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
306 {
307 NFD_LOG_WARN("[id:" << this->getId()
308 << ",endpoint:" << m_socket->local_endpoint()
309 << "] Failed to parse incoming packet or it is too large to process, "
310 << "closing down the face");
311
312 closeSocket();
313 this->onFail("Failed to parse incoming packet or it is too large to process, "
314 "closing down the face");
315 return;
316 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800317
318 if (offset > 0)
319 {
320 if (offset != m_inputBufferSize)
321 {
322 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
323 m_inputBuffer);
324 m_inputBufferSize -= offset;
325 }
326 else
327 {
328 m_inputBufferSize = 0;
329 }
330 }
331
332 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
333 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800334 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800335}
336
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800337template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800338inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800339StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800340{
341}
342
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800343template<class T, class U>
Davide Pesaventoba558e72014-02-17 18:38:19 +0100344inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800345StreamFace<T, U>::closeSocket()
Davide Pesaventoba558e72014-02-17 18:38:19 +0100346{
347 boost::asio::io_service& io = m_socket->get_io_service();
348
349 // use the non-throwing variants and ignore errors, if any
350 boost::system::error_code error;
351 m_socket->shutdown(protocol::socket::shutdown_both, error);
352 m_socket->close(error);
353 // after this, handlers will be called with an error code
354
355 // ensure that the Face object is alive at least until all pending
356 // handlers are dispatched
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800357 io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
Davide Pesaventoba558e72014-02-17 18:38:19 +0100358 this, this->shared_from_this()));
359}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800360
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800361} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800362
363#endif // NFD_FACE_STREAM_FACE_HPP