node+transport: Improving async operations and error reporting
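All operations, including connect, are now fully asynchronous.
Blocks passed to send() before the connection is established are queued and sent once it is up.
Transport errors are now reported by throwing Transport::Error instead of invoking the error callback.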
Change-Id: I6c28c4f883d3aa10efbc4af939cf09e0e865c574
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index 4bf85d7..81a46cc 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -29,32 +29,63 @@
: transport_(transport)
, socket_(*transport_.ioService_)
, partialDataSize_(0)
+ , connectionInProgress_(false)
{
}
void
+ connectHandler(const boost::system::error_code& error) // completion handler for the async_connect() issued by connect()
+ {
+ connectionInProgress_ = false; // the attempt has finished either way, so connect() may start a new one
+
+ if (!error)
+ {
+ partialDataSize_ = 0;
+ socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
+ boost::bind(&Impl::handle_async_receive, this, _1, _2));
+
+ transport_.isConnected_ = true; // from here on send() writes to the socket instead of queueing
+
+ for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i) // flush everything send() queued while not connected
+ socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
+ boost::bind(&Impl::handle_async_send, this, _1, *i));
+
+ sendQueue_.clear();
+ }
+ else
+ {
+ socket_.close(); // connect() opened the socket and a failed connect leaves it open; close it so a later connect() can open() it again
+ transport_.isConnected_ = false;
+ throw Transport::Error(error, "error while connecting to the forwarder"); // thrown from inside the completion handler
+ }
+ }
+
+ void
connect()
{
- socket_.open();
- socket_.connect(protocol::endpoint(transport_.unixSocket_));
- // socket_.async_connect(protocol::endpoint(unixSocket));
-
- partialDataSize_ = 0;
- socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
- boost::bind(&Impl::handle_async_receive, this, _1, _2));
+ if (!connectionInProgress_) { // do nothing while an earlier async_connect is still pending
+ connectionInProgress_ = true; // cleared again by connectHandler when the attempt completes
+ socket_.open();
+ socket_.async_connect(protocol::endpoint(transport_.unixSocket_),
+ func_lib::bind(&Impl::connectHandler, this, _1)); // NOTE(review): the other handlers in this file are bound with boost::bind; confirm func_lib::bind is intended here
+ }
}
void
close()
{
socket_.close();
+ transport_.isConnected_ = false; // send() checks this flag, so later blocks are queued rather than written to the closed socket
}
void
send(const Block &wire)
{
- socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
- boost::bind(&Impl::handle_async_send, this, _1, wire));
+ if (!transport_.isConnected_)
+ sendQueue_.push_back(wire); // not connected: keep a copy of the block; connectHandler sends the queue after a successful connect
+ else
+ socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
+ boost::bind(&Impl::handle_async_send, this, _1, wire)); // the block is also bound into the handler, presumably to keep its buffer alive until the send completes
}
inline void
@@ -77,81 +108,75 @@
if (error)
{
socket_.close(); // closing at this point may not be that necessary
- transport_.errorCallback_();
+ transport_.isConnected_ = false; // the socket was just closed, so the transport must not keep reporting itself as connected
+ throw Transport::Error(error, "error while receiving data from socket");
}
if (!error && bytes_recvd > 0)
{
- try
+ // inputBuffer_ has bytes_recvd received bytes of data
+ if (partialDataSize_ > 0) // bytes are left over from an earlier read: append the new data to partialData_ and parse from there
{
- // inputBuffer_ has bytes_recvd received bytes of data
- if (partialDataSize_ > 0)
- {
- size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
- ndn_memcpy(partialData_ + partialDataSize_, inputBuffer_, newDataSize);
- partialDataSize_ += newDataSize;
+ size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_); // never copy past the end of partialData_
+ ndn_memcpy(partialData_ + partialDataSize_, inputBuffer_, newDataSize);
+ partialDataSize_ += newDataSize;
- size_t offset = 0;
- try
+ size_t offset = 0;
+ try
+ {
+ processAll(partialData_, offset, partialDataSize_);
+
+ // no exceptions => processed the whole thing
+ if (bytes_recvd - newDataSize > 0)
{
+ // there is a little bit more data available
+
+ offset = 0;
+ partialDataSize_ = bytes_recvd - newDataSize;
+ ndn_memcpy(partialData_, inputBuffer_ + newDataSize, partialDataSize_);
+
processAll(partialData_, offset, partialDataSize_);
// no exceptions => processed the whole thing
- if (bytes_recvd - newDataSize > 0)
- {
- // there is a little bit more data available
-
- offset = 0;
- partialDataSize_ = bytes_recvd - newDataSize;
- ndn_memcpy(partialData_, inputBuffer_ + newDataSize, partialDataSize_);
-
- processAll(partialData_, offset, partialDataSize_);
-
- // no exceptions => processed the whole thing
- partialDataSize_ = 0;
- }
- else
- {
- // done processing
- partialDataSize_ = 0;
- }
+ partialDataSize_ = 0;
}
- catch(Block::Error &)
+ else
{
- if (offset > 0)
- {
- partialDataSize_ -= offset;
- ndn_memcpy(partialData_, partialData_ + offset, partialDataSize_);
- }
- else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
- {
- // very bad... should close connection
- socket_.close();
- transport_.errorCallback_();
- }
+ // done processing
+ partialDataSize_ = 0;
}
}
- else
+ catch(Tlv::Error &)
{
- size_t offset = 0;
- try
+ if (offset > 0)
{
- processAll(inputBuffer_, offset, bytes_recvd);
+ partialDataSize_ -= offset; // keep only the bytes after offset and move them to the front of partialData_
+ ndn_memcpy(partialData_, partialData_ + offset, partialDataSize_); // NOTE(review): source and destination overlap when partialDataSize_ > offset; confirm ndn_memcpy tolerates overlapping ranges
}
- catch(Block::Error &error)
+ else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
{
- if (offset > 0)
- {
- partialDataSize_ = bytes_recvd - offset;
- ndn_memcpy(partialData_, inputBuffer_ + offset, partialDataSize_);
- }
+ // very bad... should close connection
+ socket_.close();
+ transport_.isConnected_ = false; // the socket was just closed, so the transport is no longer connected
+ throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
}
}
}
- catch(Tlv::Error &error)
+ else
{
- std::cerr << "[[handle_async_receive]] Tlv::Error: " << error.what() << std::endl;
- // pass
+ size_t offset = 0;
+ try
+ {
+ processAll(inputBuffer_, offset, bytes_recvd);
+ }
+ catch(Tlv::Error &) // exception object is not used; the bytes after offset are kept for the next read
+ {
+ if (offset > 0) // NOTE(review): when offset == 0 the received bytes are not saved to partialData_; confirm this is intended
+ {
+ partialDataSize_ = bytes_recvd - offset;
+ ndn_memcpy(partialData_, inputBuffer_ + offset, partialDataSize_);
+ }
+ }
}
}
@@ -173,6 +198,9 @@
uint8_t partialData_[MAX_LENGTH];
size_t partialDataSize_;
+
+ std::list< Block > sendQueue_;
+ bool connectionInProgress_;
};
UnixTransport::UnixTransport(const std::string &unixSocket/* = "/tmp/.ndnd.sock"*/)
@@ -186,15 +214,12 @@
void
UnixTransport::connect(boost::asio::io_service &ioService,
- const ReceiveCallback &receiveCallback,
- const ErrorCallback &errorCallback)
+ const ReceiveCallback &receiveCallback)
{
- Transport::connect(ioService, receiveCallback, errorCallback);
+ Transport::connect(ioService, receiveCallback); // no error callback any more: the Impl handlers throw Transport::Error, and isConnected_ is set by Impl::connectHandler once the asynchronous connect succeeds
impl_ = std::auto_ptr<UnixTransport::Impl> (new UnixTransport::Impl(*this));
impl_->connect();
-
- isConnected_ = true;
}
void