blob: f6c008ac3ddf1d64f65b447f4ce632813cff82f5 [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2014 Named Data Networking Project
4 * See COPYING for copyright and distribution information.
5 */
6
7#ifndef NFD_FACE_STREAM_FACE_HPP
8#define NFD_FACE_STREAM_FACE_HPP
9
10#include "face.hpp"
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080011#include "local-face.hpp"
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080012
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080013namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080014
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080015// forward declaration
16template<class T, class U, class V> struct StreamFaceSenderImpl;
17
18template<class Protocol, class FaceBase = Face>
19class StreamFace : public FaceBase
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080020{
21public:
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080022 typedef Protocol protocol;
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080023
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080024 /**
25 * \brief Create instance of StreamFace
26 */
Davide Pesavento0ff10db2014-02-28 03:12:27 +010027 explicit
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +000028 StreamFace(const FaceUri& uri,
Alexander Afanasyev355c0662014-03-20 18:08:17 -070029 const shared_ptr<typename protocol::socket>& socket,
30 bool isOnDemand);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080031
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080032 virtual
33 ~StreamFace();
34
35 // from Face
36 virtual void
37 sendInterest(const Interest& interest);
38
39 virtual void
40 sendData(const Data& data);
41
42 virtual void
43 close();
Alexander Afanasyev93ce75e2014-02-18 19:45:34 -080044
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080045protected:
46 void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080047 processErrorCode(const boost::system::error_code& error);
48
49 void
50 handleSend(const boost::system::error_code& error,
51 const Block& header, const Block& payload);
52 void
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080053 handleSend(const boost::system::error_code& error,
54 const Block& wire);
55
56 void
57 handleReceive(const boost::system::error_code& error,
58 std::size_t bytes_recvd);
59
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080060 void
61 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010062
63 void
64 closeSocket();
65
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080066protected:
67 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010068
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080069private:
70 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
71 std::size_t m_inputBufferSize;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080072
73 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
74 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
75
Alexander Afanasyev3958b012014-01-31 15:06:13 -080076 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080077};
78
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080079// All inherited classes must use
80// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -080081
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080082
83/** \brief Class allowing validation of the StreamFace use
84 *
85 * For example, partial specialization based on boost::asio::ip::tcp should check
86 * that local endpoint is loopback
87 *
88 * @throws Face::Error if validation failed
89 */
90template<class Protocol, class U>
91struct StreamFaceValidator
92{
93 static void
94 validateSocket(typename Protocol::socket& socket)
95 {
96 }
97};
98
99
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000100template<class T, class FaceBase>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800101inline
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000102StreamFace<T, FaceBase>::StreamFace(const FaceUri& uri,
Alexander Afanasyev355c0662014-03-20 18:08:17 -0700103 const shared_ptr<typename StreamFace::protocol::socket>& socket,
104 bool isOnDemand)
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000105 : FaceBase(uri)
106 , m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -0800107 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800108{
Alexander Afanasyev355c0662014-03-20 18:08:17 -0700109 FaceBase::setOnDemand(isOnDemand);
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000110 StreamFaceValidator<T, FaceBase>::validateSocket(*socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800111 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000112 bind(&StreamFace<T, FaceBase>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800113}
114
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800115template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800116inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800117StreamFace<T, U>::~StreamFace()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800118{
119}
120
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800121template<class Protocol, class FaceBase, class Packet>
122struct StreamFaceSenderImpl
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800123{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800124 static void
125 send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
126 {
127 face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
128 packet.wireEncode().size()),
129 bind(&StreamFace<Protocol, FaceBase>::handleSend,
130 &face, _1, packet.wireEncode()));
131 }
132};
133
134// partial specialization (only classes can be partially specialized)
135template<class Protocol, class Packet>
136struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
137{
138 static void
139 send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
140 {
141 using namespace boost::asio;
142
143 if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
144 {
145 const Block& payload = packet.wireEncode();
146 face.m_socket->async_send(buffer(payload.wire(), payload.size()),
147 bind(&StreamFace<Protocol, LocalFace>::handleSend,
148 &face, _1, packet.wireEncode()));
149 }
150 else
151 {
152 Block header = face.filterAndEncodeLocalControlHeader(packet);
153 const Block& payload = packet.wireEncode();
154
155 std::vector<const_buffer> buffers;
156 buffers.reserve(2);
157 buffers.push_back(buffer(header.wire(), header.size()));
158 buffers.push_back(buffer(payload.wire(), payload.size()));
159
160 face.m_socket->async_send(buffers,
161 bind(&StreamFace<Protocol, LocalFace>::handleSend,
162 &face, _1, header, payload));
163 }
164 }
165};
166
167
168template<class T, class U>
169inline void
170StreamFace<T, U>::sendInterest(const Interest& interest)
171{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000172 this->onSendInterest(interest);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800173 StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800174}
175
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800176template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800177inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800178StreamFace<T, U>::sendData(const Data& data)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800179{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000180 this->onSendData(data);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800181 StreamFaceSenderImpl<T, U, Data>::send(*this, data);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800182}
183
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800184template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800185inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800186StreamFace<T, U>::close()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800187{
188 if (!m_socket->is_open())
189 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800190
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800191 NFD_LOG_INFO("[id:" << this->getId()
192 << ",endpoint:" << m_socket->local_endpoint()
193 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800194
Davide Pesaventoba558e72014-02-17 18:38:19 +0100195 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800196 this->onFail("Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800197}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800198
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800199template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800200inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800201StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800202{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800203 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800204 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800205
206 if (!m_socket->is_open())
207 {
208 this->onFail("Connection closed");
209 return;
210 }
211
212 if (error == boost::asio::error::eof)
213 {
214 NFD_LOG_INFO("[id:" << this->getId()
215 << ",endpoint:" << m_socket->local_endpoint()
216 << "] Connection closed");
217 }
218 else
219 {
220 NFD_LOG_WARN("[id:" << this->getId()
221 << ",endpoint:" << m_socket->local_endpoint()
222 << "] Send or receive operation failed, closing socket: "
223 << error.category().message(error.value()));
224 }
225
226 closeSocket();
227
228 if (error == boost::asio::error::eof)
229 {
230 this->onFail("Connection closed");
231 }
232 else
233 {
234 this->onFail("Send or receive operation failed, closing socket: " +
235 error.category().message(error.value()));
236 }
237}
238
239
240template<class T, class U>
241inline void
242StreamFace<T, U>::handleSend(const boost::system::error_code& error,
243 const Block& wire)
244{
245 if (error)
246 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800247
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800248 NFD_LOG_TRACE("[id:" << this->getId()
249 << ",endpoint:" << m_socket->local_endpoint()
250 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800251}
252
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800253template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800254inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800255StreamFace<T, U>::handleSend(const boost::system::error_code& error,
256 const Block& header, const Block& payload)
257{
258 if (error)
259 return processErrorCode(error);
260
261 NFD_LOG_TRACE("[id:" << this->getId()
262 << ",endpoint:" << m_socket->local_endpoint()
263 << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
264}
265
266template<class T, class U>
267inline void
268StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800269 std::size_t bytes_recvd)
270{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800271 if (error)
272 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800273
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800274 NFD_LOG_TRACE("[id:" << this->getId()
275 << ",endpoint:" << m_socket->local_endpoint()
276 << "] Received: " << bytes_recvd << " bytes");
277
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800278 m_inputBufferSize += bytes_recvd;
279 // do magic
280
281 std::size_t offset = 0;
282 /// @todo Eliminate reliance on exceptions in this path
283 try {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800284 while(m_inputBufferSize - offset > 0)
285 {
286 Block element(m_inputBuffer + offset, m_inputBufferSize - offset);
287 offset += element.size();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800288
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800289 BOOST_ASSERT(offset <= m_inputBufferSize);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800290
291 if (!this->decodeAndDispatchInput(element))
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800292 {
293 NFD_LOG_WARN("[id:" << this->getId()
294 << ",endpoint:" << m_socket->local_endpoint()
295 << "] Received unrecognized block of type ["
296 << element.type() << "]");
297 // ignore unknown packet and proceed
298 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800299 }
300 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800301 catch(const tlv::Error& e) {
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800302 if (m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
303 {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800304 NFD_LOG_WARN("[id:" << this->getId()
305 << ",endpoint:" << m_socket->local_endpoint()
306 << "] Received input is invalid or too large to process, "
307 << "closing down the face");
Davide Pesaventoba558e72014-02-17 18:38:19 +0100308
309 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800310 this->onFail("Received input is invalid or too large to process, closing down the face");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800311 return;
312 }
313 }
314
315 if (offset > 0)
316 {
317 if (offset != m_inputBufferSize)
318 {
319 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
320 m_inputBuffer);
321 m_inputBufferSize -= offset;
322 }
323 else
324 {
325 m_inputBufferSize = 0;
326 }
327 }
328
329 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
330 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800331 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800332}
333
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800334template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800335inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800336StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800337{
338}
339
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800340template<class T, class U>
Davide Pesaventoba558e72014-02-17 18:38:19 +0100341inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800342StreamFace<T, U>::closeSocket()
Davide Pesaventoba558e72014-02-17 18:38:19 +0100343{
344 boost::asio::io_service& io = m_socket->get_io_service();
345
346 // use the non-throwing variants and ignore errors, if any
347 boost::system::error_code error;
348 m_socket->shutdown(protocol::socket::shutdown_both, error);
349 m_socket->close(error);
350 // after this, handlers will be called with an error code
351
352 // ensure that the Face object is alive at least until all pending
353 // handlers are dispatched
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800354 io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
Davide Pesaventoba558e72014-02-17 18:38:19 +0100355 this, this->shared_from_this()));
356}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800357
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800358} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800359
360#endif // NFD_FACE_STREAM_FACE_HPP