blob: 5aca79ec365e32f2fe814240a956a11f9b913c2f [file] [log] [blame]
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/**
3 * Copyright (C) 2013 Regents of the University of California.
4 * @author: Jeff Thompson <jefft0@remap.ucla.edu>
5 * See COPYING for copyright and distribution information.
6 */
7
#include <algorithm>
#include <cstring>
#include <list>
#include <stdexcept>
#include <stdlib.h>

#include <ndn-cpp/face.hpp>
#include <ndn-cpp/transport/unix-transport.hpp>
#include "../c/util/ndn_memory.h"

#include <boost/asio.hpp>
Alexander Afanasyev3c034202014-01-06 00:06:48 -080016#if NDN_CPP_HAVE_CXX11
17// In the std library, the placeholders are in a different namespace than boost.
18using namespace ndn::func_lib::placeholders;
19#endif
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080020
21using namespace std;
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080022typedef boost::asio::local::stream_protocol protocol;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080023
24namespace ndn {
25
26const size_t MAX_LENGTH = 9000;
27
28class UnixTransport::Impl
29{
30public:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080031 Impl(UnixTransport &transport)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -080032 : transport_(transport)
33 , socket_(*transport_.ioService_)
34 , partialDataSize_(0)
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080035 , connectionInProgress_(false)
Alexander Afanasyev0102be92014-01-09 14:31:09 -080036 , connectTimer_(*transport_.ioService_)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080037 {
38 }
39
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080040 void
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080041 connectHandler(const boost::system::error_code& error)
42 {
43 connectionInProgress_ = false;
Alexander Afanasyev0102be92014-01-09 14:31:09 -080044 connectTimer_.cancel();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080045
46 if (!error)
47 {
48 partialDataSize_ = 0;
49 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
Alexander Afanasyev3c034202014-01-06 00:06:48 -080050 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080051
52 transport_.isConnected_ = true;
53
54 for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
55 socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
Alexander Afanasyev3c034202014-01-06 00:06:48 -080056 func_lib::bind(&Impl::handle_async_send, this, _1, *i));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080057
58 sendQueue_.clear();
59 }
60 else
61 {
62 // may need to throw exception
63 transport_.isConnected_ = false;
64 throw Transport::Error(error, "error while connecting to the forwarder");
65 }
66 }
Alexander Afanasyev0102be92014-01-09 14:31:09 -080067
68 void
69 connectTimeoutHandler(const boost::system::error_code& error)
70 {
71 if (error) // e.g., cancelled timer
72 return;
73
74 connectionInProgress_ = false;
75 transport_.isConnected_ = false;
76 socket_.close();
77 throw Transport::Error(error, "error while connecting to the forwarder");
78 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080079
80 void
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080081 connect()
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080082 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080083 if (!connectionInProgress_) {
84 connectionInProgress_ = true;
85 socket_.open();
86 socket_.async_connect(protocol::endpoint(transport_.unixSocket_),
87 func_lib::bind(&Impl::connectHandler, this, _1));
Alexander Afanasyev0102be92014-01-09 14:31:09 -080088
89 // Wait at most 4 seconds to connect
90 /// @todo Decide whether this number should be configurable
91 connectTimer_.expires_from_now(boost::posix_time::seconds(4));
92 connectTimer_.async_wait(func_lib::bind(&Impl::connectTimeoutHandler, this, _1));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080093 }
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080094 }
95
96 void
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080097 close()
98 {
Alexander Afanasyev0102be92014-01-09 14:31:09 -080099 connectTimer_.cancel();
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800100 socket_.close();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800101 transport_.isConnected_ = false;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800102 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800103
104 void
105 send(const Block &wire)
106 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800107 if (!transport_.isConnected_)
108 sendQueue_.push_back(wire);
109 else
110 socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
Alexander Afanasyev3c034202014-01-06 00:06:48 -0800111 func_lib::bind(&Impl::handle_async_send, this, _1, wire));
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800112 }
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800113
114 inline void
115 processAll(uint8_t *buffer, size_t &offset, size_t availableSize)
116 {
117 while(offset < availableSize)
118 {
119 Block element(buffer + offset, availableSize - offset);
120 transport_.receive(element);
121
122 offset += element.size();
123 }
124 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800125
126 void
127 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
128 {
129 /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800130
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800131 if (error)
132 {
Alexander Afanasyevbf082112014-01-09 14:27:55 -0800133 if (error == boost::system::errc::operation_canceled) {
134 // async receive has been explicitly cancelled (e.g., socket close)
135 return;
136 }
137
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800138 socket_.close(); // closing at this point may not be that necessary
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800139 transport_.isConnected_ = true;
140 throw Transport::Error(error, "error while receiving data from socket");
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800141 }
142
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800143 if (!error && bytes_recvd > 0)
144 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800145 // inputBuffer_ has bytes_recvd received bytes of data
146 if (partialDataSize_ > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800147 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800148 size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
149 ndn_memcpy(partialData_ + partialDataSize_, inputBuffer_, newDataSize);
150 partialDataSize_ += newDataSize;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800151
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800152 size_t offset = 0;
153 try
154 {
155 processAll(partialData_, offset, partialDataSize_);
156
157 // no exceptions => processed the whole thing
158 if (bytes_recvd - newDataSize > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800159 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800160 // there is a little bit more data available
161
162 offset = 0;
163 partialDataSize_ = bytes_recvd - newDataSize;
164 ndn_memcpy(partialData_, inputBuffer_ + newDataSize, partialDataSize_);
165
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800166 processAll(partialData_, offset, partialDataSize_);
167
168 // no exceptions => processed the whole thing
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800169 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800170 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800171 else
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800172 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800173 // done processing
174 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800175 }
176 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800177 catch(Tlv::Error &)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800178 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800179 if (offset > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800180 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800181 partialDataSize_ -= offset;
182 ndn_memcpy(partialData_, partialData_ + offset, partialDataSize_);
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800183 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800184 else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800185 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800186 // very bad... should close connection
187 socket_.close();
188 transport_.isConnected_ = true;
189 throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800190 }
191 }
192 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800193 else
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800194 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800195 size_t offset = 0;
196 try
197 {
198 processAll(inputBuffer_, offset, bytes_recvd);
199 }
200 catch(Tlv::Error &error)
201 {
202 if (offset > 0)
203 {
204 partialDataSize_ = bytes_recvd - offset;
205 ndn_memcpy(partialData_, inputBuffer_ + offset, partialDataSize_);
206 }
207 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800208 }
209 }
210
211 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
Alexander Afanasyev3c034202014-01-06 00:06:48 -0800212 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800213 }
214
215 void
216 handle_async_send(const boost::system::error_code& error, const Block &wire)
217 {
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800218 // pass (needed to keep data block alive during the send)
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800219 }
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800220
221private:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800222 UnixTransport &transport_;
223
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800224 protocol::socket socket_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800225 uint8_t inputBuffer_[MAX_LENGTH];
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800226
227 uint8_t partialData_[MAX_LENGTH];
228 size_t partialDataSize_;
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800229
230 std::list< Block > sendQueue_;
231 bool connectionInProgress_;
Alexander Afanasyev0102be92014-01-09 14:31:09 -0800232
233 boost::asio::deadline_timer connectTimer_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800234};
235
236UnixTransport::UnixTransport(const std::string &unixSocket/* = "/tmp/.ndnd.sock"*/)
237 : unixSocket_(unixSocket)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800238{
239}
240
241UnixTransport::~UnixTransport()
242{
243}
244
245void
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800246UnixTransport::connect(boost::asio::io_service &ioService,
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800247 const ReceiveCallback &receiveCallback)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800248{
Alexander Afanasyev50ca6272014-01-09 23:23:54 -0800249 if (!static_cast<bool>(impl_)) {
250 Transport::connect(ioService, receiveCallback);
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800251
Alexander Afanasyev50ca6272014-01-09 23:23:54 -0800252 impl_ = ptr_lib::make_shared<UnixTransport::Impl> (ptr_lib::ref(*this));
253 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800254 impl_->connect();
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800255}
256
257void
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800258UnixTransport::send(const Block &wire)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800259{
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800260 impl_->send(wire);
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800261}
262
263void
264UnixTransport::close()
265{
266 impl_->close();
267}
268
269}