blob: f75dcd24503deb988c69fa4052ae3edf5abefbcf [file] [log] [blame]
Alexander Afanasyev20d2c582014-01-26 15:32:51 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/**
3 * Copyright (C) 2013 Regents of the University of California.
4 * @author: Jeff Thompson <jefft0@remap.ucla.edu>
5 * See COPYING for copyright and distribution information.
6 */
7
8#include <stdexcept>
9#include <stdlib.h>
10
11#include <ndn-cpp-dev/face.hpp>
12#include <ndn-cpp-dev/transport/tcp-transport.hpp>
13#include "../c/util/ndn_memory.h"
14
15#include <boost/asio.hpp>
16#if NDN_CPP_HAVE_CXX11
17// In the std library, the placeholders are in a different namespace than boost.
18using namespace ndn::func_lib::placeholders;
19#endif
20
21using namespace std;
22typedef boost::asio::ip::tcp protocol;
23
24namespace ndn {
25
26const size_t MAX_LENGTH = 9000;
27
28class TcpTransport::Impl
29{
30public:
31 Impl(TcpTransport &transport)
32 : transport_(transport)
33 , socket_(*transport_.ioService_)
34 , partialDataSize_(0)
35 , connectionInProgress_(false)
36 , connectTimer_(*transport_.ioService_)
37 {
38 }
39
40 void
41 connectHandler(const boost::system::error_code& error)
42 {
43 connectionInProgress_ = false;
44 connectTimer_.cancel();
45
46 if (!error)
47 {
48 partialDataSize_ = 0;
49 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
50 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
51
52 transport_.isConnected_ = true;
53
54 for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
55 socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
56 func_lib::bind(&Impl::handle_async_send, this, _1, *i));
57
58 sendQueue_.clear();
59 }
60 else
61 {
62 // may need to throw exception
63 transport_.isConnected_ = false;
64 transport_.close();
65 throw Transport::Error(error, "error while connecting to the forwarder");
66 }
67 }
68
69 void
70 connectTimeoutHandler(const boost::system::error_code& error)
71 {
72 if (error) // e.g., cancelled timer
73 return;
74
75 connectionInProgress_ = false;
76 transport_.isConnected_ = false;
77 socket_.close();
78 throw Transport::Error(error, "error while connecting to the forwarder");
79 }
80
81 void
82 resolveHandler(const boost::system::error_code& error,
83 boost::asio::ip::tcp::resolver::iterator endpoint,
84 const ptr_lib::shared_ptr<boost::asio::ip::tcp::resolver>&)
85 {
86 if (error)
87 {
88 if (error == boost::system::errc::operation_canceled)
89 return;
90
91 throw Transport::Error(error, "Error during resolution of host or port [" + transport_.host_ + ":" + transport_.port_ + "]");
92 }
93
94 boost::asio::ip::tcp::resolver::iterator end;
95 if (endpoint == end)
96 {
97 connectionInProgress_ = false;
98 transport_.isConnected_ = false;
99 socket_.close();
100 throw Transport::Error(error, "Unable to connect because host or port [" + transport_.host_ + ":" + transport_.port_ + "] cannot be resolved");
101 }
102
103 socket_.async_connect(*endpoint,
104 func_lib::bind(&Impl::connectHandler, this, _1));
105 }
106
107 void
108 connect()
109 {
110 if (!connectionInProgress_) {
111 connectionInProgress_ = true;
112
113 // Wait at most 4 seconds to connect
114 /// @todo Decide whether this number should be configurable
115 connectTimer_.expires_from_now(boost::posix_time::seconds(4));
116 connectTimer_.async_wait(func_lib::bind(&Impl::connectTimeoutHandler, this, _1));
117
118 using boost::asio::ip::tcp;
119
120 ptr_lib::shared_ptr<tcp::resolver> resolver =
121 ptr_lib::make_shared<tcp::resolver>(boost::ref(*transport_.ioService_));
122
123 tcp::resolver::query query(transport_.host_, transport_.port_);
124
125 resolver->async_resolve(query, func_lib::bind(&Impl::resolveHandler, this, _1, _2, resolver));
126 }
127 }
128
129 void
130 close()
131 {
132 connectTimer_.cancel();
133 socket_.close();
134 transport_.isConnected_ = false;
135 }
136
137 void
138 send(const Block &wire)
139 {
140 if (!transport_.isConnected_)
141 sendQueue_.push_back(wire);
142 else
143 socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
144 func_lib::bind(&Impl::handle_async_send, this, _1, wire));
145 }
146
147 inline void
148 processAll(uint8_t *buffer, size_t &offset, size_t availableSize)
149 {
150 while(offset < availableSize)
151 {
152 Block element(buffer + offset, availableSize - offset);
153 transport_.receive(element);
154
155 offset += element.size();
156 }
157 }
158
159 void
160 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
161 {
162 /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
163
164 if (error)
165 {
166 if (error == boost::system::errc::operation_canceled) {
167 // async receive has been explicitly cancelled (e.g., socket close)
168 return;
169 }
170
171 socket_.close(); // closing at this point may not be that necessary
172 transport_.isConnected_ = true;
173 throw Transport::Error(error, "error while receiving data from socket");
174 }
175
176 if (!error && bytes_recvd > 0)
177 {
178 // inputBuffer_ has bytes_recvd received bytes of data
179 if (partialDataSize_ > 0)
180 {
181 size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
182 ndn_memcpy(partialData_ + partialDataSize_, inputBuffer_, newDataSize);
183 partialDataSize_ += newDataSize;
184
185 size_t offset = 0;
186 try
187 {
188 processAll(partialData_, offset, partialDataSize_);
189
190 // no exceptions => processed the whole thing
191 if (bytes_recvd - newDataSize > 0)
192 {
193 // there is a little bit more data available
194
195 offset = 0;
196 partialDataSize_ = bytes_recvd - newDataSize;
197 ndn_memcpy(partialData_, inputBuffer_ + newDataSize, partialDataSize_);
198
199 processAll(partialData_, offset, partialDataSize_);
200
201 // no exceptions => processed the whole thing
202 partialDataSize_ = 0;
203 }
204 else
205 {
206 // done processing
207 partialDataSize_ = 0;
208 }
209 }
210 catch(Tlv::Error &)
211 {
212 if (offset > 0)
213 {
214 partialDataSize_ -= offset;
215 ndn_memcpy(partialData_, partialData_ + offset, partialDataSize_);
216 }
217 else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
218 {
219 // very bad... should close connection
220 socket_.close();
221 transport_.isConnected_ = true;
222 throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
223 }
224 }
225 }
226 else
227 {
228 size_t offset = 0;
229 try
230 {
231 processAll(inputBuffer_, offset, bytes_recvd);
232 }
233 catch(Tlv::Error &error)
234 {
235 if (offset > 0)
236 {
237 partialDataSize_ = bytes_recvd - offset;
238 ndn_memcpy(partialData_, inputBuffer_ + offset, partialDataSize_);
239 }
240 }
241 }
242 }
243
244 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
245 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
246 }
247
248 void
249 handle_async_send(const boost::system::error_code& error, const Block &wire)
250 {
251 // pass (needed to keep data block alive during the send)
252 }
253
254private:
255 TcpTransport &transport_;
256
257 protocol::socket socket_;
258 uint8_t inputBuffer_[MAX_LENGTH];
259
260 uint8_t partialData_[MAX_LENGTH];
261 size_t partialDataSize_;
262
263 std::list< Block > sendQueue_;
264 bool connectionInProgress_;
265
266 boost::asio::deadline_timer connectTimer_;
267};
268
269TcpTransport::TcpTransport(const std::string& host, const std::string& port/* = "6363"*/)
270 : host_(host)
271 , port_(port)
272{
273}
274
275TcpTransport::~TcpTransport()
276{
277}
278
279void
280TcpTransport::connect(boost::asio::io_service &ioService,
281 const ReceiveCallback &receiveCallback)
282{
283 if (!static_cast<bool>(impl_)) {
284 Transport::connect(ioService, receiveCallback);
285
286 impl_ = ptr_lib::make_shared<TcpTransport::Impl> (ptr_lib::ref(*this));
287 }
288 impl_->connect();
289}
290
291void
292TcpTransport::send(const Block &wire)
293{
294 impl_->send(wire);
295}
296
297void
298TcpTransport::close()
299{
300 impl_->close();
301}
302
303}