blob: 5ec91b095a13d0658783a45b95bc546b41db7bd4 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2013-2015 Regents of the University of California.
 *
 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
 *
 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
 * terms of the GNU Lesser General Public License as published by the Free Software
 * Foundation, either version 3 of the License, or (at your option) any later version.
 *
 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
 * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
 *
 * You should have received copies of the GNU General Public License and GNU Lesser
 * General Public License along with ndn-cxx, e.g., in COPYING.md file.  If not, see
 * <http://www.gnu.org/licenses/>.
 *
 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
 */
21
22#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
23#define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
24
Junxiao Shi71355d52014-12-11 09:20:44 -070025#include "transport.hpp"
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080026
Alexander Afanasyev258ec2b2014-05-14 16:15:37 -070027#include <list>
28
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080029namespace ndn {
30
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080031template<class BaseTransport, class Protocol>
32class StreamTransportImpl
33{
34public:
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070035 typedef StreamTransportImpl<BaseTransport,Protocol> Impl;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080036
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -070037 typedef std::list<Block> BlockSequence;
38 typedef std::list<BlockSequence> TransmissionQueue;
39
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070040 StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080041 : m_transport(transport)
42 , m_socket(ioService)
Alexander Afanasyev937aa782014-03-21 13:17:57 -070043 , m_inputBufferSize(0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080044 , m_connectionInProgress(false)
45 , m_connectTimer(ioService)
46 {
47 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -070048
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080049 void
50 connectHandler(const boost::system::error_code& error)
51 {
52 m_connectionInProgress = false;
53 m_connectTimer.cancel();
54
55 if (!error)
56 {
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +000057 resume();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080058 m_transport.m_isConnected = true;
59
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -070060 if (!m_transmissionQueue.empty()) {
61 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070062 bind(&Impl::handleAsyncWrite, this, _1,
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -070063 m_transmissionQueue.begin()));
64 }
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080065 }
66 else
67 {
68 // may need to throw exception
69 m_transport.m_isConnected = false;
70 m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -070071 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080072 }
73 }
74
75 void
76 connectTimeoutHandler(const boost::system::error_code& error)
77 {
78 if (error) // e.g., cancelled timer
79 return;
80
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -070081 m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -070082 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080083 }
84
85 void
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070086 connect(const typename Protocol::endpoint& endpoint)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080087 {
88 if (!m_connectionInProgress) {
89 m_connectionInProgress = true;
90
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070091 // Wait at most 4 seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080092 /// @todo Decide whether this number should be configurable
93 m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070094 m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080095
96 m_socket.open();
97 m_socket.async_connect(endpoint,
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -070098 bind(&Impl::connectHandler, this, _1));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -080099 }
100 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700101
102 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800103 close()
104 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700105 m_connectionInProgress = false;
106
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700107 boost::system::error_code error; // to silently ignore all errors
108 m_connectTimer.cancel(error);
109 m_socket.cancel(error);
110 m_socket.close(error);
111
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800112 m_transport.m_isConnected = false;
Alexander Afanasyev6e0c5a52014-03-18 16:18:58 -0700113 m_transport.m_isExpectingData = false;
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700114 m_transmissionQueue.clear();
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800115 }
116
117 void
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000118 pause()
119 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700120 if (m_connectionInProgress)
121 return;
122
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000123 if (m_transport.m_isExpectingData)
124 {
125 m_transport.m_isExpectingData = false;
126 m_socket.cancel();
127 }
128 }
129
Alexander Afanasyev9d158f02015-02-17 21:30:19 -0800130 /**
131 * @warning Must not be called directly or indirectly from within handleAsyncReceive invocation
132 */
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000133 void
134 resume()
135 {
Alexander Afanasyev6507fb12014-04-28 23:18:56 -0700136 if (m_connectionInProgress)
137 return;
138
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000139 if (!m_transport.m_isExpectingData)
140 {
141 m_transport.m_isExpectingData = true;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700142 m_inputBufferSize = 0;
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700143 m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
144 bind(&Impl::handleAsyncReceive, this, _1, _2));
Alexander Afanasyev52afb3f2014-03-07 09:05:35 +0000145 }
146 }
147
148 void
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800149 send(const Block& wire)
150 {
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700151 BlockSequence sequence;
152 sequence.push_back(wire);
153 m_transmissionQueue.push_back(sequence);
154
155 if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
156 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700157 bind(&Impl::handleAsyncWrite, this, _1,
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700158 m_transmissionQueue.begin()));
159 }
160
161 // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
162 // next write will be scheduled either in connectHandler or in asyncWriteHandler
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800163 }
164
165 void
166 send(const Block& header, const Block& payload)
167 {
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700168 BlockSequence sequence;
169 sequence.push_back(header);
170 sequence.push_back(payload);
171 m_transmissionQueue.push_back(sequence);
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700172
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700173 if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
174 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700175 bind(&Impl::handleAsyncWrite, this, _1,
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700176 m_transmissionQueue.begin()));
177 }
178
179 // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
180 // next write will be scheduled either in connectHandler or in asyncWriteHandler
181 }
182
183 void
184 handleAsyncWrite(const boost::system::error_code& error,
185 TransmissionQueue::iterator queueItem)
186 {
187 if (error)
188 {
189 if (error == boost::system::errc::operation_canceled) {
190 // async receive has been explicitly cancelled (e.g., socket close)
191 return;
192 }
193
194 m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -0700195 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while sending data to socket"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800196 }
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700197
Alexander Afanasyevfba1ac62015-08-26 15:19:13 -0700198 if (!m_transport.m_isConnected) {
199 return; // queue has been already cleared
200 }
201
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700202 m_transmissionQueue.erase(queueItem);
203
204 if (!m_transmissionQueue.empty()) {
205 boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700206 bind(&Impl::handleAsyncWrite, this, _1,
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700207 m_transmissionQueue.begin()));
208 }
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800209 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700210
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700211 bool
212 processAll(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800213 {
Junxiao Shi02a4bf32015-02-21 21:07:46 -0700214 while (offset < nBytesAvailable) {
215 bool isOk = false;
216 Block element;
217 std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
218 if (!isOk)
219 return false;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800220
Junxiao Shi02a4bf32015-02-21 21:07:46 -0700221 m_transport.receive(element);
222 offset += element.size();
223 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700224 return true;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800225 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700226
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800227 void
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700228 handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800229 {
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800230 if (error)
231 {
232 if (error == boost::system::errc::operation_canceled) {
233 // async receive has been explicitly cancelled (e.g., socket close)
234 return;
235 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700236
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700237 m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -0700238 BOOST_THROW_EXCEPTION(Transport::Error(error, "error while receiving data from socket"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800239 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700240
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700241 m_inputBufferSize += nBytesRecvd;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700242 // do magic
243
244 std::size_t offset = 0;
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700245 bool hasProcessedSome = processAll(m_inputBuffer, offset, m_inputBufferSize);
246 if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800247 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700248 m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -0700249 BOOST_THROW_EXCEPTION(Transport::Error(boost::system::error_code(),
250 "input buffer full, but a valid TLV cannot be "
251 "decoded"));
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700252 }
253
254 if (offset > 0)
255 {
256 if (offset != m_inputBufferSize)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800257 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700258 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
259 m_inputBuffer);
260 m_inputBufferSize -= offset;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800261 }
262 else
263 {
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700264 m_inputBufferSize = 0;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800265 }
266 }
267
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700268 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700269 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
270 bind(&Impl::handleAsyncReceive, this, _1, _2));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800271 }
272
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800273protected:
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700274 BaseTransport& m_transport;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700275
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700276 typename Protocol::socket m_socket;
277 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700278 size_t m_inputBufferSize;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800279
Alexander Afanasyev6a05b4b2014-07-18 17:23:00 -0700280 TransmissionQueue m_transmissionQueue;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800281 bool m_connectionInProgress;
282
283 boost::asio::deadline_timer m_connectTimer;
284};
285
286
287template<class BaseTransport, class Protocol>
288class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
289{
290public:
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700291 typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> Impl;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800292
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700293 StreamTransportWithResolverImpl(BaseTransport& transport, boost::asio::io_service& ioService)
294 : StreamTransportImpl<BaseTransport, Protocol>(transport, ioService)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800295 {
296 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700297
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800298 void
299 resolveHandler(const boost::system::error_code& error,
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700300 typename Protocol::resolver::iterator endpoint,
301 const shared_ptr<typename Protocol::resolver>&)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800302 {
303 if (error)
304 {
305 if (error == boost::system::errc::operation_canceled)
306 return;
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700307
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -0700308 BOOST_THROW_EXCEPTION(Transport::Error(error, "Error during resolution of host or port"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800309 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700310
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700311 typename Protocol::resolver::iterator end;
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800312 if (endpoint == end)
313 {
Alexander Afanasyevbc5830a2014-07-11 15:02:38 -0700314 this->m_transport.close();
Spyridon Mastorakis0d2ed2e2015-07-27 19:09:12 -0700315 BOOST_THROW_EXCEPTION(Transport::Error(error, "Unable to resolve because host or port"));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800316 }
317
318 this->m_socket.async_connect(*endpoint,
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700319 bind(&Impl::connectHandler, this, _1));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800320 }
Alexander Afanasyev937aa782014-03-21 13:17:57 -0700321
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800322 void
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700323 connect(const typename Protocol::resolver::query& query)
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800324 {
325 if (!this->m_connectionInProgress) {
326 this->m_connectionInProgress = true;
327
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700328 // Wait at most 4 seconds to connect
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800329 /// @todo Decide whether this number should be configurable
330 this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700331 this->m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800332
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700333 // typename boost::asio::ip::basic_resolver< Protocol > resolver;
334 shared_ptr<typename Protocol::resolver> resolver =
335 make_shared<typename Protocol::resolver>(ref(this->m_socket.get_io_service()));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800336
Alexander Afanasyev49bb1fb2014-07-21 12:54:01 -0700337 resolver->async_resolve(query, bind(&Impl::resolveHandler, this, _1, _2, resolver));
Alexander Afanasyev5964fb72014-02-18 12:42:45 -0800338 }
339 }
340};
341
342
343} // namespace ndn
344
345#endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP