blob: 3c773a8e493b75587cfbae44df80613958e4f217 [file] [log] [blame]
Alexander Afanasyev20d2c582014-01-26 15:32:51 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/**
3 * Copyright (C) 2013 Regents of the University of California.
4 * @author: Jeff Thompson <jefft0@remap.ucla.edu>
5 * See COPYING for copyright and distribution information.
6 */
7
Alexander Afanasyeve2dcdfd2014-02-07 15:53:28 -08008#include "common.hpp"
Alexander Afanasyev20d2c582014-01-26 15:32:51 -08009
Alexander Afanasyeve2dcdfd2014-02-07 15:53:28 -080010#include "tcp-transport.hpp"
Alexander Afanasyev20d2c582014-01-26 15:32:51 -080011
Alexander Afanasyev20d2c582014-01-26 15:32:51 -080012#if NDN_CPP_HAVE_CXX11
13// In the std library, the placeholders are in a different namespace than boost.
14using namespace ndn::func_lib::placeholders;
15#endif
16
17using namespace std;
18typedef boost::asio::ip::tcp protocol;
19
20namespace ndn {
21
/// Capacity, in bytes, of each receive buffer used by TcpTransport::Impl
/// (the socket read buffer and the reassembly buffer for partially received elements).
const std::size_t MAX_LENGTH = 9000;
23
24class TcpTransport::Impl
25{
26public:
27 Impl(TcpTransport &transport)
28 : transport_(transport)
29 , socket_(*transport_.ioService_)
30 , partialDataSize_(0)
31 , connectionInProgress_(false)
32 , connectTimer_(*transport_.ioService_)
33 {
34 }
35
36 void
37 connectHandler(const boost::system::error_code& error)
38 {
39 connectionInProgress_ = false;
40 connectTimer_.cancel();
41
42 if (!error)
43 {
44 partialDataSize_ = 0;
45 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
46 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
47
48 transport_.isConnected_ = true;
49
50 for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
51 socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
52 func_lib::bind(&Impl::handle_async_send, this, _1, *i));
53
54 sendQueue_.clear();
55 }
56 else
57 {
58 // may need to throw exception
59 transport_.isConnected_ = false;
60 transport_.close();
61 throw Transport::Error(error, "error while connecting to the forwarder");
62 }
63 }
64
65 void
66 connectTimeoutHandler(const boost::system::error_code& error)
67 {
68 if (error) // e.g., cancelled timer
69 return;
70
71 connectionInProgress_ = false;
72 transport_.isConnected_ = false;
73 socket_.close();
74 throw Transport::Error(error, "error while connecting to the forwarder");
75 }
76
77 void
78 resolveHandler(const boost::system::error_code& error,
79 boost::asio::ip::tcp::resolver::iterator endpoint,
80 const ptr_lib::shared_ptr<boost::asio::ip::tcp::resolver>&)
81 {
82 if (error)
83 {
84 if (error == boost::system::errc::operation_canceled)
85 return;
86
87 throw Transport::Error(error, "Error during resolution of host or port [" + transport_.host_ + ":" + transport_.port_ + "]");
88 }
89
90 boost::asio::ip::tcp::resolver::iterator end;
91 if (endpoint == end)
92 {
93 connectionInProgress_ = false;
94 transport_.isConnected_ = false;
95 socket_.close();
96 throw Transport::Error(error, "Unable to connect because host or port [" + transport_.host_ + ":" + transport_.port_ + "] cannot be resolved");
97 }
98
99 socket_.async_connect(*endpoint,
100 func_lib::bind(&Impl::connectHandler, this, _1));
101 }
102
103 void
104 connect()
105 {
106 if (!connectionInProgress_) {
107 connectionInProgress_ = true;
108
109 // Wait at most 4 seconds to connect
110 /// @todo Decide whether this number should be configurable
111 connectTimer_.expires_from_now(boost::posix_time::seconds(4));
112 connectTimer_.async_wait(func_lib::bind(&Impl::connectTimeoutHandler, this, _1));
113
114 using boost::asio::ip::tcp;
115
116 ptr_lib::shared_ptr<tcp::resolver> resolver =
117 ptr_lib::make_shared<tcp::resolver>(boost::ref(*transport_.ioService_));
118
119 tcp::resolver::query query(transport_.host_, transport_.port_);
120
121 resolver->async_resolve(query, func_lib::bind(&Impl::resolveHandler, this, _1, _2, resolver));
122 }
123 }
124
125 void
126 close()
127 {
128 connectTimer_.cancel();
129 socket_.close();
130 transport_.isConnected_ = false;
131 }
132
133 void
134 send(const Block &wire)
135 {
136 if (!transport_.isConnected_)
137 sendQueue_.push_back(wire);
138 else
139 socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
140 func_lib::bind(&Impl::handle_async_send, this, _1, wire));
141 }
142
143 inline void
144 processAll(uint8_t *buffer, size_t &offset, size_t availableSize)
145 {
146 while(offset < availableSize)
147 {
148 Block element(buffer + offset, availableSize - offset);
149 transport_.receive(element);
150
151 offset += element.size();
152 }
153 }
154
155 void
156 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
157 {
158 /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
159
160 if (error)
161 {
162 if (error == boost::system::errc::operation_canceled) {
163 // async receive has been explicitly cancelled (e.g., socket close)
164 return;
165 }
166
167 socket_.close(); // closing at this point may not be that necessary
168 transport_.isConnected_ = true;
169 throw Transport::Error(error, "error while receiving data from socket");
170 }
171
172 if (!error && bytes_recvd > 0)
173 {
174 // inputBuffer_ has bytes_recvd received bytes of data
175 if (partialDataSize_ > 0)
176 {
177 size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800178 std::copy(inputBuffer_, inputBuffer_ + newDataSize, partialData_ + partialDataSize_);
179
Alexander Afanasyev20d2c582014-01-26 15:32:51 -0800180 partialDataSize_ += newDataSize;
181
182 size_t offset = 0;
183 try
184 {
185 processAll(partialData_, offset, partialDataSize_);
186
187 // no exceptions => processed the whole thing
188 if (bytes_recvd - newDataSize > 0)
189 {
190 // there is a little bit more data available
191
192 offset = 0;
193 partialDataSize_ = bytes_recvd - newDataSize;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800194 std::copy(inputBuffer_ + newDataSize, inputBuffer_ + newDataSize + partialDataSize_, partialData_);
Alexander Afanasyev20d2c582014-01-26 15:32:51 -0800195
196 processAll(partialData_, offset, partialDataSize_);
197
198 // no exceptions => processed the whole thing
199 partialDataSize_ = 0;
200 }
201 else
202 {
203 // done processing
204 partialDataSize_ = 0;
205 }
206 }
207 catch(Tlv::Error &)
208 {
209 if (offset > 0)
210 {
211 partialDataSize_ -= offset;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800212 std::copy(partialData_ + offset, partialData_ + offset + partialDataSize_, partialData_);
Alexander Afanasyev20d2c582014-01-26 15:32:51 -0800213 }
214 else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
215 {
216 // very bad... should close connection
217 socket_.close();
218 transport_.isConnected_ = true;
219 throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
220 }
221 }
222 }
223 else
224 {
225 size_t offset = 0;
226 try
227 {
228 processAll(inputBuffer_, offset, bytes_recvd);
229 }
230 catch(Tlv::Error &error)
231 {
232 if (offset > 0)
233 {
234 partialDataSize_ = bytes_recvd - offset;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800235 std::copy(inputBuffer_ + offset, inputBuffer_ + offset + partialDataSize_, partialData_);
Alexander Afanasyev20d2c582014-01-26 15:32:51 -0800236 }
237 }
238 }
239 }
240
241 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
242 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
243 }
244
245 void
246 handle_async_send(const boost::system::error_code& error, const Block &wire)
247 {
248 // pass (needed to keep data block alive during the send)
249 }
250
251private:
252 TcpTransport &transport_;
253
254 protocol::socket socket_;
255 uint8_t inputBuffer_[MAX_LENGTH];
256
257 uint8_t partialData_[MAX_LENGTH];
258 size_t partialDataSize_;
259
260 std::list< Block > sendQueue_;
261 bool connectionInProgress_;
262
263 boost::asio::deadline_timer connectTimer_;
264};
265
266TcpTransport::TcpTransport(const std::string& host, const std::string& port/* = "6363"*/)
267 : host_(host)
268 , port_(port)
269{
270}
271
272TcpTransport::~TcpTransport()
273{
274}
275
276void
277TcpTransport::connect(boost::asio::io_service &ioService,
278 const ReceiveCallback &receiveCallback)
279{
280 if (!static_cast<bool>(impl_)) {
281 Transport::connect(ioService, receiveCallback);
282
283 impl_ = ptr_lib::make_shared<TcpTransport::Impl> (ptr_lib::ref(*this));
284 }
285 impl_->connect();
286}
287
288void
289TcpTransport::send(const Block &wire)
290{
291 impl_->send(wire);
292}
293
294void
295TcpTransport::close()
296{
297 impl_->close();
298}
299
300}