blob: 84424f3b3c5d080ea10d5779cfa0e89b9e721259 [file] [log] [blame]
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/**
3 * Copyright (C) 2013 Regents of the University of California.
4 * @author: Jeff Thompson <jefft0@remap.ucla.edu>
5 * See COPYING for copyright and distribution information.
6 */
7
Alexander Afanasyeve2dcdfd2014-02-07 15:53:28 -08008#include "common.hpp"
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -08009
Alexander Afanasyeve2dcdfd2014-02-07 15:53:28 -080010#include "unix-transport.hpp"
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080011
Alexander Afanasyeve2dcdfd2014-02-07 15:53:28 -080012#include "../face.hpp"
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -080013#include <cstdlib>
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080014
15using namespace std;
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080016typedef boost::asio::local::stream_protocol protocol;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080017
18namespace ndn {
19
20const size_t MAX_LENGTH = 9000;
21
22class UnixTransport::Impl
23{
24public:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080025 Impl(UnixTransport &transport)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -080026 : transport_(transport)
27 , socket_(*transport_.ioService_)
28 , partialDataSize_(0)
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080029 , connectionInProgress_(false)
Alexander Afanasyev0102be92014-01-09 14:31:09 -080030 , connectTimer_(*transport_.ioService_)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080031 {
32 }
33
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080034 void
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080035 connectHandler(const boost::system::error_code& error)
36 {
37 connectionInProgress_ = false;
Alexander Afanasyev0102be92014-01-09 14:31:09 -080038 connectTimer_.cancel();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080039
40 if (!error)
41 {
42 partialDataSize_ = 0;
43 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
Alexander Afanasyev3c034202014-01-06 00:06:48 -080044 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080045
46 transport_.isConnected_ = true;
47
48 for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
49 socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
Alexander Afanasyev3c034202014-01-06 00:06:48 -080050 func_lib::bind(&Impl::handle_async_send, this, _1, *i));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080051
52 sendQueue_.clear();
53 }
54 else
55 {
56 // may need to throw exception
57 transport_.isConnected_ = false;
Alexander Afanasyev8995f542014-01-17 15:33:44 -080058 transport_.close();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080059 throw Transport::Error(error, "error while connecting to the forwarder");
60 }
61 }
Alexander Afanasyev0102be92014-01-09 14:31:09 -080062
63 void
64 connectTimeoutHandler(const boost::system::error_code& error)
65 {
66 if (error) // e.g., cancelled timer
67 return;
68
69 connectionInProgress_ = false;
70 transport_.isConnected_ = false;
71 socket_.close();
72 throw Transport::Error(error, "error while connecting to the forwarder");
73 }
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -080074
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080075 void
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080076 connect()
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080077 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080078 if (!connectionInProgress_) {
79 connectionInProgress_ = true;
Alexander Afanasyev0102be92014-01-09 14:31:09 -080080
81 // Wait at most 4 seconds to connect
82 /// @todo Decide whether this number should be configurable
83 connectTimer_.expires_from_now(boost::posix_time::seconds(4));
84 connectTimer_.async_wait(func_lib::bind(&Impl::connectTimeoutHandler, this, _1));
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -080085
Alexander Afanasyev8995f542014-01-17 15:33:44 -080086 socket_.open();
87 socket_.async_connect(protocol::endpoint(transport_.unixSocket_),
88 func_lib::bind(&Impl::connectHandler, this, _1));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080089 }
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080090 }
91
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -080092 void
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080093 close()
94 {
Alexander Afanasyev0102be92014-01-09 14:31:09 -080095 connectTimer_.cancel();
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080096 socket_.close();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080097 transport_.isConnected_ = false;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080098 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080099
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800100 void
101 send(const Block& wire)
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800102 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800103 if (!transport_.isConnected_)
104 sendQueue_.push_back(wire);
105 else
106 socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
Alexander Afanasyev3c034202014-01-06 00:06:48 -0800107 func_lib::bind(&Impl::handle_async_send, this, _1, wire));
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800108 }
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800109
110 inline void
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800111 processAll(uint8_t* buffer, size_t& offset, size_t availableSize)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800112 {
113 while(offset < availableSize)
114 {
115 Block element(buffer + offset, availableSize - offset);
116 transport_.receive(element);
117
118 offset += element.size();
119 }
120 }
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800121
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800122 void
123 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
124 {
125 /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800126
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800127 if (error)
128 {
Alexander Afanasyevbf082112014-01-09 14:27:55 -0800129 if (error == boost::system::errc::operation_canceled) {
130 // async receive has been explicitly cancelled (e.g., socket close)
131 return;
132 }
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800133
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800134 socket_.close(); // closing at this point may not be that necessary
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800135 transport_.isConnected_ = true;
136 throw Transport::Error(error, "error while receiving data from socket");
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800137 }
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800138
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800139 if (!error && bytes_recvd > 0)
140 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800141 // inputBuffer_ has bytes_recvd received bytes of data
142 if (partialDataSize_ > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800143 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800144 size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800145 std::copy(inputBuffer_, inputBuffer_ + newDataSize, partialData_ + partialDataSize_);
146
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800147 partialDataSize_ += newDataSize;
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800148
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800149 size_t offset = 0;
150 try
151 {
152 processAll(partialData_, offset, partialDataSize_);
153
154 // no exceptions => processed the whole thing
155 if (bytes_recvd - newDataSize > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800156 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800157 // there is a little bit more data available
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800158
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800159 offset = 0;
160 partialDataSize_ = bytes_recvd - newDataSize;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800161 std::copy(inputBuffer_ + newDataSize, inputBuffer_ + newDataSize + partialDataSize_, partialData_);
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800162
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800163 processAll(partialData_, offset, partialDataSize_);
164
165 // no exceptions => processed the whole thing
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800166 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800167 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800168 else
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800169 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800170 // done processing
171 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800172 }
173 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800174 catch(Tlv::Error &)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800175 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800176 if (offset > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800177 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800178 partialDataSize_ -= offset;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800179 std::copy(partialData_ + offset, partialData_ + offset + partialDataSize_, partialData_);
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800180 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800181 else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800182 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800183 // very bad... should close connection
184 socket_.close();
185 transport_.isConnected_ = true;
186 throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800187 }
188 }
189 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800190 else
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800191 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800192 size_t offset = 0;
193 try
194 {
195 processAll(inputBuffer_, offset, bytes_recvd);
196 }
197 catch(Tlv::Error &error)
198 {
199 if (offset > 0)
200 {
201 partialDataSize_ = bytes_recvd - offset;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800202 std::copy(inputBuffer_ + offset, inputBuffer_ + offset + partialDataSize_, partialData_);
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800203 }
204 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800205 }
206 }
207
208 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
Alexander Afanasyev3c034202014-01-06 00:06:48 -0800209 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800210 }
211
212 void
213 handle_async_send(const boost::system::error_code& error, const Block &wire)
214 {
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800215 // pass (needed to keep data block alive during the send)
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800216 }
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800217
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800218private:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800219 UnixTransport &transport_;
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800220
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800221 protocol::socket socket_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800222 uint8_t inputBuffer_[MAX_LENGTH];
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800223
224 uint8_t partialData_[MAX_LENGTH];
225 size_t partialDataSize_;
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800226
227 std::list< Block > sendQueue_;
228 bool connectionInProgress_;
Alexander Afanasyev0102be92014-01-09 14:31:09 -0800229
230 boost::asio::deadline_timer connectTimer_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800231};
232
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800233
234UnixTransport::UnixTransport()
235{
236 if (std::getenv("NFD") != 0)
237 unixSocket_ = "/var/run/nfd.sock";
238 else
239 unixSocket_ = "/tmp/.ndnd.sock";
240}
241
242UnixTransport::UnixTransport(const std::string& unixSocket)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800243 : unixSocket_(unixSocket)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800244{
245}
246
247UnixTransport::~UnixTransport()
248{
249}
250
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800251void
252UnixTransport::connect(boost::asio::io_service& ioService,
253 const ReceiveCallback& receiveCallback)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800254{
Alexander Afanasyev50ca6272014-01-09 23:23:54 -0800255 if (!static_cast<bool>(impl_)) {
256 Transport::connect(ioService, receiveCallback);
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800257
Alexander Afanasyev50ca6272014-01-09 23:23:54 -0800258 impl_ = ptr_lib::make_shared<UnixTransport::Impl> (ptr_lib::ref(*this));
259 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800260 impl_->connect();
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800261}
262
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800263void
264UnixTransport::send(const Block& wire)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800265{
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800266 impl_->send(wire);
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800267}
268
Alexander Afanasyevf7ca3202014-02-14 22:28:31 -0800269void
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800270UnixTransport::close()
271{
272 impl_->close();
273}
274
275}