blob: a12c223a3123c76a976026b1593151d222e440a9 [file] [log] [blame]
Alexander Afanasyev20d2c582014-01-26 15:32:51 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/**
3 * Copyright (C) 2013 Regents of the University of California.
4 * @author: Jeff Thompson <jefft0@remap.ucla.edu>
5 * See COPYING for copyright and distribution information.
6 */
7
8#include <stdexcept>
9#include <stdlib.h>
10
Alexander Afanasyev09c613f2014-01-29 00:23:58 -080011#include "face.hpp"
12#include "transport/tcp-transport.hpp"
Alexander Afanasyev20d2c582014-01-26 15:32:51 -080013
14#include <boost/asio.hpp>
15#if NDN_CPP_HAVE_CXX11
16// In the std library, the placeholders are in a different namespace than boost.
17using namespace ndn::func_lib::placeholders;
18#endif
19
20using namespace std;
21typedef boost::asio::ip::tcp protocol;
22
23namespace ndn {
24
// Size, in bytes, of each fixed-size receive buffer in TcpTransport::Impl;
// it is also the largest amount of data requested from the socket at once.
25const size_t MAX_LENGTH = 9000;
26
27class TcpTransport::Impl
28{
29public:
30 Impl(TcpTransport &transport)
31 : transport_(transport)
32 , socket_(*transport_.ioService_)
33 , partialDataSize_(0)
34 , connectionInProgress_(false)
35 , connectTimer_(*transport_.ioService_)
36 {
37 }
38
39 void
40 connectHandler(const boost::system::error_code& error)
41 {
42 connectionInProgress_ = false;
43 connectTimer_.cancel();
44
45 if (!error)
46 {
47 partialDataSize_ = 0;
48 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
49 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
50
51 transport_.isConnected_ = true;
52
53 for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
54 socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
55 func_lib::bind(&Impl::handle_async_send, this, _1, *i));
56
57 sendQueue_.clear();
58 }
59 else
60 {
61 // may need to throw exception
62 transport_.isConnected_ = false;
63 transport_.close();
64 throw Transport::Error(error, "error while connecting to the forwarder");
65 }
66 }
67
68 void
69 connectTimeoutHandler(const boost::system::error_code& error)
70 {
71 if (error) // e.g., cancelled timer
72 return;
73
74 connectionInProgress_ = false;
75 transport_.isConnected_ = false;
76 socket_.close();
77 throw Transport::Error(error, "error while connecting to the forwarder");
78 }
79
80 void
81 resolveHandler(const boost::system::error_code& error,
82 boost::asio::ip::tcp::resolver::iterator endpoint,
83 const ptr_lib::shared_ptr<boost::asio::ip::tcp::resolver>&)
84 {
85 if (error)
86 {
87 if (error == boost::system::errc::operation_canceled)
88 return;
89
90 throw Transport::Error(error, "Error during resolution of host or port [" + transport_.host_ + ":" + transport_.port_ + "]");
91 }
92
93 boost::asio::ip::tcp::resolver::iterator end;
94 if (endpoint == end)
95 {
96 connectionInProgress_ = false;
97 transport_.isConnected_ = false;
98 socket_.close();
99 throw Transport::Error(error, "Unable to connect because host or port [" + transport_.host_ + ":" + transport_.port_ + "] cannot be resolved");
100 }
101
102 socket_.async_connect(*endpoint,
103 func_lib::bind(&Impl::connectHandler, this, _1));
104 }
105
106 void
107 connect()
108 {
109 if (!connectionInProgress_) {
110 connectionInProgress_ = true;
111
112 // Wait at most 4 seconds to connect
113 /// @todo Decide whether this number should be configurable
114 connectTimer_.expires_from_now(boost::posix_time::seconds(4));
115 connectTimer_.async_wait(func_lib::bind(&Impl::connectTimeoutHandler, this, _1));
116
117 using boost::asio::ip::tcp;
118
119 ptr_lib::shared_ptr<tcp::resolver> resolver =
120 ptr_lib::make_shared<tcp::resolver>(boost::ref(*transport_.ioService_));
121
122 tcp::resolver::query query(transport_.host_, transport_.port_);
123
124 resolver->async_resolve(query, func_lib::bind(&Impl::resolveHandler, this, _1, _2, resolver));
125 }
126 }
127
128 void
129 close()
130 {
131 connectTimer_.cancel();
132 socket_.close();
133 transport_.isConnected_ = false;
134 }
135
136 void
137 send(const Block &wire)
138 {
139 if (!transport_.isConnected_)
140 sendQueue_.push_back(wire);
141 else
142 socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
143 func_lib::bind(&Impl::handle_async_send, this, _1, wire));
144 }
145
146 inline void
147 processAll(uint8_t *buffer, size_t &offset, size_t availableSize)
148 {
149 while(offset < availableSize)
150 {
151 Block element(buffer + offset, availableSize - offset);
152 transport_.receive(element);
153
154 offset += element.size();
155 }
156 }
157
158 void
159 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
160 {
161 /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
162
163 if (error)
164 {
165 if (error == boost::system::errc::operation_canceled) {
166 // async receive has been explicitly cancelled (e.g., socket close)
167 return;
168 }
169
170 socket_.close(); // closing at this point may not be that necessary
171 transport_.isConnected_ = true;
172 throw Transport::Error(error, "error while receiving data from socket");
173 }
174
175 if (!error && bytes_recvd > 0)
176 {
177 // inputBuffer_ has bytes_recvd received bytes of data
178 if (partialDataSize_ > 0)
179 {
180 size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800181 std::copy(inputBuffer_, inputBuffer_ + newDataSize, partialData_ + partialDataSize_);
182
Alexander Afanasyev20d2c582014-01-26 15:32:51 -0800183 partialDataSize_ += newDataSize;
184
185 size_t offset = 0;
186 try
187 {
188 processAll(partialData_, offset, partialDataSize_);
189
190 // no exceptions => processed the whole thing
191 if (bytes_recvd - newDataSize > 0)
192 {
193 // there is a little bit more data available
194
195 offset = 0;
196 partialDataSize_ = bytes_recvd - newDataSize;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800197 std::copy(inputBuffer_ + newDataSize, inputBuffer_ + newDataSize + partialDataSize_, partialData_);
Alexander Afanasyev20d2c582014-01-26 15:32:51 -0800198
199 processAll(partialData_, offset, partialDataSize_);
200
201 // no exceptions => processed the whole thing
202 partialDataSize_ = 0;
203 }
204 else
205 {
206 // done processing
207 partialDataSize_ = 0;
208 }
209 }
210 catch(Tlv::Error &)
211 {
212 if (offset > 0)
213 {
214 partialDataSize_ -= offset;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800215 std::copy(partialData_ + offset, partialData_ + offset + partialDataSize_, partialData_);
Alexander Afanasyev20d2c582014-01-26 15:32:51 -0800216 }
217 else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
218 {
219 // very bad... should close connection
220 socket_.close();
221 transport_.isConnected_ = true;
222 throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
223 }
224 }
225 }
226 else
227 {
228 size_t offset = 0;
229 try
230 {
231 processAll(inputBuffer_, offset, bytes_recvd);
232 }
233 catch(Tlv::Error &error)
234 {
235 if (offset > 0)
236 {
237 partialDataSize_ = bytes_recvd - offset;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800238 std::copy(inputBuffer_ + offset, inputBuffer_ + offset + partialDataSize_, partialData_);
Alexander Afanasyev20d2c582014-01-26 15:32:51 -0800239 }
240 }
241 }
242 }
243
244 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
245 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
246 }
247
248 void
249 handle_async_send(const boost::system::error_code& error, const Block &wire)
250 {
251 // pass (needed to keep data block alive during the send)
252 }
253
254private:
255 TcpTransport &transport_;
256
257 protocol::socket socket_;
258 uint8_t inputBuffer_[MAX_LENGTH];
259
260 uint8_t partialData_[MAX_LENGTH];
261 size_t partialDataSize_;
262
263 std::list< Block > sendQueue_;
264 bool connectionInProgress_;
265
266 boost::asio::deadline_timer connectTimer_;
267};
268
269TcpTransport::TcpTransport(const std::string& host, const std::string& port/* = "6363"*/)
270 : host_(host)
271 , port_(port)
272{
273}
274
275TcpTransport::~TcpTransport()
276{
277}
278
279void
280TcpTransport::connect(boost::asio::io_service &ioService,
281 const ReceiveCallback &receiveCallback)
282{
283 if (!static_cast<bool>(impl_)) {
284 Transport::connect(ioService, receiveCallback);
285
286 impl_ = ptr_lib::make_shared<TcpTransport::Impl> (ptr_lib::ref(*this));
287 }
288 impl_->connect();
289}
290
291void
292TcpTransport::send(const Block &wire)
293{
294 impl_->send(wire);
295}
296
297void
298TcpTransport::close()
299{
300 impl_->close();
301}
302
303}