blob: a6236657a8cca2d6209cbabb2d7989353a18e375 [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2014 Named Data Networking Project
4 * See COPYING for copyright and distribution information.
5 */
6
7#ifndef NFD_FACE_STREAM_FACE_HPP
8#define NFD_FACE_STREAM_FACE_HPP
9
10#include "face.hpp"
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080011#include "local-face.hpp"
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080012
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080013namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080014
// Forward declaration of the send helper; it is befriended by StreamFace below
// and defined (primary template plus LocalFace specialization) later in this file.
template<class Protocol, class FaceBase, class Packet>
struct StreamFaceSenderImpl;
17
18template<class Protocol, class FaceBase = Face>
19class StreamFace : public FaceBase
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080020{
21public:
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080022 typedef Protocol protocol;
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080023
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080024 /**
25 * \brief Create instance of StreamFace
26 */
Davide Pesavento0ff10db2014-02-28 03:12:27 +010027 explicit
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +000028 StreamFace(const FaceUri& uri,
29 const shared_ptr<typename protocol::socket>& socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080030
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080031 virtual
32 ~StreamFace();
33
34 // from Face
35 virtual void
36 sendInterest(const Interest& interest);
37
38 virtual void
39 sendData(const Data& data);
40
41 virtual void
42 close();
Alexander Afanasyev93ce75e2014-02-18 19:45:34 -080043
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080044protected:
45 void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080046 processErrorCode(const boost::system::error_code& error);
47
48 void
49 handleSend(const boost::system::error_code& error,
50 const Block& header, const Block& payload);
51 void
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080052 handleSend(const boost::system::error_code& error,
53 const Block& wire);
54
55 void
56 handleReceive(const boost::system::error_code& error,
57 std::size_t bytes_recvd);
58
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080059 void
60 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010061
62 void
63 closeSocket();
64
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080065protected:
66 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010067
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080068private:
69 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
70 std::size_t m_inputBufferSize;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080071
72 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
73 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
74
Alexander Afanasyev3958b012014-01-31 15:06:13 -080075 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080076};
77
// All derived classes must define the logger of their StreamFace specialization with
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -080080
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080081
/** \brief Class allowing validation of the StreamFace use
 *
 *  The primary template accepts every socket: validateSocket() does nothing.
 *  Specializations can impose restrictions; for example, a partial specialization
 *  based on boost::asio::ip::tcp should check that the local endpoint is loopback.
 *
 *  \tparam Protocol type providing the nested \c socket type
 *  \tparam U        additional selector; StreamFace passes its FaceBase type here
 *
 *  \throws Face::Error if validation failed (specializations only; the primary
 *          template never throws)
 */
template<class Protocol, class U>
struct StreamFaceValidator
{
  static void
  validateSocket(typename Protocol::socket& /*socket*/)
  {
    // nothing to validate in the general case; the parameter is left unnamed
    // so that including this header does not produce unused-parameter warnings
  }
};
97
98
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +000099template<class T, class FaceBase>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800100inline
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000101StreamFace<T, FaceBase>::StreamFace(const FaceUri& uri,
102 const shared_ptr<typename StreamFace::protocol::socket>& socket)
103 : FaceBase(uri)
104 , m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -0800105 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800106{
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000107 StreamFaceValidator<T, FaceBase>::validateSocket(*socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800108 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000109 bind(&StreamFace<T, FaceBase>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800110}
111
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800112template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800113inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800114StreamFace<T, U>::~StreamFace()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800115{
116}
117
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800118template<class Protocol, class FaceBase, class Packet>
119struct StreamFaceSenderImpl
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800120{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800121 static void
122 send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
123 {
124 face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
125 packet.wireEncode().size()),
126 bind(&StreamFace<Protocol, FaceBase>::handleSend,
127 &face, _1, packet.wireEncode()));
128 }
129};
130
131// partial specialization (only classes can be partially specialized)
132template<class Protocol, class Packet>
133struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
134{
135 static void
136 send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
137 {
138 using namespace boost::asio;
139
140 if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
141 {
142 const Block& payload = packet.wireEncode();
143 face.m_socket->async_send(buffer(payload.wire(), payload.size()),
144 bind(&StreamFace<Protocol, LocalFace>::handleSend,
145 &face, _1, packet.wireEncode()));
146 }
147 else
148 {
149 Block header = face.filterAndEncodeLocalControlHeader(packet);
150 const Block& payload = packet.wireEncode();
151
152 std::vector<const_buffer> buffers;
153 buffers.reserve(2);
154 buffers.push_back(buffer(header.wire(), header.size()));
155 buffers.push_back(buffer(payload.wire(), payload.size()));
156
157 face.m_socket->async_send(buffers,
158 bind(&StreamFace<Protocol, LocalFace>::handleSend,
159 &face, _1, header, payload));
160 }
161 }
162};
163
164
165template<class T, class U>
166inline void
167StreamFace<T, U>::sendInterest(const Interest& interest)
168{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000169 this->onSendInterest(interest);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800170 StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800171}
172
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800173template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800174inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800175StreamFace<T, U>::sendData(const Data& data)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800176{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000177 this->onSendData(data);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800178 StreamFaceSenderImpl<T, U, Data>::send(*this, data);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800179}
180
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800181template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800182inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800183StreamFace<T, U>::close()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800184{
185 if (!m_socket->is_open())
186 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800187
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800188 NFD_LOG_INFO("[id:" << this->getId()
189 << ",endpoint:" << m_socket->local_endpoint()
190 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800191
Davide Pesaventoba558e72014-02-17 18:38:19 +0100192 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800193 this->onFail("Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800194}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800195
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800196template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800197inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800198StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800199{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800200 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800201 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800202
203 if (!m_socket->is_open())
204 {
205 this->onFail("Connection closed");
206 return;
207 }
208
209 if (error == boost::asio::error::eof)
210 {
211 NFD_LOG_INFO("[id:" << this->getId()
212 << ",endpoint:" << m_socket->local_endpoint()
213 << "] Connection closed");
214 }
215 else
216 {
217 NFD_LOG_WARN("[id:" << this->getId()
218 << ",endpoint:" << m_socket->local_endpoint()
219 << "] Send or receive operation failed, closing socket: "
220 << error.category().message(error.value()));
221 }
222
223 closeSocket();
224
225 if (error == boost::asio::error::eof)
226 {
227 this->onFail("Connection closed");
228 }
229 else
230 {
231 this->onFail("Send or receive operation failed, closing socket: " +
232 error.category().message(error.value()));
233 }
234}
235
236
237template<class T, class U>
238inline void
239StreamFace<T, U>::handleSend(const boost::system::error_code& error,
240 const Block& wire)
241{
242 if (error)
243 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800244
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800245 NFD_LOG_TRACE("[id:" << this->getId()
246 << ",endpoint:" << m_socket->local_endpoint()
247 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800248}
249
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800250template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800251inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800252StreamFace<T, U>::handleSend(const boost::system::error_code& error,
253 const Block& header, const Block& payload)
254{
255 if (error)
256 return processErrorCode(error);
257
258 NFD_LOG_TRACE("[id:" << this->getId()
259 << ",endpoint:" << m_socket->local_endpoint()
260 << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
261}
262
263template<class T, class U>
264inline void
265StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800266 std::size_t bytes_recvd)
267{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800268 if (error)
269 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800270
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800271 NFD_LOG_TRACE("[id:" << this->getId()
272 << ",endpoint:" << m_socket->local_endpoint()
273 << "] Received: " << bytes_recvd << " bytes");
274
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800275 m_inputBufferSize += bytes_recvd;
276 // do magic
277
278 std::size_t offset = 0;
279 /// @todo Eliminate reliance on exceptions in this path
280 try {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800281 while(m_inputBufferSize - offset > 0)
282 {
283 Block element(m_inputBuffer + offset, m_inputBufferSize - offset);
284 offset += element.size();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800285
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800286 BOOST_ASSERT(offset <= m_inputBufferSize);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800287
288 if (!this->decodeAndDispatchInput(element))
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800289 {
290 NFD_LOG_WARN("[id:" << this->getId()
291 << ",endpoint:" << m_socket->local_endpoint()
292 << "] Received unrecognized block of type ["
293 << element.type() << "]");
294 // ignore unknown packet and proceed
295 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800296 }
297 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800298 catch(const tlv::Error& e) {
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800299 if (m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
300 {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800301 NFD_LOG_WARN("[id:" << this->getId()
302 << ",endpoint:" << m_socket->local_endpoint()
303 << "] Received input is invalid or too large to process, "
304 << "closing down the face");
Davide Pesaventoba558e72014-02-17 18:38:19 +0100305
306 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800307 this->onFail("Received input is invalid or too large to process, closing down the face");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800308 return;
309 }
310 }
311
312 if (offset > 0)
313 {
314 if (offset != m_inputBufferSize)
315 {
316 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
317 m_inputBuffer);
318 m_inputBufferSize -= offset;
319 }
320 else
321 {
322 m_inputBufferSize = 0;
323 }
324 }
325
326 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
327 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800328 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800329}
330
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800331template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800332inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800333StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800334{
335}
336
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800337template<class T, class U>
Davide Pesaventoba558e72014-02-17 18:38:19 +0100338inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800339StreamFace<T, U>::closeSocket()
Davide Pesaventoba558e72014-02-17 18:38:19 +0100340{
341 boost::asio::io_service& io = m_socket->get_io_service();
342
343 // use the non-throwing variants and ignore errors, if any
344 boost::system::error_code error;
345 m_socket->shutdown(protocol::socket::shutdown_both, error);
346 m_socket->close(error);
347 // after this, handlers will be called with an error code
348
349 // ensure that the Face object is alive at least until all pending
350 // handlers are dispatched
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800351 io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
Davide Pesaventoba558e72014-02-17 18:38:19 +0100352 this, this->shared_from_this()));
353}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800354
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800355} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800356
357#endif // NFD_FACE_STREAM_FACE_HPP