face: Extend the Face interface to support the ``put`` call
Also, this commit includes Face/Node/Transport enhancements with regard to
error processing (the io_service thread will be stopped and a Node::Error
exception will be thrown).
Change-Id: Ibec2f75e7b7d2e9a221a857fdc63295dd7da16e0
diff --git a/src/node.cpp b/src/node.cpp
index 3610344..cec382c 100644
--- a/src/node.cpp
+++ b/src/node.cpp
@@ -25,6 +25,7 @@
Node::Node(const ptr_lib::shared_ptr<Transport>& transport)
: timer_ (ioService_)
+ , processEventsTimeoutTimer_(ioService_)
, transport_(transport)
, ndndIdFetcherInterest_(Name("/%C1.M.S.localhost/%C1.M.SRV/ndnd/KEY"), 4000.0)
{
@@ -37,7 +38,9 @@
{
// TODO: Properly check if we are already connected to the expected host.
if (!transport_->isConnected())
- transport_->connect(ioService_, ptr_lib::bind(&Node::onReceiveElement, this, _1));
+ transport_->connect(ioService_,
+ ptr_lib::bind(&Node::onReceiveElement, this, _1),
+ ptr_lib::bind(&Node::onTransportError, this));
uint64_t pendingInterestId = PendingInterest::getNextPendingInterestId();
pendingInterestTable_.push_back(ptr_lib::shared_ptr<PendingInterest>(new PendingInterest
@@ -49,6 +52,19 @@
}
void
+Node::put(const Data &data) // Wire-encodes `data` and sends it through transport_, connecting first if not yet connected.
+{
+ // TODO: Properly check if we are already connected to the expected host.
+ if (!transport_->isConnected())
+ transport_->connect(ioService_,
+ ptr_lib::bind(&Node::onReceiveElement, this, _1), // receive callback
+ ptr_lib::bind(&Node::onTransportError, this)); // error callback
+
+ transport_->send(data.wireEncode()); // the encoded Data is handed to the transport as-is
+}
+
+
+void
Node::removePendingInterest(uint64_t pendingInterestId)
{
// Go backwards through the list so we can erase entries.
@@ -191,12 +207,23 @@
}
void
-Node::processEvents()
+Node::processEvents(Milliseconds timeout/* = 0 */) // Runs the io_service event loop; a positive timeout arms a timer that calls Node::shutdown.
{
- ioService_.run();
-
- // auto_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(ioService_));
- // work.reset(); // Allow run() to exit.
+ if (timeout > 0) // a non-positive timeout arms no timer
+ {
+ processEventsTimeoutTimer_.expires_from_now(boost::posix_time::milliseconds(timeout)); // timeout is interpreted as milliseconds
+ processEventsTimeoutTimer_.async_wait(func_lib::bind(&Node::shutdown, this)); // Node::shutdown is the timer's completion handler
+ }
+ try
+ {
+ ioService_.run();
+ ioService_.reset(); // needed so that ioService_.run() can be called again later
+ }
+ catch(Node::Error &)
+ {
+ ioService_.reset(); // this is needed in order to call ioService_.run() again in the future
+ throw; // rethrow the Node::Error to the caller of processEvents
+ }
}
void
@@ -250,6 +277,15 @@
}
}
+void
+Node::onTransportError() // Error callback handed to transport_->connect(): stops the io_service and throws Error("TransportError").
+{
+ /// @todo Set some error code
+
+ ioService_.stop(); // stop the event loop before reporting the failure
+ throw Error("TransportError"); // Node::processEvents has a catch(Node::Error &) that resets the io_service and rethrows
+}
+
void
Node::shutdown()
{
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index 3e01943..4bf85d7 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -74,6 +74,12 @@
{
/// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
+ if (error)
+ {
+ socket_.close(); // closing at this point may not be that necessary
+ transport_.errorCallback_();
+ }
+
if (!error && bytes_recvd > 0)
{
try
@@ -120,8 +126,8 @@
else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
{
// very bad... should close connection
- /// @todo Notify somebody
socket_.close();
+ transport_.errorCallback_();
}
}
}
@@ -179,9 +185,11 @@
}
void
-UnixTransport::connect(boost::asio::io_service &ioService, const ReceiveCallback &receiveCallback)
+UnixTransport::connect(boost::asio::io_service &ioService,
+ const ReceiveCallback &receiveCallback,
+ const ErrorCallback &errorCallback)
{
- Transport::connect(ioService, receiveCallback);
+ Transport::connect(ioService, receiveCallback, errorCallback);
impl_ = std::auto_ptr<UnixTransport::Impl> (new UnixTransport::Impl(*this));
impl_->connect();