blob: 4da05026cceedd2753e412c601619fd0768148d4 [file] [log] [blame]
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/**
3 * Copyright (C) 2013 Regents of the University of California.
4 * @author: Jeff Thompson <jefft0@remap.ucla.edu>
5 * See COPYING for copyright and distribution information.
6 */
7
Alexander Afanasyeve2dcdfd2014-02-07 15:53:28 -08008#include "common.hpp"
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -08009
Alexander Afanasyeve2dcdfd2014-02-07 15:53:28 -080010#include "unix-transport.hpp"
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080011
Alexander Afanasyeve2dcdfd2014-02-07 15:53:28 -080012#include "../face.hpp"
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080013
14using namespace std;
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080015typedef boost::asio::local::stream_protocol protocol;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080016
17namespace ndn {
18
19const size_t MAX_LENGTH = 9000;
20
21class UnixTransport::Impl
22{
23public:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080024 Impl(UnixTransport &transport)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -080025 : transport_(transport)
26 , socket_(*transport_.ioService_)
27 , partialDataSize_(0)
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080028 , connectionInProgress_(false)
Alexander Afanasyev0102be92014-01-09 14:31:09 -080029 , connectTimer_(*transport_.ioService_)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080030 {
31 }
32
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080033 void
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080034 connectHandler(const boost::system::error_code& error)
35 {
36 connectionInProgress_ = false;
Alexander Afanasyev0102be92014-01-09 14:31:09 -080037 connectTimer_.cancel();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080038
39 if (!error)
40 {
41 partialDataSize_ = 0;
42 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
Alexander Afanasyev3c034202014-01-06 00:06:48 -080043 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080044
45 transport_.isConnected_ = true;
46
47 for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
48 socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
Alexander Afanasyev3c034202014-01-06 00:06:48 -080049 func_lib::bind(&Impl::handle_async_send, this, _1, *i));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080050
51 sendQueue_.clear();
52 }
53 else
54 {
55 // may need to throw exception
56 transport_.isConnected_ = false;
Alexander Afanasyev8995f542014-01-17 15:33:44 -080057 transport_.close();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080058 throw Transport::Error(error, "error while connecting to the forwarder");
59 }
60 }
Alexander Afanasyev0102be92014-01-09 14:31:09 -080061
62 void
63 connectTimeoutHandler(const boost::system::error_code& error)
64 {
65 if (error) // e.g., cancelled timer
66 return;
67
68 connectionInProgress_ = false;
69 transport_.isConnected_ = false;
70 socket_.close();
71 throw Transport::Error(error, "error while connecting to the forwarder");
72 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080073
74 void
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080075 connect()
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080076 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080077 if (!connectionInProgress_) {
78 connectionInProgress_ = true;
Alexander Afanasyev0102be92014-01-09 14:31:09 -080079
80 // Wait at most 4 seconds to connect
81 /// @todo Decide whether this number should be configurable
82 connectTimer_.expires_from_now(boost::posix_time::seconds(4));
83 connectTimer_.async_wait(func_lib::bind(&Impl::connectTimeoutHandler, this, _1));
Alexander Afanasyev8995f542014-01-17 15:33:44 -080084
85 socket_.open();
86 socket_.async_connect(protocol::endpoint(transport_.unixSocket_),
87 func_lib::bind(&Impl::connectHandler, this, _1));
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080088 }
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080089 }
90
91 void
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080092 close()
93 {
Alexander Afanasyev0102be92014-01-09 14:31:09 -080094 connectTimer_.cancel();
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080095 socket_.close();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080096 transport_.isConnected_ = false;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080097 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080098
99 void
100 send(const Block &wire)
101 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800102 if (!transport_.isConnected_)
103 sendQueue_.push_back(wire);
104 else
105 socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
Alexander Afanasyev3c034202014-01-06 00:06:48 -0800106 func_lib::bind(&Impl::handle_async_send, this, _1, wire));
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800107 }
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800108
109 inline void
110 processAll(uint8_t *buffer, size_t &offset, size_t availableSize)
111 {
112 while(offset < availableSize)
113 {
114 Block element(buffer + offset, availableSize - offset);
115 transport_.receive(element);
116
117 offset += element.size();
118 }
119 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800120
121 void
122 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
123 {
124 /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800125
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800126 if (error)
127 {
Alexander Afanasyevbf082112014-01-09 14:27:55 -0800128 if (error == boost::system::errc::operation_canceled) {
129 // async receive has been explicitly cancelled (e.g., socket close)
130 return;
131 }
132
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800133 socket_.close(); // closing at this point may not be that necessary
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800134 transport_.isConnected_ = true;
135 throw Transport::Error(error, "error while receiving data from socket");
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800136 }
137
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800138 if (!error && bytes_recvd > 0)
139 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800140 // inputBuffer_ has bytes_recvd received bytes of data
141 if (partialDataSize_ > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800142 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800143 size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800144 std::copy(inputBuffer_, inputBuffer_ + newDataSize, partialData_ + partialDataSize_);
145
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800146 partialDataSize_ += newDataSize;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800147
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800148 size_t offset = 0;
149 try
150 {
151 processAll(partialData_, offset, partialDataSize_);
152
153 // no exceptions => processed the whole thing
154 if (bytes_recvd - newDataSize > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800155 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800156 // there is a little bit more data available
157
158 offset = 0;
159 partialDataSize_ = bytes_recvd - newDataSize;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800160 std::copy(inputBuffer_ + newDataSize, inputBuffer_ + newDataSize + partialDataSize_, partialData_);
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800161
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800162 processAll(partialData_, offset, partialDataSize_);
163
164 // no exceptions => processed the whole thing
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800165 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800166 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800167 else
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800168 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800169 // done processing
170 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800171 }
172 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800173 catch(Tlv::Error &)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800174 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800175 if (offset > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800176 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800177 partialDataSize_ -= offset;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800178 std::copy(partialData_ + offset, partialData_ + offset + partialDataSize_, partialData_);
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800179 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800180 else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800181 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800182 // very bad... should close connection
183 socket_.close();
184 transport_.isConnected_ = true;
185 throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800186 }
187 }
188 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800189 else
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800190 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800191 size_t offset = 0;
192 try
193 {
194 processAll(inputBuffer_, offset, bytes_recvd);
195 }
196 catch(Tlv::Error &error)
197 {
198 if (offset > 0)
199 {
200 partialDataSize_ = bytes_recvd - offset;
Alexander Afanasyevd409d592014-01-28 18:36:38 -0800201 std::copy(inputBuffer_ + offset, inputBuffer_ + offset + partialDataSize_, partialData_);
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800202 }
203 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800204 }
205 }
206
207 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
Alexander Afanasyev3c034202014-01-06 00:06:48 -0800208 func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800209 }
210
211 void
212 handle_async_send(const boost::system::error_code& error, const Block &wire)
213 {
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800214 // pass (needed to keep data block alive during the send)
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800215 }
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800216
217private:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800218 UnixTransport &transport_;
219
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800220 protocol::socket socket_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800221 uint8_t inputBuffer_[MAX_LENGTH];
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800222
223 uint8_t partialData_[MAX_LENGTH];
224 size_t partialDataSize_;
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800225
226 std::list< Block > sendQueue_;
227 bool connectionInProgress_;
Alexander Afanasyev0102be92014-01-09 14:31:09 -0800228
229 boost::asio::deadline_timer connectTimer_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800230};
231
232UnixTransport::UnixTransport(const std::string &unixSocket/* = "/tmp/.ndnd.sock"*/)
233 : unixSocket_(unixSocket)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800234{
235}
236
237UnixTransport::~UnixTransport()
238{
239}
240
241void
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800242UnixTransport::connect(boost::asio::io_service &ioService,
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800243 const ReceiveCallback &receiveCallback)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800244{
Alexander Afanasyev50ca6272014-01-09 23:23:54 -0800245 if (!static_cast<bool>(impl_)) {
246 Transport::connect(ioService, receiveCallback);
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800247
Alexander Afanasyev50ca6272014-01-09 23:23:54 -0800248 impl_ = ptr_lib::make_shared<UnixTransport::Impl> (ptr_lib::ref(*this));
249 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800250 impl_->connect();
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800251}
252
253void
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800254UnixTransport::send(const Block &wire)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800255{
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800256 impl_->send(wire);
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800257}
258
259void
260UnixTransport::close()
261{
262 impl_->close();
263}
264
265}