blob: cd00a46bb2c447cc7ad16d8f1614539de68663f4 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
/**
 * Copyright (C) 2013 Regents of the University of California.
 * @author: Jeff Thompson <jefft0@remap.ucla.edu>
 * See COPYING for copyright and distribution information.
 */
7
#include <stdexcept>
#include <stdlib.h>
#include <cstring>

#include <ndn-cpp/face.hpp>
#include <ndn-cpp/transport/unix-transport.hpp>
#include "../c/util/ndn_memory.h"

#include <boost/asio.hpp>
Alexander Afanasyev3c034202014-01-06 00:06:48 -080016#if NDN_CPP_HAVE_CXX11
17// In the std library, the placeholders are in a different namespace than boost.
18using namespace ndn::func_lib::placeholders;
19#endif
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080020
21using namespace std;
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080022typedef boost::asio::local::stream_protocol protocol;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080023
24namespace ndn {
25
/// Capacity, in bytes, of the socket receive buffer and of the buffer that
/// holds a not-yet-complete TLV element between two receives.
const size_t MAX_LENGTH = 9000;
27
28class UnixTransport::Impl
29{
30public:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080031 Impl(UnixTransport &transport)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -080032 : transport_(transport)
33 , socket_(*transport_.ioService_)
34 , partialDataSize_(0)
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080035 , connectionInProgress_(false)
Alexander Afanasyev0102be92014-01-09 14:31:09 -080036 , connectTimer_(*transport_.ioService_)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080037 {
38 }
39
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080040 void
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080041 connectHandler(const boost::system::error_code& error)
42 {
43 connectionInProgress_ = false;
Alexander Afanasyev0102be92014-01-09 14:31:09 -080044 connectTimer_.cancel();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080045
46 if (!error)
47 {
48 partialDataSize_ = 0;
49 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
Alexander Afanasyev3c034202014-01-06 00:06:48 -080050 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080051
52 transport_.isConnected_ = true;
53
54 for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
55 socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
Alexander Afanasyev3c034202014-01-06 00:06:48 -080056 func_lib::bind(&Impl::handle_async_send, this, _1, *i));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080057
58 sendQueue_.clear();
59 }
60 else
61 {
62 // may need to throw exception
63 transport_.isConnected_ = false;
Alexander Afanasyev8995f542014-01-17 15:33:44 -080064 transport_.close();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080065 throw Transport::Error(error, "error while connecting to the forwarder");
66 }
67 }
Alexander Afanasyev0102be92014-01-09 14:31:09 -080068
69 void
70 connectTimeoutHandler(const boost::system::error_code& error)
71 {
72 if (error) // e.g., cancelled timer
73 return;
74
75 connectionInProgress_ = false;
76 transport_.isConnected_ = false;
77 socket_.close();
78 throw Transport::Error(error, "error while connecting to the forwarder");
79 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080080
81 void
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080082 connect()
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080083 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080084 if (!connectionInProgress_) {
85 connectionInProgress_ = true;
Alexander Afanasyev0102be92014-01-09 14:31:09 -080086
87 // Wait at most 4 seconds to connect
88 /// @todo Decide whether this number should be configurable
89 connectTimer_.expires_from_now(boost::posix_time::seconds(4));
90 connectTimer_.async_wait(func_lib::bind(&Impl::connectTimeoutHandler, this, _1));
Alexander Afanasyev8995f542014-01-17 15:33:44 -080091
92 socket_.open();
93 socket_.async_connect(protocol::endpoint(transport_.unixSocket_),
94 func_lib::bind(&Impl::connectHandler, this, _1));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080095 }
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080096 }
97
98 void
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080099 close()
100 {
Alexander Afanasyev0102be92014-01-09 14:31:09 -0800101 connectTimer_.cancel();
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800102 socket_.close();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800103 transport_.isConnected_ = false;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800104 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800105
106 void
107 send(const Block &wire)
108 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800109 if (!transport_.isConnected_)
110 sendQueue_.push_back(wire);
111 else
112 socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
Alexander Afanasyev3c034202014-01-06 00:06:48 -0800113 func_lib::bind(&Impl::handle_async_send, this, _1, wire));
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800114 }
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800115
116 inline void
117 processAll(uint8_t *buffer, size_t &offset, size_t availableSize)
118 {
119 while(offset < availableSize)
120 {
121 Block element(buffer + offset, availableSize - offset);
122 transport_.receive(element);
123
124 offset += element.size();
125 }
126 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800127
128 void
129 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
130 {
131 /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800132
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800133 if (error)
134 {
Alexander Afanasyevbf082112014-01-09 14:27:55 -0800135 if (error == boost::system::errc::operation_canceled) {
136 // async receive has been explicitly cancelled (e.g., socket close)
137 return;
138 }
139
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800140 socket_.close(); // closing at this point may not be that necessary
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800141 transport_.isConnected_ = true;
142 throw Transport::Error(error, "error while receiving data from socket");
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800143 }
144
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800145 if (!error && bytes_recvd > 0)
146 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800147 // inputBuffer_ has bytes_recvd received bytes of data
148 if (partialDataSize_ > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800149 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800150 size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
151 ndn_memcpy(partialData_ + partialDataSize_, inputBuffer_, newDataSize);
152 partialDataSize_ += newDataSize;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800153
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800154 size_t offset = 0;
155 try
156 {
157 processAll(partialData_, offset, partialDataSize_);
158
159 // no exceptions => processed the whole thing
160 if (bytes_recvd - newDataSize > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800161 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800162 // there is a little bit more data available
163
164 offset = 0;
165 partialDataSize_ = bytes_recvd - newDataSize;
166 ndn_memcpy(partialData_, inputBuffer_ + newDataSize, partialDataSize_);
167
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800168 processAll(partialData_, offset, partialDataSize_);
169
170 // no exceptions => processed the whole thing
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800171 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800172 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800173 else
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800174 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800175 // done processing
176 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800177 }
178 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800179 catch(Tlv::Error &)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800180 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800181 if (offset > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800182 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800183 partialDataSize_ -= offset;
184 ndn_memcpy(partialData_, partialData_ + offset, partialDataSize_);
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800185 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800186 else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800187 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800188 // very bad... should close connection
189 socket_.close();
190 transport_.isConnected_ = true;
191 throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800192 }
193 }
194 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800195 else
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800196 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800197 size_t offset = 0;
198 try
199 {
200 processAll(inputBuffer_, offset, bytes_recvd);
201 }
202 catch(Tlv::Error &error)
203 {
204 if (offset > 0)
205 {
206 partialDataSize_ = bytes_recvd - offset;
207 ndn_memcpy(partialData_, inputBuffer_ + offset, partialDataSize_);
208 }
209 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800210 }
211 }
212
213 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
Alexander Afanasyev3c034202014-01-06 00:06:48 -0800214 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800215 }
216
217 void
218 handle_async_send(const boost::system::error_code& error, const Block &wire)
219 {
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800220 // pass (needed to keep data block alive during the send)
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800221 }
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800222
223private:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800224 UnixTransport &transport_;
225
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800226 protocol::socket socket_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800227 uint8_t inputBuffer_[MAX_LENGTH];
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800228
229 uint8_t partialData_[MAX_LENGTH];
230 size_t partialDataSize_;
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800231
232 std::list< Block > sendQueue_;
233 bool connectionInProgress_;
Alexander Afanasyev0102be92014-01-09 14:31:09 -0800234
235 boost::asio::deadline_timer connectTimer_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800236};
237
238UnixTransport::UnixTransport(const std::string &unixSocket/* = "/tmp/.ndnd.sock"*/)
239 : unixSocket_(unixSocket)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800240{
241}
242
243UnixTransport::~UnixTransport()
244{
245}
246
247void
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800248UnixTransport::connect(boost::asio::io_service &ioService,
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800249 const ReceiveCallback &receiveCallback)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800250{
Alexander Afanasyev50ca6272014-01-09 23:23:54 -0800251 if (!static_cast<bool>(impl_)) {
252 Transport::connect(ioService, receiveCallback);
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800253
Alexander Afanasyev50ca6272014-01-09 23:23:54 -0800254 impl_ = ptr_lib::make_shared<UnixTransport::Impl> (ptr_lib::ref(*this));
255 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800256 impl_->connect();
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800257}
258
259void
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800260UnixTransport::send(const Block &wire)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800261{
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800262 impl_->send(wire);
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800263}
264
265void
266UnixTransport::close()
267{
268 impl_->close();
269}
270
271}