blob: 713d7405c465659187dedfa1d05b1327f6e2c510 [file] [log] [blame]
Alexander Afanasyev5964fb72014-02-18 12:42:45 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/**
3 * Copyright (C) 2013-2014 Regents of the University of California.
4 * See COPYING for copyright and distribution information.
5 */
6
7#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
8#define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
9
10#include "../common.hpp"
11
12namespace ndn {
13
/**
 * @brief Size, in bytes, of the receive buffer used by StreamTransportImpl
 *
 * The same value is used as the upper bound for a single async_receive call.
 * When the buffer is completely full and no element can be decoded from its
 * beginning, the transport treats this as a fatal error.
 */
const size_t MAX_LENGTH = 9000;
15
16template<class BaseTransport, class Protocol>
17class StreamTransportImpl
18{
19public:
20 typedef BaseTransport base_transport;
21 typedef Protocol protocol;
22 typedef StreamTransportImpl<BaseTransport,Protocol> impl;
23
24 StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
25 : m_transport(transport)
26 , m_socket(ioService)
Alexander Afanasyev937aa782014-03-21 13:17:57 -070027 , m_inputBufferSize(0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080028 , m_connectionInProgress(false)
29 , m_connectTimer(ioService)
30 {
31 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070032
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080033 void
34 connectHandler(const boost::system::error_code& error)
35 {
36 m_connectionInProgress = false;
37 m_connectTimer.cancel();
38
39 if (!error)
40 {
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +000041 resume();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080042 m_transport.m_isConnected = true;
43
44 for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
45 m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
46 bind(&impl::handle_async_send, this, _1, *i));
47
48 for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
49 i != m_sendPairQueue.end(); ++i)
50 {
51 std::vector<boost::asio::const_buffer> buffer;
52 buffer.reserve(2);
53 buffer.push_back(boost::asio::buffer(i->first.wire(), i->first.size()));
54 buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
55 m_socket.async_send(buffer,
56 bind(&impl::handle_async_send, this, _1, i->first, i->second));
57 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070058
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080059 m_sendQueue.clear();
60 m_sendPairQueue.clear();
61 }
62 else
63 {
64 // may need to throw exception
65 m_transport.m_isConnected = false;
66 m_transport.close();
67 throw Transport::Error(error, "error while connecting to the forwarder");
68 }
69 }
70
71 void
72 connectTimeoutHandler(const boost::system::error_code& error)
73 {
74 if (error) // e.g., cancelled timer
75 return;
76
77 m_connectionInProgress = false;
78 m_transport.m_isConnected = false;
Alexander Afanasyev937aa782014-03-21 13:17:57 -070079 m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080080 m_socket.close();
81 throw Transport::Error(error, "error while connecting to the forwarder");
82 }
83
84 void
85 connect(const typename protocol::endpoint& endpoint)
86 {
87 if (!m_connectionInProgress) {
88 m_connectionInProgress = true;
89
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -070090 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080091 /// @todo Decide whether this number should be configurable
92 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
93 m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
94
95 m_socket.open();
96 m_socket.async_connect(endpoint,
97 bind(&impl::connectHandler, this, _1));
98 }
99 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700100
101 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800102 close()
103 {
104 m_connectTimer.cancel();
105 m_socket.close();
106 m_transport.m_isConnected = false;
Alexander Afanasyev6e0c5a52014-03-18 16:18:58 -0700107 m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800108 m_sendQueue.clear();
109 m_sendPairQueue.clear();
110 }
111
112 void
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000113 pause()
114 {
115 if (m_transport.m_isExpectingData)
116 {
117 m_transport.m_isExpectingData = false;
118 m_socket.cancel();
119 }
120 }
121
122 void
123 resume()
124 {
125 if (!m_transport.m_isExpectingData)
126 {
127 m_transport.m_isExpectingData = true;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700128 m_inputBufferSize = 0;
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000129 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_LENGTH), 0,
130 bind(&impl::handle_async_receive, this, _1, _2));
131 }
132 }
133
134 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800135 send(const Block& wire)
136 {
137 if (!m_transport.m_isConnected)
138 m_sendQueue.push_back(wire);
139 else
140 m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
141 bind(&impl::handle_async_send, this, _1, wire));
142 }
143
144 void
145 send(const Block& header, const Block& payload)
146 {
147 if (!m_transport.m_isConnected)
148 {
149 m_sendPairQueue.push_back(std::make_pair(header, payload));
150 }
151 else
152 {
153 std::vector<boost::asio::const_buffer> buffers;
154 buffers.reserve(2);
155 buffers.push_back(boost::asio::buffer(header.wire(), header.size()));
156 buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700157
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800158 m_socket.async_send(buffers,
159 bind(&impl::handle_async_send, this, _1, header, payload));
160 }
161 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700162
163 inline bool
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800164 processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
165 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700166 Block element;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800167 while(offset < availableSize)
168 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700169 bool ok = Block::fromBuffer(buffer + offset, availableSize - offset, element);
170 if (!ok)
171 return false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800172
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700173 m_transport.receive(element);
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800174 offset += element.size();
175 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700176 return true;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800177 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700178
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800179 void
180 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
181 {
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800182 if (error)
183 {
184 if (error == boost::system::errc::operation_canceled) {
185 // async receive has been explicitly cancelled (e.g., socket close)
186 return;
187 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700188
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800189 m_socket.close(); // closing at this point may not be that necessary
190 m_transport.m_isConnected = true;
191 throw Transport::Error(error, "error while receiving data from socket");
192 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700193
194 m_inputBufferSize += bytes_recvd;
195 // do magic
196
197 std::size_t offset = 0;
198 bool ok = processAll(m_inputBuffer, offset, m_inputBufferSize);
199 if (!ok && m_inputBufferSize == MAX_LENGTH && offset == 0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800200 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700201 // very bad... should close connection
202 m_socket.close();
203 m_transport.m_isConnected = false;
204 m_transport.m_isExpectingData = false;
205 throw Transport::Error(boost::system::error_code(),
206 "input buffer full, but a valid TLV cannot be decoded");
207 }
208
209 if (offset > 0)
210 {
211 if (offset != m_inputBufferSize)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800212 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700213 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
214 m_inputBuffer);
215 m_inputBufferSize -= offset;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800216 }
217 else
218 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700219 m_inputBufferSize = 0;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800220 }
221 }
222
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700223 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
224 MAX_LENGTH - m_inputBufferSize), 0,
225 bind(&impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800226 }
227
228 void
229 handle_async_send(const boost::system::error_code& error, const Block& wire)
230 {
231 // pass (needed to keep data block alive during the send)
232 }
233
234 void
235 handle_async_send(const boost::system::error_code& error,
236 const Block& header, const Block& payload)
237 {
238 // pass (needed to keep data blocks alive during the send)
239 }
240
241protected:
242 base_transport& m_transport;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700243
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800244 typename protocol::socket m_socket;
245 uint8_t m_inputBuffer[MAX_LENGTH];
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700246 size_t m_inputBufferSize;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800247
248 std::list< Block > m_sendQueue;
249 std::list< std::pair<Block, Block> > m_sendPairQueue;
250 bool m_connectionInProgress;
251
252 boost::asio::deadline_timer m_connectTimer;
253};
254
255
256template<class BaseTransport, class Protocol>
257class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
258{
259public:
260 typedef BaseTransport base_transport;
261 typedef Protocol protocol;
262 typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> impl;
263
264 StreamTransportWithResolverImpl(base_transport& transport, boost::asio::io_service& ioService)
265 : StreamTransportImpl<base_transport, protocol>(transport, ioService)
266 {
267 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700268
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800269 void
270 resolveHandler(const boost::system::error_code& error,
271 typename protocol::resolver::iterator endpoint,
272 const shared_ptr<typename protocol::resolver>&)
273 {
274 if (error)
275 {
276 if (error == boost::system::errc::operation_canceled)
277 return;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700278
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800279 throw Transport::Error(error, "Error during resolution of host or port");
280 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700281
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800282 typename protocol::resolver::iterator end;
283 if (endpoint == end)
284 {
285 this->m_connectionInProgress = false;
286 this->m_transport.m_isConnected = false;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700287 this->m_transport.m_isExpectingData = false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800288 this->m_socket.close();
289 throw Transport::Error(error, "Unable to resolve because host or port");
290 }
291
292 this->m_socket.async_connect(*endpoint,
293 bind(&impl::connectHandler, this, _1));
294 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700295
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800296 void
297 connect(const typename protocol::resolver::query& query)
298 {
299 if (!this->m_connectionInProgress) {
300 this->m_connectionInProgress = true;
301
Alexander Afanasyevaa0e7da2014-03-17 14:37:33 -0700302 // Wait at most 4 time::seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800303 /// @todo Decide whether this number should be configurable
304 this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
305 this->m_connectTimer.async_wait(bind(&impl::connectTimeoutHandler, this, _1));
306
307 // typename boost::asio::ip::basic_resolver< protocol > resolver;
308 shared_ptr<typename protocol::resolver> resolver =
309 make_shared<typename protocol::resolver>(boost::ref(this->m_socket.get_io_service()));
310
311 resolver->async_resolve(query, bind(&impl::resolveHandler, this, _1, _2, resolver));
312 }
313 }
314};
315
316
317} // namespace ndn
318
319#endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP