blob: 9baef1df0b6fab9cac020b0aa60be067cc63ce1f [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2014 Named Data Networking Project
4 * See COPYING for copyright and distribution information.
5 */
6
7#ifndef NFD_FACE_STREAM_FACE_HPP
8#define NFD_FACE_STREAM_FACE_HPP
9
10#include "face.hpp"
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080011#include "local-face.hpp"
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080012
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080013namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080014
// Forward declaration of the send helper: StreamFace names it as a friend,
// while its definition appears after the class.
template<class Protocol, class FaceBase, class Packet> struct StreamFaceSenderImpl;
17
18template<class Protocol, class FaceBase = Face>
19class StreamFace : public FaceBase
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080020{
21public:
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080022 typedef Protocol protocol;
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080023
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080024 /**
25 * \brief Create instance of StreamFace
26 */
Junxiao Shi8c8d2182014-01-30 22:33:00 -070027 StreamFace(const shared_ptr<typename protocol::socket>& socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080028
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080029 virtual
30 ~StreamFace();
31
32 // from Face
33 virtual void
34 sendInterest(const Interest& interest);
35
36 virtual void
37 sendData(const Data& data);
38
39 virtual void
40 close();
Alexander Afanasyev93ce75e2014-02-18 19:45:34 -080041
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080042protected:
43 void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080044 processErrorCode(const boost::system::error_code& error);
45
46 void
47 handleSend(const boost::system::error_code& error,
48 const Block& header, const Block& payload);
49 void
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080050 handleSend(const boost::system::error_code& error,
51 const Block& wire);
52
53 void
54 handleReceive(const boost::system::error_code& error,
55 std::size_t bytes_recvd);
56
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080057 void
58 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010059
60 void
61 closeSocket();
62
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080063protected:
64 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010065
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080066private:
67 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
68 std::size_t m_inputBufferSize;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080069
70 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
71 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
72
Alexander Afanasyev3958b012014-01-31 15:06:13 -080073 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080074};
75
// Every class derived from StreamFace must use
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -080078
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080079
/**
 * \brief Hook for validating the socket a StreamFace is created with
 *
 * The primary template accepts any socket without checking it. For example,
 * a partial specialization based on boost::asio::ip::tcp should check that
 * the local endpoint is loopback.
 *
 * \throws Face::Error if validation failed
 */
template<class Protocol, class U>
struct StreamFaceValidator
{
  static void
  validateSocket(typename Protocol::socket& socket)
  {
    // generic case: nothing to validate
  }
};
95
96
97template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080098inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080099StreamFace<T, U>::StreamFace(const shared_ptr<typename StreamFace::protocol::socket>& socket)
Junxiao Shi8c8d2182014-01-30 22:33:00 -0700100 : m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -0800101 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800102{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800103 StreamFaceValidator<T, U>::validateSocket(*socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800104 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800105 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800106}
107
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800108template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800109inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800110StreamFace<T, U>::~StreamFace()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800111{
112}
113
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800114template<class Protocol, class FaceBase, class Packet>
115struct StreamFaceSenderImpl
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800116{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800117 static void
118 send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
119 {
120 face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
121 packet.wireEncode().size()),
122 bind(&StreamFace<Protocol, FaceBase>::handleSend,
123 &face, _1, packet.wireEncode()));
124 }
125};
126
127// partial specialization (only classes can be partially specialized)
128template<class Protocol, class Packet>
129struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
130{
131 static void
132 send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
133 {
134 using namespace boost::asio;
135
136 if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
137 {
138 const Block& payload = packet.wireEncode();
139 face.m_socket->async_send(buffer(payload.wire(), payload.size()),
140 bind(&StreamFace<Protocol, LocalFace>::handleSend,
141 &face, _1, packet.wireEncode()));
142 }
143 else
144 {
145 Block header = face.filterAndEncodeLocalControlHeader(packet);
146 const Block& payload = packet.wireEncode();
147
148 std::vector<const_buffer> buffers;
149 buffers.reserve(2);
150 buffers.push_back(buffer(header.wire(), header.size()));
151 buffers.push_back(buffer(payload.wire(), payload.size()));
152
153 face.m_socket->async_send(buffers,
154 bind(&StreamFace<Protocol, LocalFace>::handleSend,
155 &face, _1, header, payload));
156 }
157 }
158};
159
160
161template<class T, class U>
162inline void
163StreamFace<T, U>::sendInterest(const Interest& interest)
164{
165 StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800166}
167
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800168template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800169inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800170StreamFace<T, U>::sendData(const Data& data)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800171{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800172 StreamFaceSenderImpl<T, U, Data>::send(*this, data);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800173}
174
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800175template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800176inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800177StreamFace<T, U>::close()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800178{
179 if (!m_socket->is_open())
180 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800181
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800182 NFD_LOG_INFO("[id:" << this->getId()
183 << ",endpoint:" << m_socket->local_endpoint()
184 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800185
Davide Pesaventoba558e72014-02-17 18:38:19 +0100186 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800187 this->onFail("Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800188}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800189
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800190template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800191inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800192StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800193{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800194 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800195 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800196
197 if (!m_socket->is_open())
198 {
199 this->onFail("Connection closed");
200 return;
201 }
202
203 if (error == boost::asio::error::eof)
204 {
205 NFD_LOG_INFO("[id:" << this->getId()
206 << ",endpoint:" << m_socket->local_endpoint()
207 << "] Connection closed");
208 }
209 else
210 {
211 NFD_LOG_WARN("[id:" << this->getId()
212 << ",endpoint:" << m_socket->local_endpoint()
213 << "] Send or receive operation failed, closing socket: "
214 << error.category().message(error.value()));
215 }
216
217 closeSocket();
218
219 if (error == boost::asio::error::eof)
220 {
221 this->onFail("Connection closed");
222 }
223 else
224 {
225 this->onFail("Send or receive operation failed, closing socket: " +
226 error.category().message(error.value()));
227 }
228}
229
230
231template<class T, class U>
232inline void
233StreamFace<T, U>::handleSend(const boost::system::error_code& error,
234 const Block& wire)
235{
236 if (error)
237 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800238
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800239 NFD_LOG_TRACE("[id:" << this->getId()
240 << ",endpoint:" << m_socket->local_endpoint()
241 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800242}
243
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800244template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800245inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800246StreamFace<T, U>::handleSend(const boost::system::error_code& error,
247 const Block& header, const Block& payload)
248{
249 if (error)
250 return processErrorCode(error);
251
252 NFD_LOG_TRACE("[id:" << this->getId()
253 << ",endpoint:" << m_socket->local_endpoint()
254 << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
255}
256
257template<class T, class U>
258inline void
259StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800260 std::size_t bytes_recvd)
261{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800262 if (error)
263 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800264
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800265 NFD_LOG_TRACE("[id:" << this->getId()
266 << ",endpoint:" << m_socket->local_endpoint()
267 << "] Received: " << bytes_recvd << " bytes");
268
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800269 m_inputBufferSize += bytes_recvd;
270 // do magic
271
272 std::size_t offset = 0;
273 /// @todo Eliminate reliance on exceptions in this path
274 try {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800275 while(m_inputBufferSize - offset > 0)
276 {
277 Block element(m_inputBuffer + offset, m_inputBufferSize - offset);
278 offset += element.size();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800279
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800280 BOOST_ASSERT(offset <= m_inputBufferSize);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800281
282 if (!this->decodeAndDispatchInput(element))
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800283 {
284 NFD_LOG_WARN("[id:" << this->getId()
285 << ",endpoint:" << m_socket->local_endpoint()
286 << "] Received unrecognized block of type ["
287 << element.type() << "]");
288 // ignore unknown packet and proceed
289 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800290 }
291 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800292 catch(const tlv::Error& e) {
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800293 if (m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
294 {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800295 NFD_LOG_WARN("[id:" << this->getId()
296 << ",endpoint:" << m_socket->local_endpoint()
297 << "] Received input is invalid or too large to process, "
298 << "closing down the face");
Davide Pesaventoba558e72014-02-17 18:38:19 +0100299
300 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800301 this->onFail("Received input is invalid or too large to process, closing down the face");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800302 return;
303 }
304 }
305
306 if (offset > 0)
307 {
308 if (offset != m_inputBufferSize)
309 {
310 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
311 m_inputBuffer);
312 m_inputBufferSize -= offset;
313 }
314 else
315 {
316 m_inputBufferSize = 0;
317 }
318 }
319
320 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
321 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800322 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800323}
324
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800325template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800326inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800327StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800328{
329}
330
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800331template<class T, class U>
Davide Pesaventoba558e72014-02-17 18:38:19 +0100332inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800333StreamFace<T, U>::closeSocket()
Davide Pesaventoba558e72014-02-17 18:38:19 +0100334{
335 boost::asio::io_service& io = m_socket->get_io_service();
336
337 // use the non-throwing variants and ignore errors, if any
338 boost::system::error_code error;
339 m_socket->shutdown(protocol::socket::shutdown_both, error);
340 m_socket->close(error);
341 // after this, handlers will be called with an error code
342
343 // ensure that the Face object is alive at least until all pending
344 // handlers are dispatched
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800345 io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
Davide Pesaventoba558e72014-02-17 18:38:19 +0100346 this, this->shared_from_this()));
347}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800348
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800349} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800350
351#endif // NFD_FACE_STREAM_FACE_HPP