blob: 5d26e1579db9e5bcd06c635cd2e3ede6eee6f99e [file] [log] [blame]
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/**
3 * Copyright (C) 2013 Regents of the University of California.
4 * @author: Jeff Thompson <jefft0@remap.ucla.edu>
5 * See COPYING for copyright and distribution information.
6 */
7
8#include <stdexcept>
9#include <stdlib.h>
10
Yingdi Yu61ec2722014-01-20 14:22:32 -080011#include <ndn-cpp-dev/face.hpp>
12#include <ndn-cpp-dev/transport/unix-transport.hpp>
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080013
14#include <boost/asio.hpp>
Alexander Afanasyev3c034202014-01-06 00:06:48 -080015#if NDN_CPP_HAVE_CXX11
16// In the std library, the placeholders are in a different namespace than boost.
17using namespace ndn::func_lib::placeholders;
18#endif
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080019
20using namespace std;
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080021typedef boost::asio::local::stream_protocol protocol;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080022
23namespace ndn {
24
25const size_t MAX_LENGTH = 9000;
26
27class UnixTransport::Impl
28{
29public:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080030 Impl(UnixTransport &transport)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -080031 : transport_(transport)
32 , socket_(*transport_.ioService_)
33 , partialDataSize_(0)
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080034 , connectionInProgress_(false)
Alexander Afanasyev0102be92014-01-09 14:31:09 -080035 , connectTimer_(*transport_.ioService_)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080036 {
37 }
38
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080039 void
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080040 connectHandler(const boost::system::error_code& error)
41 {
42 connectionInProgress_ = false;
Alexander Afanasyev0102be92014-01-09 14:31:09 -080043 connectTimer_.cancel();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080044
45 if (!error)
46 {
47 partialDataSize_ = 0;
48 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
Alexander Afanasyev3c034202014-01-06 00:06:48 -080049 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080050
51 transport_.isConnected_ = true;
52
53 for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
54 socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
Alexander Afanasyev3c034202014-01-06 00:06:48 -080055 func_lib::bind(&Impl::handle_async_send, this, _1, *i));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080056
57 sendQueue_.clear();
58 }
59 else
60 {
61 // may need to throw exception
62 transport_.isConnected_ = false;
Alexander Afanasyev8995f542014-01-17 15:33:44 -080063 transport_.close();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080064 throw Transport::Error(error, "error while connecting to the forwarder");
65 }
66 }
Alexander Afanasyev0102be92014-01-09 14:31:09 -080067
68 void
69 connectTimeoutHandler(const boost::system::error_code& error)
70 {
71 if (error) // e.g., cancelled timer
72 return;
73
74 connectionInProgress_ = false;
75 transport_.isConnected_ = false;
76 socket_.close();
77 throw Transport::Error(error, "error while connecting to the forwarder");
78 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080079
80 void
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080081 connect()
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080082 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080083 if (!connectionInProgress_) {
84 connectionInProgress_ = true;
Alexander Afanasyev0102be92014-01-09 14:31:09 -080085
86 // Wait at most 4 seconds to connect
87 /// @todo Decide whether this number should be configurable
88 connectTimer_.expires_from_now(boost::posix_time::seconds(4));
89 connectTimer_.async_wait(func_lib::bind(&Impl::connectTimeoutHandler, this, _1));
Alexander Afanasyev8995f542014-01-17 15:33:44 -080090
91 socket_.open();
92 socket_.async_connect(protocol::endpoint(transport_.unixSocket_),
93 func_lib::bind(&Impl::connectHandler, this, _1));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080094 }
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080095 }
96
97 void
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080098 close()
99 {
Alexander Afanasyev0102be92014-01-09 14:31:09 -0800100 connectTimer_.cancel();
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800101 socket_.close();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800102 transport_.isConnected_ = false;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800103 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800104
105 void
106 send(const Block &wire)
107 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800108 if (!transport_.isConnected_)
109 sendQueue_.push_back(wire);
110 else
111 socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
Alexander Afanasyev3c034202014-01-06 00:06:48 -0800112 func_lib::bind(&Impl::handle_async_send, this, _1, wire));
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800113 }
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800114
115 inline void
116 processAll(uint8_t *buffer, size_t &offset, size_t availableSize)
117 {
118 while(offset < availableSize)
119 {
120 Block element(buffer + offset, availableSize - offset);
121 transport_.receive(element);
122
123 offset += element.size();
124 }
125 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800126
127 void
128 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
129 {
130 /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800131
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800132 if (error)
133 {
Alexander Afanasyevbf082112014-01-09 14:27:55 -0800134 if (error == boost::system::errc::operation_canceled) {
135 // async receive has been explicitly cancelled (e.g., socket close)
136 return;
137 }
138
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800139 socket_.close(); // closing at this point may not be that necessary
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800140 transport_.isConnected_ = true;
141 throw Transport::Error(error, "error while receiving data from socket");
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800142 }
143
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800144 if (!error && bytes_recvd > 0)
145 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800146 // inputBuffer_ has bytes_recvd received bytes of data
147 if (partialDataSize_ > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800148 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800149 size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800150 std::copy(inputBuffer_, inputBuffer_ + newDataSize, partialData_ + partialDataSize_);
151
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800152 partialDataSize_ += newDataSize;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800153
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800154 size_t offset = 0;
155 try
156 {
157 processAll(partialData_, offset, partialDataSize_);
158
159 // no exceptions => processed the whole thing
160 if (bytes_recvd - newDataSize > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800161 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800162 // there is a little bit more data available
163
164 offset = 0;
165 partialDataSize_ = bytes_recvd - newDataSize;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800166 std::copy(inputBuffer_ + newDataSize, inputBuffer_ + newDataSize + partialDataSize_, partialData_);
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800167
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800168 processAll(partialData_, offset, partialDataSize_);
169
170 // no exceptions => processed the whole thing
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800171 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800172 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800173 else
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800174 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800175 // done processing
176 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800177 }
178 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800179 catch(Tlv::Error &)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800180 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800181 if (offset > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800182 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800183 partialDataSize_ -= offset;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800184 std::copy(partialData_ + offset, partialData_ + offset + partialDataSize_, partialData_);
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800185 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800186 else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800187 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800188 // very bad... should close connection
189 socket_.close();
190 transport_.isConnected_ = true;
191 throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800192 }
193 }
194 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800195 else
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800196 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800197 size_t offset = 0;
198 try
199 {
200 processAll(inputBuffer_, offset, bytes_recvd);
201 }
202 catch(Tlv::Error &error)
203 {
204 if (offset > 0)
205 {
206 partialDataSize_ = bytes_recvd - offset;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800207 std::copy(inputBuffer_ + offset, inputBuffer_ + offset + partialDataSize_, partialData_);
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800208 }
209 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800210 }
211 }
212
213 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
Alexander Afanasyev3c034202014-01-06 00:06:48 -0800214 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800215 }
216
217 void
218 handle_async_send(const boost::system::error_code& error, const Block &wire)
219 {
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800220 // pass (needed to keep data block alive during the send)
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800221 }
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800222
223private:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800224 UnixTransport &transport_;
225
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800226 protocol::socket socket_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800227 uint8_t inputBuffer_[MAX_LENGTH];
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800228
229 uint8_t partialData_[MAX_LENGTH];
230 size_t partialDataSize_;
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800231
232 std::list< Block > sendQueue_;
233 bool connectionInProgress_;
Alexander Afanasyev0102be92014-01-09 14:31:09 -0800234
235 boost::asio::deadline_timer connectTimer_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800236};
237
238UnixTransport::UnixTransport(const std::string &unixSocket/* = "/tmp/.ndnd.sock"*/)
239 : unixSocket_(unixSocket)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800240{
241}
242
243UnixTransport::~UnixTransport()
244{
245}
246
247void
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800248UnixTransport::connect(boost::asio::io_service &ioService,
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800249 const ReceiveCallback &receiveCallback)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800250{
Alexander Afanasyev50ca6272014-01-09 23:23:54 -0800251 if (!static_cast<bool>(impl_)) {
252 Transport::connect(ioService, receiveCallback);
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800253
Alexander Afanasyev50ca6272014-01-09 23:23:54 -0800254 impl_ = ptr_lib::make_shared<UnixTransport::Impl> (ptr_lib::ref(*this));
255 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800256 impl_->connect();
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800257}
258
259void
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800260UnixTransport::send(const Block &wire)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800261{
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800262 impl_->send(wire);
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800263}
264
265void
266UnixTransport::close()
267{
268 impl_->close();
269}
270
271}