node+transport: Improving async operations and error reporting
All operations, including connect are fully async
Change-Id: I6c28c4f883d3aa10efbc4af939cf09e0e865c574
diff --git a/include/ndn-cpp/node.hpp b/include/ndn-cpp/node.hpp
index ef1d40e..4092330 100644
--- a/include/ndn-cpp/node.hpp
+++ b/include/ndn-cpp/node.hpp
@@ -134,6 +134,11 @@
void
onTransportError();
+ struct ProcessEventsTimeout {};
+
+ static void
+ fireProcessEventsTimeout(const boost::system::error_code& error);
+
private:
class PendingInterest {
public:
diff --git a/include/ndn-cpp/transport/transport.hpp b/include/ndn-cpp/transport/transport.hpp
index 59af6d1..064107b 100644
--- a/include/ndn-cpp/transport/transport.hpp
+++ b/include/ndn-cpp/transport/transport.hpp
@@ -12,11 +12,14 @@
#include <vector>
#include <boost/asio.hpp>
+#include <boost/lexical_cast.hpp>
namespace ndn {
class Transport {
public:
+ struct Error : public std::runtime_error { inline Error(const boost::system::error_code &code, const std::string &msg); };
+
typedef ptr_lib::function<void (const Block &wire)> ReceiveCallback;
typedef ptr_lib::function<void ()> ErrorCallback;
@@ -33,8 +36,7 @@
*/
inline virtual void
connect(boost::asio::io_service &io_service,
- const ReceiveCallback &receiveCallback,
- const ErrorCallback &errorCallback);
+ const ReceiveCallback &receiveCallback);
/**
* Close the connection.
@@ -61,7 +63,6 @@
boost::asio::io_service *ioService_;
bool isConnected_;
ReceiveCallback receiveCallback_;
- ErrorCallback errorCallback_;
};
inline
@@ -71,6 +72,11 @@
{
}
+inline Transport::Error::Error(const boost::system::error_code &code, const std::string &msg)
+ : std::runtime_error(msg + (code.value() ? " (" + code.category().message(code.value()) + ")" : ""))
+{
+}
+
inline
Transport::~Transport()
{
@@ -78,12 +84,10 @@
inline void
Transport::connect(boost::asio::io_service &ioService,
- const ReceiveCallback &receiveCallback,
- const ErrorCallback &errorCallback)
+ const ReceiveCallback &receiveCallback)
{
ioService_ = &ioService;
receiveCallback_ = receiveCallback;
- errorCallback_ = errorCallback;
}
inline bool
diff --git a/include/ndn-cpp/transport/unix-transport.hpp b/include/ndn-cpp/transport/unix-transport.hpp
index 9692cff..b7b65a9 100644
--- a/include/ndn-cpp/transport/unix-transport.hpp
+++ b/include/ndn-cpp/transport/unix-transport.hpp
@@ -22,8 +22,7 @@
// from Transport
virtual void
connect(boost::asio::io_service &ioService,
- const ReceiveCallback &receiveCallback,
- const ErrorCallback &errorCallback);
+ const ReceiveCallback &receiveCallback);
virtual void
close();
diff --git a/src/node.cpp b/src/node.cpp
index cec382c..5449a88 100644
--- a/src/node.cpp
+++ b/src/node.cpp
@@ -39,8 +39,7 @@
// TODO: Properly check if we are already connected to the expected host.
if (!transport_->isConnected())
transport_->connect(ioService_,
- ptr_lib::bind(&Node::onReceiveElement, this, _1),
- ptr_lib::bind(&Node::onTransportError, this));
+ ptr_lib::bind(&Node::onReceiveElement, this, _1));
uint64_t pendingInterestId = PendingInterest::getNextPendingInterestId();
pendingInterestTable_.push_back(ptr_lib::shared_ptr<PendingInterest>(new PendingInterest
@@ -57,8 +56,7 @@
// TODO: Properly check if we are already connected to the expected host.
if (!transport_->isConnected())
transport_->connect(ioService_,
- ptr_lib::bind(&Node::onReceiveElement, this, _1),
- ptr_lib::bind(&Node::onTransportError, this));
+ ptr_lib::bind(&Node::onReceiveElement, this, _1));
transport_->send(data.wireEncode());
}
@@ -212,21 +210,27 @@
if (timeout > 0)
{
processEventsTimeoutTimer_.expires_from_now(boost::posix_time::milliseconds(timeout));
- processEventsTimeoutTimer_.async_wait(func_lib::bind(&Node::shutdown, this));
+ processEventsTimeoutTimer_.async_wait(fireProcessEventsTimeout);
}
try
{
ioService_.run();
ioService_.reset();
}
- catch(Node::Error &)
+ catch(Node::ProcessEventsTimeout &)
{
- ioService_.reset(); // this needed in order to call ioService_.run() again in the future
- throw;
+ // break
}
}
void
+Node::fireProcessEventsTimeout(const boost::system::error_code& error)
+{
+ if (!error) // can fire for some other reason, e.g., cancelled
+ throw Node::ProcessEventsTimeout();
+}
+
+void
Node::checkPitExpire()
{
// Check for PIT entry timeouts. Go backwards through the list so we can erase entries.
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index 4bf85d7..81a46cc 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -29,32 +29,63 @@
: transport_(transport)
, socket_(*transport_.ioService_)
, partialDataSize_(0)
+ , connectionInProgress_(false)
{
}
void
+ connectHandler(const boost::system::error_code& error)
+ {
+ connectionInProgress_ = false;
+
+ if (!error)
+ {
+ partialDataSize_ = 0;
+ socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
+ boost::bind(&Impl::handle_async_receive, this, _1, _2));
+
+ transport_.isConnected_ = true;
+
+ for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
+ socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
+ boost::bind(&Impl::handle_async_send, this, _1, *i));
+
+ sendQueue_.clear();
+ }
+ else
+ {
+ // may need to throw exception
+ transport_.isConnected_ = false;
+ throw Transport::Error(error, "error while connecting to the forwarder");
+ }
+ }
+
+ void
connect()
{
- socket_.open();
- socket_.connect(protocol::endpoint(transport_.unixSocket_));
- // socket_.async_connect(protocol::endpoint(unixSocket));
-
- partialDataSize_ = 0;
- socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
- boost::bind(&Impl::handle_async_receive, this, _1, _2));
+ if (!connectionInProgress_) {
+ connectionInProgress_ = true;
+ socket_.open();
+ socket_.async_connect(protocol::endpoint(transport_.unixSocket_),
+ func_lib::bind(&Impl::connectHandler, this, _1));
+ }
}
void
close()
{
socket_.close();
+ transport_.isConnected_ = false;
}
void
send(const Block &wire)
{
- socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
- boost::bind(&Impl::handle_async_send, this, _1, wire));
+ if (!transport_.isConnected_)
+ sendQueue_.push_back(wire);
+ else
+ socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
+ boost::bind(&Impl::handle_async_send, this, _1, wire));
}
inline void
@@ -77,81 +108,75 @@
if (error)
{
socket_.close(); // closing at this point may not be that necessary
- transport_.errorCallback_();
+ transport_.isConnected_ = true;
+ throw Transport::Error(error, "error while receiving data from socket");
}
if (!error && bytes_recvd > 0)
{
- try
+ // inputBuffer_ has bytes_recvd received bytes of data
+ if (partialDataSize_ > 0)
{
- // inputBuffer_ has bytes_recvd received bytes of data
- if (partialDataSize_ > 0)
- {
- size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
- ndn_memcpy(partialData_ + partialDataSize_, inputBuffer_, newDataSize);
- partialDataSize_ += newDataSize;
+ size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
+ ndn_memcpy(partialData_ + partialDataSize_, inputBuffer_, newDataSize);
+ partialDataSize_ += newDataSize;
- size_t offset = 0;
- try
+ size_t offset = 0;
+ try
+ {
+ processAll(partialData_, offset, partialDataSize_);
+
+ // no exceptions => processed the whole thing
+ if (bytes_recvd - newDataSize > 0)
{
+ // there is a little bit more data available
+
+ offset = 0;
+ partialDataSize_ = bytes_recvd - newDataSize;
+ ndn_memcpy(partialData_, inputBuffer_ + newDataSize, partialDataSize_);
+
processAll(partialData_, offset, partialDataSize_);
// no exceptions => processed the whole thing
- if (bytes_recvd - newDataSize > 0)
- {
- // there is a little bit more data available
-
- offset = 0;
- partialDataSize_ = bytes_recvd - newDataSize;
- ndn_memcpy(partialData_, inputBuffer_ + newDataSize, partialDataSize_);
-
- processAll(partialData_, offset, partialDataSize_);
-
- // no exceptions => processed the whole thing
- partialDataSize_ = 0;
- }
- else
- {
- // done processing
- partialDataSize_ = 0;
- }
+ partialDataSize_ = 0;
}
- catch(Block::Error &)
+ else
{
- if (offset > 0)
- {
- partialDataSize_ -= offset;
- ndn_memcpy(partialData_, partialData_ + offset, partialDataSize_);
- }
- else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
- {
- // very bad... should close connection
- socket_.close();
- transport_.errorCallback_();
- }
+ // done processing
+ partialDataSize_ = 0;
}
}
- else
+ catch(Tlv::Error &)
{
- size_t offset = 0;
- try
+ if (offset > 0)
{
- processAll(inputBuffer_, offset, bytes_recvd);
+ partialDataSize_ -= offset;
+ ndn_memcpy(partialData_, partialData_ + offset, partialDataSize_);
}
- catch(Block::Error &error)
+ else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
{
- if (offset > 0)
- {
- partialDataSize_ = bytes_recvd - offset;
- ndn_memcpy(partialData_, inputBuffer_ + offset, partialDataSize_);
- }
+ // very bad... should close connection
+ socket_.close();
+ transport_.isConnected_ = true;
+ throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
}
}
}
- catch(Tlv::Error &error)
+ else
{
- std::cerr << "[[handle_async_receive]] Tlv::Error: " << error.what() << std::endl;
- // pass
+ size_t offset = 0;
+ try
+ {
+ processAll(inputBuffer_, offset, bytes_recvd);
+ }
+ catch(Tlv::Error &error)
+ {
+ if (offset > 0)
+ {
+ partialDataSize_ = bytes_recvd - offset;
+ ndn_memcpy(partialData_, inputBuffer_ + offset, partialDataSize_);
+ }
+ }
}
}
@@ -173,6 +198,9 @@
uint8_t partialData_[MAX_LENGTH];
size_t partialDataSize_;
+
+ std::list< Block > sendQueue_;
+ bool connectionInProgress_;
};
UnixTransport::UnixTransport(const std::string &unixSocket/* = "/tmp/.ndnd.sock"*/)
@@ -186,15 +214,12 @@
void
UnixTransport::connect(boost::asio::io_service &ioService,
- const ReceiveCallback &receiveCallback,
- const ErrorCallback &errorCallback)
+ const ReceiveCallback &receiveCallback)
{
- Transport::connect(ioService, receiveCallback, errorCallback);
+ Transport::connect(ioService, receiveCallback);
impl_ = std::auto_ptr<UnixTransport::Impl> (new UnixTransport::Impl(*this));
impl_->connect();
-
- isConnected_ = true;
}
void