blob: 95738131d2cfff92bd54af63adbc9221638f5790 [file] [log] [blame]
Alexander Afanasyeva9034b02014-01-26 18:32:02 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2014 Named Data Networking Project
4 * See COPYING for copyright and distribution information.
5 */
6
7#ifndef NFD_FACE_STREAM_FACE_HPP
8#define NFD_FACE_STREAM_FACE_HPP
9
10#include "face.hpp"
11
Alexander Afanasyev18bbf812014-01-29 01:40:23 -080012namespace nfd {
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080013
14template <class T>
15class StreamFace : public Face
16{
17public:
18 typedef T protocol;
19
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080020 /**
21 * \brief Create instance of StreamFace
22 */
Junxiao Shi8c8d2182014-01-30 22:33:00 -070023 StreamFace(const shared_ptr<typename protocol::socket>& socket);
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080024
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080025 virtual
26 ~StreamFace();
27
28 // from Face
29 virtual void
30 sendInterest(const Interest& interest);
31
32 virtual void
33 sendData(const Data& data);
34
35 virtual void
36 close();
37
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080038protected:
39 void
40 handleSend(const boost::system::error_code& error,
41 const Block& wire);
42
43 void
44 handleReceive(const boost::system::error_code& error,
45 std::size_t bytes_recvd);
46
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080047 void
48 keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
Davide Pesaventoba558e72014-02-17 18:38:19 +010049
50 void
51 closeSocket();
52
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080053protected:
54 shared_ptr<typename protocol::socket> m_socket;
Davide Pesaventoba558e72014-02-17 18:38:19 +010055
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080056private:
57 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
58 std::size_t m_inputBufferSize;
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080059
Alexander Afanasyev3958b012014-01-31 15:06:13 -080060 NFD_LOG_INCLASS_DECLARE();
Alexander Afanasyeva9034b02014-01-26 18:32:02 -080061};
62
// All derived classes must define the logger for their specialization with
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
Alexander Afanasyev3958b012014-01-31 15:06:13 -080065
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080066template <class T>
67inline
Junxiao Shi8c8d2182014-01-30 22:33:00 -070068StreamFace<T>::StreamFace(const shared_ptr<typename StreamFace::protocol::socket>& socket)
69 : m_socket(socket)
Alexander Afanasyevb9f6e432014-02-14 20:52:49 -080070 , m_inputBufferSize(0)
Alexander Afanasyevd32cb962014-01-28 12:43:47 -080071{
72 m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
73 bind(&StreamFace<T>::handleReceive, this, _1, _2));
74}
75
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -080076template <class T>
77inline
78StreamFace<T>::~StreamFace()
79{
80}
81
82
83template <class T>
84inline void
85StreamFace<T>::sendInterest(const Interest& interest)
86{
87 m_socket->async_send(boost::asio::buffer(interest.wireEncode().wire(),
88 interest.wireEncode().size()),
89 bind(&StreamFace<T>::handleSend, this, _1, interest.wireEncode()));
90
91 // anything else should be done here?
92}
93
94template <class T>
95inline void
96StreamFace<T>::sendData(const Data& data)
97{
98 m_socket->async_send(boost::asio::buffer(data.wireEncode().wire(),
99 data.wireEncode().size()),
100 bind(&StreamFace<T>::handleSend, this, _1, data.wireEncode()));
101
102 // anything else should be done here?
103}
104
105template <class T>
106inline void
107StreamFace<T>::close()
108{
109 if (!m_socket->is_open())
110 return;
111
112 NFD_LOG_INFO("[id:" << this->getId()
113 << ",endpoint:" << m_socket->local_endpoint()
114 << "] Close connection");
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800115
Davide Pesaventoba558e72014-02-17 18:38:19 +0100116 closeSocket();
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800117 onFail("Close connection");
118}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800119
120template <class T>
121inline void
122StreamFace<T>::handleSend(const boost::system::error_code& error,
123 const Block& wire)
124{
125 if (error) {
126 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
127 return;
128
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800129 if (!m_socket->is_open())
130 {
131 onFail("Connection closed");
132 return;
133 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800134
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800135 if (error == boost::asio::error::eof)
136 {
137 NFD_LOG_INFO("[id:" << this->getId()
138 << ",endpoint:" << m_socket->local_endpoint()
139 << "] Connection closed");
140 }
141 else
142 {
143 NFD_LOG_WARN("[id:" << this->getId()
144 << ",endpoint:" << m_socket->local_endpoint()
145 << "] Send operation failed, closing socket: "
146 << error.category().message(error.value()));
147 }
148
Davide Pesaventoba558e72014-02-17 18:38:19 +0100149 closeSocket();
150
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800151 if (error == boost::asio::error::eof)
152 {
153 onFail("Connection closed");
154 }
155 else
156 {
157 onFail("Send operation failed, closing socket: " +
158 error.category().message(error.value()));
159 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800160 return;
161 }
162
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800163 NFD_LOG_TRACE("[id:" << this->getId()
164 << ",endpoint:" << m_socket->local_endpoint()
165 << "] Successfully sent: " << wire.size() << " bytes");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800166 // do nothing (needed to retain validity of wire memory block
167}
168
169template <class T>
170inline void
171StreamFace<T>::handleReceive(const boost::system::error_code& error,
172 std::size_t bytes_recvd)
173{
174 if (error || bytes_recvd == 0) {
175 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
176 return;
177
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800178 // this should be unnecessary, but just in case
179 if (!m_socket->is_open())
180 {
181 onFail("Connection closed");
182 return;
183 }
184
185 if (error == boost::asio::error::eof)
186 {
187 NFD_LOG_INFO("[id:" << this->getId()
188 << ",endpoint:" << m_socket->local_endpoint()
189 << "] Connection closed");
190 }
191 else
192 {
193 NFD_LOG_WARN("[id:" << this->getId()
194 << ",endpoint:" << m_socket->local_endpoint()
195 << "] Receive operation failed: "
196 << error.category().message(error.value()));
197 }
Davide Pesaventoba558e72014-02-17 18:38:19 +0100198
199 closeSocket();
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800200
201 if (error == boost::asio::error::eof)
202 {
203 onFail("Connection closed");
204 }
205 else
206 {
207 onFail("Receive operation failed, closing socket: " +
208 error.category().message(error.value()));
209 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800210 return;
211 }
212
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800213 NFD_LOG_TRACE("[id:" << this->getId()
214 << ",endpoint:" << m_socket->local_endpoint()
215 << "] Received: " << bytes_recvd << " bytes");
216
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800217 m_inputBufferSize += bytes_recvd;
218 // do magic
219
220 std::size_t offset = 0;
221 /// @todo Eliminate reliance on exceptions in this path
222 try {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800223 while(m_inputBufferSize - offset > 0)
224 {
225 Block element(m_inputBuffer + offset, m_inputBufferSize - offset);
226 offset += element.size();
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800227
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800228 BOOST_ASSERT(offset <= m_inputBufferSize);
229
230 /// @todo Ensure lazy field decoding process
231 if (element.type() == tlv::Interest)
232 {
233 shared_ptr<Interest> i = make_shared<Interest>();
234 i->wireDecode(element);
235 onReceiveInterest(*i);
236 }
237 else if (element.type() == tlv::Data)
238 {
239 shared_ptr<Data> d = make_shared<Data>();
240 d->wireDecode(element);
241 onReceiveData(*d);
242 }
243 // @todo Add local header support
244 // else if (element.type() == tlv::LocalHeader)
245 // {
246 // shared_ptr<Interest> i = make_shared<Interest>();
247 // i->wireDecode(element);
248 // }
249 else
250 {
251 NFD_LOG_WARN("[id:" << this->getId()
252 << ",endpoint:" << m_socket->local_endpoint()
253 << "] Received unrecognized block of type ["
254 << element.type() << "]");
255 // ignore unknown packet and proceed
256 }
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800257 }
258 }
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800259 catch(const tlv::Error& e) {
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800260 if (m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
261 {
Alexander Afanasyev3958b012014-01-31 15:06:13 -0800262 NFD_LOG_WARN("[id:" << this->getId()
263 << ",endpoint:" << m_socket->local_endpoint()
264 << "] Received input is invalid or too large to process, "
265 << "closing down the face");
Davide Pesaventoba558e72014-02-17 18:38:19 +0100266
267 closeSocket();
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800268 onFail("Received input is invalid or too large to process, closing down the face");
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800269 return;
270 }
271 }
272
273 if (offset > 0)
274 {
275 if (offset != m_inputBufferSize)
276 {
277 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
278 m_inputBuffer);
279 m_inputBufferSize -= offset;
280 }
281 else
282 {
283 m_inputBufferSize = 0;
284 }
285 }
286
287 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
288 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
289 bind(&StreamFace<T>::handleReceive, this, _1, _2));
290}
291
Alexander Afanasyeva0a10fb2014-02-13 19:56:15 -0800292template <class T>
293inline void
294StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
295{
296}
297
Davide Pesaventoba558e72014-02-17 18:38:19 +0100298template <class T>
299inline void
300StreamFace<T>::closeSocket()
301{
302 boost::asio::io_service& io = m_socket->get_io_service();
303
304 // use the non-throwing variants and ignore errors, if any
305 boost::system::error_code error;
306 m_socket->shutdown(protocol::socket::shutdown_both, error);
307 m_socket->close(error);
308 // after this, handlers will be called with an error code
309
310 // ensure that the Face object is alive at least until all pending
311 // handlers are dispatched
312 io.post(bind(&StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted,
313 this, this->shared_from_this()));
314}
Alexander Afanasyevd32cb962014-01-28 12:43:47 -0800315
Alexander Afanasyev18bbf812014-01-29 01:40:23 -0800316} // namespace nfd
Alexander Afanasyeva9034b02014-01-26 18:32:02 -0800317
318#endif // NFD_FACE_STREAM_FACE_HPP