blob: a659373cca499b451fd7b858d6d31bb6bfcdb1b0 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (C) 2014 Named Data Networking Project
 * See COPYING for copyright and distribution information.
 */
6
7#ifndef NFD_FACE_STREAM_FACE_HPP
8#define NFD_FACE_STREAM_FACE_HPP
9
10#include "face.hpp"
11
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080012namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080013
14template <class T>
15class StreamFace : public Face
16{
17public:
18 typedef T protocol;
19
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080020 /**
21 * \brief Create instance of StreamFace
22 */
Junxiao Shi8c8d2182014-01-30 22:33:00 -070023 StreamFace(const shared_ptr<typename protocol::socket>& socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080024
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080025 virtual
26 ~StreamFace();
27
28 // from Face
29 virtual void
30 sendInterest(const Interest& interest);
31
32 virtual void
33 sendData(const Data& data);
34
35 virtual void
36 close();
37
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080038protected:
39 void
40 handleSend(const boost::system::error_code& error,
41 const Block& wire);
42
43 void
44 handleReceive(const boost::system::error_code& error,
45 std::size_t bytes_recvd);
46
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080047 void
48 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
49
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080050protected:
51 shared_ptr<typename protocol::socket> m_socket;
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080052
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080053private:
54 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
55 std::size_t m_inputBufferSize;
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080056
Alexander Afanasyev3958b012014-01-31 15:06:13 -080057 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080058};
59
// All inherited classes must use
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -080062
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080063template <class T>
64inline
Junxiao Shi8c8d2182014-01-30 22:33:00 -070065StreamFace<T>::StreamFace(const shared_ptr<typename StreamFace::protocol::socket>& socket)
66 : m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -080067 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080068{
69 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
70 bind(&StreamFace<T>::handleReceive, this, _1, _2));
71}
72
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080073template <class T>
74inline
75StreamFace<T>::~StreamFace()
76{
77}
78
79
80template <class T>
81inline void
82StreamFace<T>::sendInterest(const Interest& interest)
83{
84 m_socket->async_send(boost::asio::buffer(interest.wireEncode().wire(),
85 interest.wireEncode().size()),
86 bind(&StreamFace<T>::handleSend, this, _1, interest.wireEncode()));
87
88 // anything else should be done here?
89}
90
91template <class T>
92inline void
93StreamFace<T>::sendData(const Data& data)
94{
95 m_socket->async_send(boost::asio::buffer(data.wireEncode().wire(),
96 data.wireEncode().size()),
97 bind(&StreamFace<T>::handleSend, this, _1, data.wireEncode()));
98
99 // anything else should be done here?
100}
101
102template <class T>
103inline void
104StreamFace<T>::close()
105{
106 if (!m_socket->is_open())
107 return;
108
109 NFD_LOG_INFO("[id:" << this->getId()
110 << ",endpoint:" << m_socket->local_endpoint()
111 << "] Close connection");
112
113
114 boost::asio::io_service& io = m_socket->get_io_service();
115 m_socket->close();
116 // after this, handleSend/handleReceive will be called with set error code
117
118 // ensure that if that Face object is alive at least until all pending
119 // methods are dispatched
120 io.post(bind(&StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted,
121 this, this->shared_from_this()));
122
123 onFail("Close connection");
124}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800125
126template <class T>
127inline void
128StreamFace<T>::handleSend(const boost::system::error_code& error,
129 const Block& wire)
130{
131 if (error) {
132 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
133 return;
134
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800135 if (!m_socket->is_open())
136 {
137 onFail("Connection closed");
138 return;
139 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800140
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800141 if (error == boost::asio::error::eof)
142 {
143 NFD_LOG_INFO("[id:" << this->getId()
144 << ",endpoint:" << m_socket->local_endpoint()
145 << "] Connection closed");
146 }
147 else
148 {
149 NFD_LOG_WARN("[id:" << this->getId()
150 << ",endpoint:" << m_socket->local_endpoint()
151 << "] Send operation failed, closing socket: "
152 << error.category().message(error.value()));
153 }
154
155 boost::asio::io_service& io = m_socket->get_io_service();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800156 m_socket->close();
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800157
158 // ensure that if that Face object is alive at least until all pending
159 // methods are dispatched
160 io.post(bind(&StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted,
161 this, this->shared_from_this()));
162
163 if (error == boost::asio::error::eof)
164 {
165 onFail("Connection closed");
166 }
167 else
168 {
169 onFail("Send operation failed, closing socket: " +
170 error.category().message(error.value()));
171 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800172 return;
173 }
174
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800175 NFD_LOG_TRACE("[id:" << this->getId()
176 << ",endpoint:" << m_socket->local_endpoint()
177 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800178 // do nothing (needed to retain validity of wire memory block
179}
180
181template <class T>
182inline void
183StreamFace<T>::handleReceive(const boost::system::error_code& error,
184 std::size_t bytes_recvd)
185{
186 if (error || bytes_recvd == 0) {
187 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
188 return;
189
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800190 // this should be unnecessary, but just in case
191 if (!m_socket->is_open())
192 {
193 onFail("Connection closed");
194 return;
195 }
196
197 if (error == boost::asio::error::eof)
198 {
199 NFD_LOG_INFO("[id:" << this->getId()
200 << ",endpoint:" << m_socket->local_endpoint()
201 << "] Connection closed");
202 }
203 else
204 {
205 NFD_LOG_WARN("[id:" << this->getId()
206 << ",endpoint:" << m_socket->local_endpoint()
207 << "] Receive operation failed: "
208 << error.category().message(error.value()));
209 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800210
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800211 boost::asio::io_service& io = m_socket->get_io_service();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800212 m_socket->close();
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800213
214 // ensure that if that Face object is alive at least until all pending
215 // methods are dispatched
216 io.post(bind(&StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted,
217 this, this->shared_from_this()));
218
219 if (error == boost::asio::error::eof)
220 {
221 onFail("Connection closed");
222 }
223 else
224 {
225 onFail("Receive operation failed, closing socket: " +
226 error.category().message(error.value()));
227 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800228 return;
229 }
230
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800231 NFD_LOG_TRACE("[id:" << this->getId()
232 << ",endpoint:" << m_socket->local_endpoint()
233 << "] Received: " << bytes_recvd << " bytes");
234
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800235 m_inputBufferSize += bytes_recvd;
236 // do magic
237
238 std::size_t offset = 0;
239 /// @todo Eliminate reliance on exceptions in this path
240 try {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800241 while(m_inputBufferSize - offset > 0)
242 {
243 Block element(m_inputBuffer + offset, m_inputBufferSize - offset);
244 offset += element.size();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800245
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800246 BOOST_ASSERT(offset <= m_inputBufferSize);
247
248 /// @todo Ensure lazy field decoding process
249 if (element.type() == tlv::Interest)
250 {
251 shared_ptr<Interest> i = make_shared<Interest>();
252 i->wireDecode(element);
253 onReceiveInterest(*i);
254 }
255 else if (element.type() == tlv::Data)
256 {
257 shared_ptr<Data> d = make_shared<Data>();
258 d->wireDecode(element);
259 onReceiveData(*d);
260 }
261 // @todo Add local header support
262 // else if (element.type() == tlv::LocalHeader)
263 // {
264 // shared_ptr<Interest> i = make_shared<Interest>();
265 // i->wireDecode(element);
266 // }
267 else
268 {
269 NFD_LOG_WARN("[id:" << this->getId()
270 << ",endpoint:" << m_socket->local_endpoint()
271 << "] Received unrecognized block of type ["
272 << element.type() << "]");
273 // ignore unknown packet and proceed
274 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800275 }
276 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800277 catch(const tlv::Error& e) {
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800278 if (m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
279 {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800280 NFD_LOG_WARN("[id:" << this->getId()
281 << ",endpoint:" << m_socket->local_endpoint()
282 << "] Received input is invalid or too large to process, "
283 << "closing down the face");
284
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800285 boost::asio::io_service& io = m_socket->get_io_service();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800286 m_socket->close();
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800287
288 // ensure that if that Face object is alive at least until all pending
289 // methods are dispatched
290 io.post(bind(&StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted,
291 this, this->shared_from_this()));
292
293 onFail("Received input is invalid or too large to process, closing down the face");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800294 return;
295 }
296 }
297
298 if (offset > 0)
299 {
300 if (offset != m_inputBufferSize)
301 {
302 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
303 m_inputBuffer);
304 m_inputBufferSize -= offset;
305 }
306 else
307 {
308 m_inputBufferSize = 0;
309 }
310 }
311
312 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
313 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
314 bind(&StreamFace<T>::handleReceive, this, _1, _2));
315}
316
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800317template <class T>
318inline void
319StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
320{
321}
322
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800323
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800324} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800325
326#endif // NFD_FACE_STREAM_FACE_HPP