blob: 81a46ccad0ec92554f13028baa507c9f5d6b434c [file] [log] [blame]
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/**
3 * Copyright (C) 2013 Regents of the University of California.
4 * @author: Jeff Thompson <jefft0@remap.ucla.edu>
5 * See COPYING for copyright and distribution information.
6 */
7
8#include <stdexcept>
9#include <stdlib.h>
10
11#include <ndn-cpp/face.hpp>
12#include <ndn-cpp/transport/unix-transport.hpp>
Alexander Afanasyev328e23d2013-12-28 20:47:38 -080013#include <ndn-cpp/c/util/ndn_memory.h>
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080014
15#include <boost/asio.hpp>
16#include <boost/bind.hpp>
17
18using namespace std;
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080019typedef boost::asio::local::stream_protocol protocol;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080020
21namespace ndn {
22
23const size_t MAX_LENGTH = 9000;
24
25class UnixTransport::Impl
26{
27public:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080028 Impl(UnixTransport &transport)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -080029 : transport_(transport)
30 , socket_(*transport_.ioService_)
31 , partialDataSize_(0)
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080032 , connectionInProgress_(false)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080033 {
34 }
35
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080036 void
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080037 connectHandler(const boost::system::error_code& error)
38 {
39 connectionInProgress_ = false;
40
41 if (!error)
42 {
43 partialDataSize_ = 0;
44 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
45 boost::bind(&Impl::handle_async_receive, this, _1, _2));
46
47 transport_.isConnected_ = true;
48
49 for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
50 socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
51 boost::bind(&Impl::handle_async_send, this, _1, *i));
52
53 sendQueue_.clear();
54 }
55 else
56 {
57 // may need to throw exception
58 transport_.isConnected_ = false;
59 throw Transport::Error(error, "error while connecting to the forwarder");
60 }
61 }
62
63 void
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080064 connect()
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080065 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080066 if (!connectionInProgress_) {
67 connectionInProgress_ = true;
68 socket_.open();
69 socket_.async_connect(protocol::endpoint(transport_.unixSocket_),
70 func_lib::bind(&Impl::connectHandler, this, _1));
71 }
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080072 }
73
74 void
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080075 close()
76 {
77 socket_.close();
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080078 transport_.isConnected_ = false;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -080079 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080080
81 void
82 send(const Block &wire)
83 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -080084 if (!transport_.isConnected_)
85 sendQueue_.push_back(wire);
86 else
87 socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
88 boost::bind(&Impl::handle_async_send, this, _1, wire));
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -080089 }
Alexander Afanasyev328e23d2013-12-28 20:47:38 -080090
91 inline void
92 processAll(uint8_t *buffer, size_t &offset, size_t availableSize)
93 {
94 while(offset < availableSize)
95 {
96 Block element(buffer + offset, availableSize - offset);
97 transport_.receive(element);
98
99 offset += element.size();
100 }
101 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800102
103 void
104 handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
105 {
106 /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800107
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800108 if (error)
109 {
110 socket_.close(); // closing at this point may not be that necessary
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800111 transport_.isConnected_ = true;
112 throw Transport::Error(error, "error while receiving data from socket");
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800113 }
114
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800115 if (!error && bytes_recvd > 0)
116 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800117 // inputBuffer_ has bytes_recvd received bytes of data
118 if (partialDataSize_ > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800119 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800120 size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
121 ndn_memcpy(partialData_ + partialDataSize_, inputBuffer_, newDataSize);
122 partialDataSize_ += newDataSize;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800123
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800124 size_t offset = 0;
125 try
126 {
127 processAll(partialData_, offset, partialDataSize_);
128
129 // no exceptions => processed the whole thing
130 if (bytes_recvd - newDataSize > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800131 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800132 // there is a little bit more data available
133
134 offset = 0;
135 partialDataSize_ = bytes_recvd - newDataSize;
136 ndn_memcpy(partialData_, inputBuffer_ + newDataSize, partialDataSize_);
137
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800138 processAll(partialData_, offset, partialDataSize_);
139
140 // no exceptions => processed the whole thing
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800141 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800142 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800143 else
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800144 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800145 // done processing
146 partialDataSize_ = 0;
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800147 }
148 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800149 catch(Tlv::Error &)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800150 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800151 if (offset > 0)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800152 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800153 partialDataSize_ -= offset;
154 ndn_memcpy(partialData_, partialData_ + offset, partialDataSize_);
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800155 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800156 else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800157 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800158 // very bad... should close connection
159 socket_.close();
160 transport_.isConnected_ = true;
161 throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800162 }
163 }
164 }
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800165 else
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800166 {
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800167 size_t offset = 0;
168 try
169 {
170 processAll(inputBuffer_, offset, bytes_recvd);
171 }
172 catch(Tlv::Error &error)
173 {
174 if (offset > 0)
175 {
176 partialDataSize_ = bytes_recvd - offset;
177 ndn_memcpy(partialData_, inputBuffer_ + offset, partialDataSize_);
178 }
179 }
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800180 }
181 }
182
183 socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
184 boost::bind(&Impl::handle_async_receive, this, _1, _2));
185 }
186
187 void
188 handle_async_send(const boost::system::error_code& error, const Block &wire)
189 {
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800190 // pass (needed to keep data block alive during the send)
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800191 }
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800192
193private:
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800194 UnixTransport &transport_;
195
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800196 protocol::socket socket_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800197 uint8_t inputBuffer_[MAX_LENGTH];
Alexander Afanasyev328e23d2013-12-28 20:47:38 -0800198
199 uint8_t partialData_[MAX_LENGTH];
200 size_t partialDataSize_;
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800201
202 std::list< Block > sendQueue_;
203 bool connectionInProgress_;
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800204};
205
206UnixTransport::UnixTransport(const std::string &unixSocket/* = "/tmp/.ndnd.sock"*/)
207 : unixSocket_(unixSocket)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800208{
209}
210
211UnixTransport::~UnixTransport()
212{
213}
214
215void
Alexander Afanasyeva557d5a2013-12-28 21:59:03 -0800216UnixTransport::connect(boost::asio::io_service &ioService,
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800217 const ReceiveCallback &receiveCallback)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800218{
Alexander Afanasyev3ae2da22013-12-29 15:50:04 -0800219 Transport::connect(ioService, receiveCallback);
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800220
221 impl_ = std::auto_ptr<UnixTransport::Impl> (new UnixTransport::Impl(*this));
222 impl_->connect();
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800223}
224
225void
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800226UnixTransport::send(const Block &wire)
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800227{
Alexander Afanasyeve2e0d752014-01-03 13:30:30 -0800228 impl_->send(wire);
Alexander Afanasyevfe3b1502013-12-18 16:45:03 -0800229}
230
231void
232UnixTransport::close()
233{
234 impl_->close();
235}
236
237}