blob: db3766029db80a3af8489ace2af910f980ddbed5 [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2014 Named Data Networking Project
4 * See COPYING for copyright and distribution information.
5 */
6
7#ifndef NFD_FACE_STREAM_FACE_HPP
8#define NFD_FACE_STREAM_FACE_HPP
9
10#include "face.hpp"
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080011#include "local-face.hpp"
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080012
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080013namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080014
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080015// forward declaration
16template<class T, class U, class V> struct StreamFaceSenderImpl;
17
18template<class Protocol, class FaceBase = Face>
19class StreamFace : public FaceBase
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080020{
21public:
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080022 typedef Protocol protocol;
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080023
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080024 /**
25 * \brief Create instance of StreamFace
26 */
Davide Pesavento0ff10db2014-02-28 03:12:27 +010027 explicit
Junxiao Shi8c8d2182014-01-30 22:33:00 -070028 StreamFace(const shared_ptr<typename protocol::socket>& socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080029
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080030 virtual
31 ~StreamFace();
32
33 // from Face
34 virtual void
35 sendInterest(const Interest& interest);
36
37 virtual void
38 sendData(const Data& data);
39
40 virtual void
41 close();
Alexander Afanasyev93ce75e2014-02-18 19:45:34 -080042
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080043protected:
44 void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080045 processErrorCode(const boost::system::error_code& error);
46
47 void
48 handleSend(const boost::system::error_code& error,
49 const Block& header, const Block& payload);
50 void
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080051 handleSend(const boost::system::error_code& error,
52 const Block& wire);
53
54 void
55 handleReceive(const boost::system::error_code& error,
56 std::size_t bytes_recvd);
57
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080058 void
59 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010060
61 void
62 closeSocket();
63
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080064protected:
65 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010066
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080067private:
68 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
69 std::size_t m_inputBufferSize;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080070
71 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
72 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
73
Alexander Afanasyev3958b012014-01-31 15:06:13 -080074 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080075};
76
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080077// All inherited classes must use
78// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -080079
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080080
81/** \brief Class allowing validation of the StreamFace use
82 *
83 * For example, partial specialization based on boost::asio::ip::tcp should check
84 * that local endpoint is loopback
85 *
86 * @throws Face::Error if validation failed
87 */
88template<class Protocol, class U>
89struct StreamFaceValidator
90{
91 static void
92 validateSocket(typename Protocol::socket& socket)
93 {
94 }
95};
96
97
98template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080099inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800100StreamFace<T, U>::StreamFace(const shared_ptr<typename StreamFace::protocol::socket>& socket)
Junxiao Shi8c8d2182014-01-30 22:33:00 -0700101 : m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -0800102 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800103{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800104 StreamFaceValidator<T, U>::validateSocket(*socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800105 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800106 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800107}
108
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800109template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800110inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800111StreamFace<T, U>::~StreamFace()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800112{
113}
114
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800115template<class Protocol, class FaceBase, class Packet>
116struct StreamFaceSenderImpl
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800117{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800118 static void
119 send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
120 {
121 face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
122 packet.wireEncode().size()),
123 bind(&StreamFace<Protocol, FaceBase>::handleSend,
124 &face, _1, packet.wireEncode()));
125 }
126};
127
128// partial specialization (only classes can be partially specialized)
129template<class Protocol, class Packet>
130struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
131{
132 static void
133 send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
134 {
135 using namespace boost::asio;
136
137 if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
138 {
139 const Block& payload = packet.wireEncode();
140 face.m_socket->async_send(buffer(payload.wire(), payload.size()),
141 bind(&StreamFace<Protocol, LocalFace>::handleSend,
142 &face, _1, packet.wireEncode()));
143 }
144 else
145 {
146 Block header = face.filterAndEncodeLocalControlHeader(packet);
147 const Block& payload = packet.wireEncode();
148
149 std::vector<const_buffer> buffers;
150 buffers.reserve(2);
151 buffers.push_back(buffer(header.wire(), header.size()));
152 buffers.push_back(buffer(payload.wire(), payload.size()));
153
154 face.m_socket->async_send(buffers,
155 bind(&StreamFace<Protocol, LocalFace>::handleSend,
156 &face, _1, header, payload));
157 }
158 }
159};
160
161
162template<class T, class U>
163inline void
164StreamFace<T, U>::sendInterest(const Interest& interest)
165{
166 StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800167}
168
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800169template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800170inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800171StreamFace<T, U>::sendData(const Data& data)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800172{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800173 StreamFaceSenderImpl<T, U, Data>::send(*this, data);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800174}
175
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800176template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800177inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800178StreamFace<T, U>::close()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800179{
180 if (!m_socket->is_open())
181 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800182
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800183 NFD_LOG_INFO("[id:" << this->getId()
184 << ",endpoint:" << m_socket->local_endpoint()
185 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800186
Davide Pesaventoba558e72014-02-17 18:38:19 +0100187 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800188 this->onFail("Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800189}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800190
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800191template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800192inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800193StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800194{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800195 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800196 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800197
198 if (!m_socket->is_open())
199 {
200 this->onFail("Connection closed");
201 return;
202 }
203
204 if (error == boost::asio::error::eof)
205 {
206 NFD_LOG_INFO("[id:" << this->getId()
207 << ",endpoint:" << m_socket->local_endpoint()
208 << "] Connection closed");
209 }
210 else
211 {
212 NFD_LOG_WARN("[id:" << this->getId()
213 << ",endpoint:" << m_socket->local_endpoint()
214 << "] Send or receive operation failed, closing socket: "
215 << error.category().message(error.value()));
216 }
217
218 closeSocket();
219
220 if (error == boost::asio::error::eof)
221 {
222 this->onFail("Connection closed");
223 }
224 else
225 {
226 this->onFail("Send or receive operation failed, closing socket: " +
227 error.category().message(error.value()));
228 }
229}
230
231
232template<class T, class U>
233inline void
234StreamFace<T, U>::handleSend(const boost::system::error_code& error,
235 const Block& wire)
236{
237 if (error)
238 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800239
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800240 NFD_LOG_TRACE("[id:" << this->getId()
241 << ",endpoint:" << m_socket->local_endpoint()
242 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800243}
244
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800245template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800246inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800247StreamFace<T, U>::handleSend(const boost::system::error_code& error,
248 const Block& header, const Block& payload)
249{
250 if (error)
251 return processErrorCode(error);
252
253 NFD_LOG_TRACE("[id:" << this->getId()
254 << ",endpoint:" << m_socket->local_endpoint()
255 << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
256}
257
258template<class T, class U>
259inline void
260StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800261 std::size_t bytes_recvd)
262{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800263 if (error)
264 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800265
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800266 NFD_LOG_TRACE("[id:" << this->getId()
267 << ",endpoint:" << m_socket->local_endpoint()
268 << "] Received: " << bytes_recvd << " bytes");
269
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800270 m_inputBufferSize += bytes_recvd;
271 // do magic
272
273 std::size_t offset = 0;
274 /// @todo Eliminate reliance on exceptions in this path
275 try {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800276 while(m_inputBufferSize - offset > 0)
277 {
278 Block element(m_inputBuffer + offset, m_inputBufferSize - offset);
279 offset += element.size();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800280
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800281 BOOST_ASSERT(offset <= m_inputBufferSize);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800282
283 if (!this->decodeAndDispatchInput(element))
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800284 {
285 NFD_LOG_WARN("[id:" << this->getId()
286 << ",endpoint:" << m_socket->local_endpoint()
287 << "] Received unrecognized block of type ["
288 << element.type() << "]");
289 // ignore unknown packet and proceed
290 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800291 }
292 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800293 catch(const tlv::Error& e) {
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800294 if (m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
295 {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800296 NFD_LOG_WARN("[id:" << this->getId()
297 << ",endpoint:" << m_socket->local_endpoint()
298 << "] Received input is invalid or too large to process, "
299 << "closing down the face");
Davide Pesaventoba558e72014-02-17 18:38:19 +0100300
301 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800302 this->onFail("Received input is invalid or too large to process, closing down the face");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800303 return;
304 }
305 }
306
307 if (offset > 0)
308 {
309 if (offset != m_inputBufferSize)
310 {
311 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
312 m_inputBuffer);
313 m_inputBufferSize -= offset;
314 }
315 else
316 {
317 m_inputBufferSize = 0;
318 }
319 }
320
321 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
322 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800323 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800324}
325
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800326template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800327inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800328StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800329{
330}
331
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800332template<class T, class U>
Davide Pesaventoba558e72014-02-17 18:38:19 +0100333inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800334StreamFace<T, U>::closeSocket()
Davide Pesaventoba558e72014-02-17 18:38:19 +0100335{
336 boost::asio::io_service& io = m_socket->get_io_service();
337
338 // use the non-throwing variants and ignore errors, if any
339 boost::system::error_code error;
340 m_socket->shutdown(protocol::socket::shutdown_both, error);
341 m_socket->close(error);
342 // after this, handlers will be called with an error code
343
344 // ensure that the Face object is alive at least until all pending
345 // handlers are dispatched
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800346 io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
Davide Pesaventoba558e72014-02-17 18:38:19 +0100347 this, this->shared_from_this()));
348}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800349
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800350} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800351
352#endif // NFD_FACE_STREAM_FACE_HPP