blob: f60e87a989948b755ccd32e66035c71bbbf68514 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (C) 2014 Named Data Networking Project
 * See COPYING for copyright and distribution information.
 */
6
7#ifndef NFD_FACE_STREAM_FACE_HPP
8#define NFD_FACE_STREAM_FACE_HPP
9
10#include "face.hpp"
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080011#include "local-face.hpp"
Steve DiBenedettobf6a93d2014-03-21 14:03:02 -060012#include "core/logger.hpp"
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080013
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080014namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080015
// Forward declaration of the send helper; StreamFace grants friendship to its
// Interest and Data instantiations.
template<class Protocol, class FaceBase, class Packet>
struct StreamFaceSenderImpl;
18
19template<class Protocol, class FaceBase = Face>
20class StreamFace : public FaceBase
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080021{
22public:
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080023 typedef Protocol protocol;
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080024
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080025 /**
26 * \brief Create instance of StreamFace
27 */
Junxiao Shi79494162014-04-02 18:25:11 -070028 StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
Alexander Afanasyev355c0662014-03-20 18:08:17 -070029 const shared_ptr<typename protocol::socket>& socket,
30 bool isOnDemand);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080031
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080032 virtual
33 ~StreamFace();
34
35 // from Face
36 virtual void
37 sendInterest(const Interest& interest);
38
39 virtual void
40 sendData(const Data& data);
41
42 virtual void
43 close();
Alexander Afanasyev93ce75e2014-02-18 19:45:34 -080044
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080045protected:
46 void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080047 processErrorCode(const boost::system::error_code& error);
48
49 void
50 handleSend(const boost::system::error_code& error,
51 const Block& header, const Block& payload);
52 void
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080053 handleSend(const boost::system::error_code& error,
54 const Block& wire);
55
56 void
57 handleReceive(const boost::system::error_code& error,
58 std::size_t bytes_recvd);
59
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080060 void
61 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010062
63 void
64 closeSocket();
65
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080066protected:
67 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010068
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080069private:
70 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
71 std::size_t m_inputBufferSize;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080072
73 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
74 friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
75
Alexander Afanasyev3958b012014-01-31 15:06:13 -080076 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080077};
78
// All classes inheriting from StreamFace must use
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -080081
Alexander Afanasyevbd220a02014-02-20 00:29:56 -080082
/** \brief Hook for validating the socket a StreamFace is created with
 *
 *  The primary template accepts every socket. A partial specialization can
 *  add checks; for example, one based on boost::asio::ip::tcp should check
 *  that the local endpoint is loopback.
 *
 *  @throws Face::Error if validation failed
 */
template<class Protocol, class U>
struct StreamFaceValidator
{
  static void
  validateSocket(typename Protocol::socket& socket)
  {
    // intentionally empty: no restrictions in the general case
  }
};
98
99
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000100template<class T, class FaceBase>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800101inline
Junxiao Shi79494162014-04-02 18:25:11 -0700102StreamFace<T, FaceBase>::StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
103 const shared_ptr<typename StreamFace::protocol::socket>& socket,
104 bool isOnDemand)
105 : FaceBase(remoteUri, localUri)
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000106 , m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -0800107 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800108{
Alexander Afanasyev355c0662014-03-20 18:08:17 -0700109 FaceBase::setOnDemand(isOnDemand);
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000110 StreamFaceValidator<T, FaceBase>::validateSocket(*socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800111 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
Alexander Afanasyeva39b90b2014-03-05 15:31:00 +0000112 bind(&StreamFace<T, FaceBase>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800113}
114
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800115template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800116inline
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800117StreamFace<T, U>::~StreamFace()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800118{
119}
120
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800121template<class Protocol, class FaceBase, class Packet>
122struct StreamFaceSenderImpl
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800123{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800124 static void
125 send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
126 {
127 face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
128 packet.wireEncode().size()),
129 bind(&StreamFace<Protocol, FaceBase>::handleSend,
130 &face, _1, packet.wireEncode()));
131 }
132};
133
134// partial specialization (only classes can be partially specialized)
135template<class Protocol, class Packet>
136struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
137{
138 static void
139 send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
140 {
141 using namespace boost::asio;
142
143 if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
144 {
145 const Block& payload = packet.wireEncode();
146 face.m_socket->async_send(buffer(payload.wire(), payload.size()),
147 bind(&StreamFace<Protocol, LocalFace>::handleSend,
148 &face, _1, packet.wireEncode()));
149 }
150 else
151 {
152 Block header = face.filterAndEncodeLocalControlHeader(packet);
153 const Block& payload = packet.wireEncode();
154
155 std::vector<const_buffer> buffers;
156 buffers.reserve(2);
157 buffers.push_back(buffer(header.wire(), header.size()));
158 buffers.push_back(buffer(payload.wire(), payload.size()));
159
160 face.m_socket->async_send(buffers,
161 bind(&StreamFace<Protocol, LocalFace>::handleSend,
162 &face, _1, header, payload));
163 }
164 }
165};
166
167
168template<class T, class U>
169inline void
170StreamFace<T, U>::sendInterest(const Interest& interest)
171{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000172 this->onSendInterest(interest);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800173 StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800174}
175
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800176template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800177inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800178StreamFace<T, U>::sendData(const Data& data)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800179{
Alexander Afanasyev7e698e62014-03-07 16:48:35 +0000180 this->onSendData(data);
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800181 StreamFaceSenderImpl<T, U, Data>::send(*this, data);
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800182}
183
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800184template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800185inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800186StreamFace<T, U>::close()
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800187{
188 if (!m_socket->is_open())
189 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800190
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800191 NFD_LOG_INFO("[id:" << this->getId()
192 << ",endpoint:" << m_socket->local_endpoint()
193 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800194
Davide Pesaventoba558e72014-02-17 18:38:19 +0100195 closeSocket();
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800196 this->onFail("Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800197}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800198
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800199template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800200inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800201StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800202{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800203 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800204 return;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800205
206 if (!m_socket->is_open())
207 {
208 this->onFail("Connection closed");
209 return;
210 }
211
212 if (error == boost::asio::error::eof)
213 {
214 NFD_LOG_INFO("[id:" << this->getId()
215 << ",endpoint:" << m_socket->local_endpoint()
216 << "] Connection closed");
217 }
218 else
219 {
220 NFD_LOG_WARN("[id:" << this->getId()
221 << ",endpoint:" << m_socket->local_endpoint()
222 << "] Send or receive operation failed, closing socket: "
223 << error.category().message(error.value()));
224 }
225
226 closeSocket();
227
228 if (error == boost::asio::error::eof)
229 {
230 this->onFail("Connection closed");
231 }
232 else
233 {
234 this->onFail("Send or receive operation failed, closing socket: " +
235 error.category().message(error.value()));
236 }
237}
238
239
240template<class T, class U>
241inline void
242StreamFace<T, U>::handleSend(const boost::system::error_code& error,
243 const Block& wire)
244{
245 if (error)
246 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800247
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800248 NFD_LOG_TRACE("[id:" << this->getId()
249 << ",endpoint:" << m_socket->local_endpoint()
250 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800251}
252
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800253template<class T, class U>
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800254inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800255StreamFace<T, U>::handleSend(const boost::system::error_code& error,
256 const Block& header, const Block& payload)
257{
258 if (error)
259 return processErrorCode(error);
260
261 NFD_LOG_TRACE("[id:" << this->getId()
262 << ",endpoint:" << m_socket->local_endpoint()
263 << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
264}
265
266template<class T, class U>
267inline void
268StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800269 std::size_t bytes_recvd)
270{
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800271 if (error)
272 return processErrorCode(error);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800273
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800274 NFD_LOG_TRACE("[id:" << this->getId()
275 << ",endpoint:" << m_socket->local_endpoint()
276 << "] Received: " << bytes_recvd << " bytes");
277
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800278 m_inputBufferSize += bytes_recvd;
279 // do magic
280
281 std::size_t offset = 0;
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800282
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700283 bool isOk = true;
284 Block element;
285 while(m_inputBufferSize - offset > 0)
286 {
287 isOk = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset, element);
288 if (!isOk)
289 break;
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800290
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700291 offset += element.size();
Davide Pesaventoba558e72014-02-17 18:38:19 +0100292
Alexander Afanasyev5a8d8d82014-03-21 14:08:41 -0700293 BOOST_ASSERT(offset <= m_inputBufferSize);
294
295 if (!this->decodeAndDispatchInput(element))
296 {
297 NFD_LOG_WARN("[id:" << this->getId()
298 << ",endpoint:" << m_socket->local_endpoint()
299 << "] Received unrecognized block of type ["
300 << element.type() << "]");
301 // ignore unknown packet and proceed
302 }
303 }
304 if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
305 {
306 NFD_LOG_WARN("[id:" << this->getId()
307 << ",endpoint:" << m_socket->local_endpoint()
308 << "] Failed to parse incoming packet or it is too large to process, "
309 << "closing down the face");
310
311 closeSocket();
312 this->onFail("Failed to parse incoming packet or it is too large to process, "
313 "closing down the face");
314 return;
315 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800316
317 if (offset > 0)
318 {
319 if (offset != m_inputBufferSize)
320 {
321 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
322 m_inputBuffer);
323 m_inputBufferSize -= offset;
324 }
325 else
326 {
327 m_inputBufferSize = 0;
328 }
329 }
330
331 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
332 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800333 bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800334}
335
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800336template<class T, class U>
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800337inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800338StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800339{
340}
341
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800342template<class T, class U>
Davide Pesaventoba558e72014-02-17 18:38:19 +0100343inline void
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800344StreamFace<T, U>::closeSocket()
Davide Pesaventoba558e72014-02-17 18:38:19 +0100345{
346 boost::asio::io_service& io = m_socket->get_io_service();
347
348 // use the non-throwing variants and ignore errors, if any
349 boost::system::error_code error;
350 m_socket->shutdown(protocol::socket::shutdown_both, error);
351 m_socket->close(error);
352 // after this, handlers will be called with an error code
353
354 // ensure that the Face object is alive at least until all pending
355 // handlers are dispatched
Alexander Afanasyevbd220a02014-02-20 00:29:56 -0800356 io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
Davide Pesaventoba558e72014-02-17 18:38:19 +0100357 this, this->shared_from_this()));
358}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800359
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800360} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800361
362#endif // NFD_FACE_STREAM_FACE_HPP