blob: 8a0f96082da723a256815b7eade399e583b3accd [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2014 Named Data Networking Project
4 * See COPYING for copyright and distribution information.
5 */
6
7#ifndef NFD_FACE_STREAM_FACE_HPP
8#define NFD_FACE_STREAM_FACE_HPP
9
10#include "face.hpp"
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080011#include "local-face.hpp"
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080012
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080013namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080014
// Forward declaration of the send helper: StreamFace names two of its
// specializations as friends, and the definition follows later in this header.
template<class Protocol, class FaceBase, class Packet>
struct StreamFaceSenderImpl;
17
18template<class Protocol, class FaceBase = Face>
19class StreamFace : public FaceBase
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080020{
21public:
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080022 typedef Protocol protocol;
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080023
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080024 /**
25 * \brief Create instance of StreamFace
26 */
Davide Pesavento0ff10db2014-02-28 03:12:27 +010027 explicit
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +000028 StreamFace(const FaceUri& uri,
29 const shared_ptr<typename protocol::socket>& socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080030
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080031 virtual
32 ~StreamFace();
33
34 // from Face
35 virtual void
36 sendInterest(const Interest& interest);
37
38 virtual void
39 sendData(const Data& data);
40
41 virtual void
42 close();
Alexander Afanasyev93ce75e2014-02-18 19:45:34 -080043
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080044protected:
45 void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080046 processErrorCode(const boost::system::error_code& error);
47
48 void
49 handleSend(const boost::system::error_code& error,
50 const Block& header, const Block& payload);
51 void
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080052 handleSend(const boost::system::error_code& error,
53 const Block& wire);
54
55 void
56 handleReceive(const boost::system::error_code& error,
57 std::size_t bytes_recvd);
58
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080059 void
60 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010061
62 void
63 closeSocket();
64
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080065protected:
66 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010067
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080068private:
69 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
70 std::size_t m_inputBufferSize;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080071
72 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
73 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
74
Alexander Afanasyev3958b012014-01-31 15:06:13 -080075 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080076};
77
// All classes derived from StreamFace must use
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -080080
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080081
/** \brief Hook used by StreamFace to validate the socket it is created with
 *
 * The primary template accepts every socket.  A partial specialization, for
 * example one based on boost::asio::ip::tcp, should check that the local
 * endpoint is loopback.
 *
 * @throws Face::Error if validation failed
 */
template<class Protocol, class FaceBase>
struct StreamFaceValidator
{
  static void
  validateSocket(typename Protocol::socket& socket)
  {
    // the generic case has nothing to check
  }
};
97
98
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +000099template<class T, class FaceBase>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800100inline
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000101StreamFace<T, FaceBase>::StreamFace(const FaceUri& uri,
102 const shared_ptr<typename StreamFace::protocol::socket>& socket)
103 : FaceBase(uri)
104 , m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -0800105 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800106{
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000107 StreamFaceValidator<T, FaceBase>::validateSocket(*socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800108 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000109 bind(&StreamFace<T, FaceBase>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800110}
111
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800112template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800113inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800114StreamFace<T, U>::~StreamFace()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800115{
116}
117
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800118template<class Protocol, class FaceBase, class Packet>
119struct StreamFaceSenderImpl
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800120{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800121 static void
122 send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
123 {
124 face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
125 packet.wireEncode().size()),
126 bind(&StreamFace<Protocol, FaceBase>::handleSend,
127 &face, _1, packet.wireEncode()));
128 }
129};
130
131// partial specialization (only classes can be partially specialized)
132template<class Protocol, class Packet>
133struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
134{
135 static void
136 send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
137 {
138 using namespace boost::asio;
139
140 if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
141 {
142 const Block& payload = packet.wireEncode();
143 face.m_socket->async_send(buffer(payload.wire(), payload.size()),
144 bind(&StreamFace<Protocol, LocalFace>::handleSend,
145 &face, _1, packet.wireEncode()));
146 }
147 else
148 {
149 Block header = face.filterAndEncodeLocalControlHeader(packet);
150 const Block& payload = packet.wireEncode();
151
152 std::vector<const_buffer> buffers;
153 buffers.reserve(2);
154 buffers.push_back(buffer(header.wire(), header.size()));
155 buffers.push_back(buffer(payload.wire(), payload.size()));
156
157 face.m_socket->async_send(buffers,
158 bind(&StreamFace<Protocol, LocalFace>::handleSend,
159 &face, _1, header, payload));
160 }
161 }
162};
163
164
165template<class T, class U>
166inline void
167StreamFace<T, U>::sendInterest(const Interest& interest)
168{
169 StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800170}
171
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800172template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800173inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800174StreamFace<T, U>::sendData(const Data& data)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800175{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800176 StreamFaceSenderImpl<T, U, Data>::send(*this, data);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800177}
178
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800179template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800180inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800181StreamFace<T, U>::close()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800182{
183 if (!m_socket->is_open())
184 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800185
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800186 NFD_LOG_INFO("[id:" << this->getId()
187 << ",endpoint:" << m_socket->local_endpoint()
188 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800189
Davide Pesaventoba558e72014-02-17 18:38:19 +0100190 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800191 this->onFail("Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800192}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800193
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800194template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800195inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800196StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800197{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800198 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800199 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800200
201 if (!m_socket->is_open())
202 {
203 this->onFail("Connection closed");
204 return;
205 }
206
207 if (error == boost::asio::error::eof)
208 {
209 NFD_LOG_INFO("[id:" << this->getId()
210 << ",endpoint:" << m_socket->local_endpoint()
211 << "] Connection closed");
212 }
213 else
214 {
215 NFD_LOG_WARN("[id:" << this->getId()
216 << ",endpoint:" << m_socket->local_endpoint()
217 << "] Send or receive operation failed, closing socket: "
218 << error.category().message(error.value()));
219 }
220
221 closeSocket();
222
223 if (error == boost::asio::error::eof)
224 {
225 this->onFail("Connection closed");
226 }
227 else
228 {
229 this->onFail("Send or receive operation failed, closing socket: " +
230 error.category().message(error.value()));
231 }
232}
233
234
235template<class T, class U>
236inline void
237StreamFace<T, U>::handleSend(const boost::system::error_code& error,
238 const Block& wire)
239{
240 if (error)
241 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800242
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800243 NFD_LOG_TRACE("[id:" << this->getId()
244 << ",endpoint:" << m_socket->local_endpoint()
245 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800246}
247
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800248template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800249inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800250StreamFace<T, U>::handleSend(const boost::system::error_code& error,
251 const Block& header, const Block& payload)
252{
253 if (error)
254 return processErrorCode(error);
255
256 NFD_LOG_TRACE("[id:" << this->getId()
257 << ",endpoint:" << m_socket->local_endpoint()
258 << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
259}
260
261template<class T, class U>
262inline void
263StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800264 std::size_t bytes_recvd)
265{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800266 if (error)
267 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800268
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800269 NFD_LOG_TRACE("[id:" << this->getId()
270 << ",endpoint:" << m_socket->local_endpoint()
271 << "] Received: " << bytes_recvd << " bytes");
272
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800273 m_inputBufferSize += bytes_recvd;
274 // do magic
275
276 std::size_t offset = 0;
277 /// @todo Eliminate reliance on exceptions in this path
278 try {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800279 while(m_inputBufferSize - offset > 0)
280 {
281 Block element(m_inputBuffer + offset, m_inputBufferSize - offset);
282 offset += element.size();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800283
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800284 BOOST_ASSERT(offset <= m_inputBufferSize);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800285
286 if (!this->decodeAndDispatchInput(element))
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800287 {
288 NFD_LOG_WARN("[id:" << this->getId()
289 << ",endpoint:" << m_socket->local_endpoint()
290 << "] Received unrecognized block of type ["
291 << element.type() << "]");
292 // ignore unknown packet and proceed
293 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800294 }
295 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800296 catch(const tlv::Error& e) {
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800297 if (m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
298 {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800299 NFD_LOG_WARN("[id:" << this->getId()
300 << ",endpoint:" << m_socket->local_endpoint()
301 << "] Received input is invalid or too large to process, "
302 << "closing down the face");
Davide Pesaventoba558e72014-02-17 18:38:19 +0100303
304 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800305 this->onFail("Received input is invalid or too large to process, closing down the face");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800306 return;
307 }
308 }
309
310 if (offset > 0)
311 {
312 if (offset != m_inputBufferSize)
313 {
314 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
315 m_inputBuffer);
316 m_inputBufferSize -= offset;
317 }
318 else
319 {
320 m_inputBufferSize = 0;
321 }
322 }
323
324 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
325 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800326 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800327}
328
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800329template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800330inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800331StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800332{
333}
334
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800335template<class T, class U>
Davide Pesaventoba558e72014-02-17 18:38:19 +0100336inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800337StreamFace<T, U>::closeSocket()
Davide Pesaventoba558e72014-02-17 18:38:19 +0100338{
339 boost::asio::io_service& io = m_socket->get_io_service();
340
341 // use the non-throwing variants and ignore errors, if any
342 boost::system::error_code error;
343 m_socket->shutdown(protocol::socket::shutdown_both, error);
344 m_socket->close(error);
345 // after this, handlers will be called with an error code
346
347 // ensure that the Face object is alive at least until all pending
348 // handlers are dispatched
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800349 io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
Davide Pesaventoba558e72014-02-17 18:38:19 +0100350 this, this->shared_from_this()));
351}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800352
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800353} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800354
355#endif // NFD_FACE_STREAM_FACE_HPP