blob: 65ca82e3572885b023f244d0713a9bda8b435ec7 [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2014 Named Data Networking Project
4 * See COPYING for copyright and distribution information.
5 */
6
7#ifndef NFD_FACE_STREAM_FACE_HPP
8#define NFD_FACE_STREAM_FACE_HPP
9
10#include "face.hpp"
11
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080012namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080013
14template <class T>
15class StreamFace : public Face
16{
17public:
18 typedef T protocol;
19
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080020 /**
21 * \brief Create instance of StreamFace
22 */
Junxiao Shi8c8d2182014-01-30 22:33:00 -070023 StreamFace(const shared_ptr<typename protocol::socket>& socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080024
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080025 virtual
26 ~StreamFace();
27
28 // from Face
29 virtual void
30 sendInterest(const Interest& interest);
31
32 virtual void
33 sendData(const Data& data);
34
35 virtual void
36 close();
37
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080038protected:
39 void
40 handleSend(const boost::system::error_code& error,
41 const Block& wire);
42
43 void
44 handleReceive(const boost::system::error_code& error,
45 std::size_t bytes_recvd);
46
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080047 void
48 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
49
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080050protected:
51 shared_ptr<typename protocol::socket> m_socket;
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080052
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080053private:
54 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
55 std::size_t m_inputBufferSize;
Alexander Afanasyev3958b012014-01-31 15:06:13 -080056
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080057#ifdef _DEBUG
58 typename protocol::endpoint m_localEndpoint;
59#endif
60
Alexander Afanasyev3958b012014-01-31 15:06:13 -080061 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080062};
63
// All derived classes must use
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -080066
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080067template <class T>
68inline
Junxiao Shi8c8d2182014-01-30 22:33:00 -070069StreamFace<T>::StreamFace(const shared_ptr<typename StreamFace::protocol::socket>& socket)
70 : m_socket(socket)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080071{
72 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
73 bind(&StreamFace<T>::handleReceive, this, _1, _2));
74}
75
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080076template <class T>
77inline
78StreamFace<T>::~StreamFace()
79{
80}
81
82
83template <class T>
84inline void
85StreamFace<T>::sendInterest(const Interest& interest)
86{
87 m_socket->async_send(boost::asio::buffer(interest.wireEncode().wire(),
88 interest.wireEncode().size()),
89 bind(&StreamFace<T>::handleSend, this, _1, interest.wireEncode()));
90
91 // anything else should be done here?
92}
93
94template <class T>
95inline void
96StreamFace<T>::sendData(const Data& data)
97{
98 m_socket->async_send(boost::asio::buffer(data.wireEncode().wire(),
99 data.wireEncode().size()),
100 bind(&StreamFace<T>::handleSend, this, _1, data.wireEncode()));
101
102 // anything else should be done here?
103}
104
105template <class T>
106inline void
107StreamFace<T>::close()
108{
109 if (!m_socket->is_open())
110 return;
111
112 NFD_LOG_INFO("[id:" << this->getId()
113 << ",endpoint:" << m_socket->local_endpoint()
114 << "] Close connection");
115
116
117 boost::asio::io_service& io = m_socket->get_io_service();
118 m_socket->close();
119 // after this, handleSend/handleReceive will be called with set error code
120
121 // ensure that if that Face object is alive at least until all pending
122 // methods are dispatched
123 io.post(bind(&StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted,
124 this, this->shared_from_this()));
125
126 onFail("Close connection");
127}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800128
129template <class T>
130inline void
131StreamFace<T>::handleSend(const boost::system::error_code& error,
132 const Block& wire)
133{
134 if (error) {
135 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
136 return;
137
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800138 if (!m_socket->is_open())
139 {
140 onFail("Connection closed");
141 return;
142 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800143
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800144 if (error == boost::asio::error::eof)
145 {
146 NFD_LOG_INFO("[id:" << this->getId()
147 << ",endpoint:" << m_socket->local_endpoint()
148 << "] Connection closed");
149 }
150 else
151 {
152 NFD_LOG_WARN("[id:" << this->getId()
153 << ",endpoint:" << m_socket->local_endpoint()
154 << "] Send operation failed, closing socket: "
155 << error.category().message(error.value()));
156 }
157
158 boost::asio::io_service& io = m_socket->get_io_service();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800159 m_socket->close();
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800160
161 // ensure that if that Face object is alive at least until all pending
162 // methods are dispatched
163 io.post(bind(&StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted,
164 this, this->shared_from_this()));
165
166 if (error == boost::asio::error::eof)
167 {
168 onFail("Connection closed");
169 }
170 else
171 {
172 onFail("Send operation failed, closing socket: " +
173 error.category().message(error.value()));
174 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800175 return;
176 }
177
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800178 NFD_LOG_TRACE("[id:" << this->getId()
179 << ",endpoint:" << m_socket->local_endpoint()
180 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800181 // do nothing (needed to retain validity of wire memory block
182}
183
184template <class T>
185inline void
186StreamFace<T>::handleReceive(const boost::system::error_code& error,
187 std::size_t bytes_recvd)
188{
189 if (error || bytes_recvd == 0) {
190 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
191 return;
192
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800193 // this should be unnecessary, but just in case
194 if (!m_socket->is_open())
195 {
196 onFail("Connection closed");
197 return;
198 }
199
200 if (error == boost::asio::error::eof)
201 {
202 NFD_LOG_INFO("[id:" << this->getId()
203 << ",endpoint:" << m_socket->local_endpoint()
204 << "] Connection closed");
205 }
206 else
207 {
208 NFD_LOG_WARN("[id:" << this->getId()
209 << ",endpoint:" << m_socket->local_endpoint()
210 << "] Receive operation failed: "
211 << error.category().message(error.value()));
212 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800213
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800214 boost::asio::io_service& io = m_socket->get_io_service();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800215 m_socket->close();
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800216
217 // ensure that if that Face object is alive at least until all pending
218 // methods are dispatched
219 io.post(bind(&StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted,
220 this, this->shared_from_this()));
221
222 if (error == boost::asio::error::eof)
223 {
224 onFail("Connection closed");
225 }
226 else
227 {
228 onFail("Receive operation failed, closing socket: " +
229 error.category().message(error.value()));
230 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800231 return;
232 }
233
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800234 NFD_LOG_TRACE("[id:" << this->getId()
235 << ",endpoint:" << m_socket->local_endpoint()
236 << "] Received: " << bytes_recvd << " bytes");
237
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800238 m_inputBufferSize += bytes_recvd;
239 // do magic
240
241 std::size_t offset = 0;
242 /// @todo Eliminate reliance on exceptions in this path
243 try {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800244 while(m_inputBufferSize - offset > 0)
245 {
246 Block element(m_inputBuffer + offset, m_inputBufferSize - offset);
247 offset += element.size();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800248
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800249 BOOST_ASSERT(offset <= m_inputBufferSize);
250
251 /// @todo Ensure lazy field decoding process
252 if (element.type() == tlv::Interest)
253 {
254 shared_ptr<Interest> i = make_shared<Interest>();
255 i->wireDecode(element);
256 onReceiveInterest(*i);
257 }
258 else if (element.type() == tlv::Data)
259 {
260 shared_ptr<Data> d = make_shared<Data>();
261 d->wireDecode(element);
262 onReceiveData(*d);
263 }
264 // @todo Add local header support
265 // else if (element.type() == tlv::LocalHeader)
266 // {
267 // shared_ptr<Interest> i = make_shared<Interest>();
268 // i->wireDecode(element);
269 // }
270 else
271 {
272 NFD_LOG_WARN("[id:" << this->getId()
273 << ",endpoint:" << m_socket->local_endpoint()
274 << "] Received unrecognized block of type ["
275 << element.type() << "]");
276 // ignore unknown packet and proceed
277 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800278 }
279 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800280 catch(const tlv::Error& e) {
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800281 if (m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
282 {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800283 NFD_LOG_WARN("[id:" << this->getId()
284 << ",endpoint:" << m_socket->local_endpoint()
285 << "] Received input is invalid or too large to process, "
286 << "closing down the face");
287
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800288 boost::asio::io_service& io = m_socket->get_io_service();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800289 m_socket->close();
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800290
291 // ensure that if that Face object is alive at least until all pending
292 // methods are dispatched
293 io.post(bind(&StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted,
294 this, this->shared_from_this()));
295
296 onFail("Received input is invalid or too large to process, closing down the face");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800297 return;
298 }
299 }
300
301 if (offset > 0)
302 {
303 if (offset != m_inputBufferSize)
304 {
305 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
306 m_inputBuffer);
307 m_inputBufferSize -= offset;
308 }
309 else
310 {
311 m_inputBufferSize = 0;
312 }
313 }
314
315 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
316 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
317 bind(&StreamFace<T>::handleReceive, this, _1, _2));
318}
319
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800320template <class T>
321inline void
322StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
323{
324}
325
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800326
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800327} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800328
329#endif // NFD_FACE_STREAM_FACE_HPP