node+transport: Improving async operations and error reporting

All operations, including connect are fully async

Change-Id: I6c28c4f883d3aa10efbc4af939cf09e0e865c574
diff --git a/include/ndn-cpp/node.hpp b/include/ndn-cpp/node.hpp
index ef1d40e..4092330 100644
--- a/include/ndn-cpp/node.hpp
+++ b/include/ndn-cpp/node.hpp
@@ -134,6 +134,11 @@
   void
   onTransportError();
 
+  struct ProcessEventsTimeout {};
+  
+  static void
+  fireProcessEventsTimeout(const boost::system::error_code& error);
+
 private:
   class PendingInterest {
   public:
diff --git a/include/ndn-cpp/transport/transport.hpp b/include/ndn-cpp/transport/transport.hpp
index 59af6d1..064107b 100644
--- a/include/ndn-cpp/transport/transport.hpp
+++ b/include/ndn-cpp/transport/transport.hpp
@@ -12,11 +12,14 @@
 
 #include <vector>
 #include <boost/asio.hpp>
+#include <boost/lexical_cast.hpp>
 
 namespace ndn {
 
 class Transport {
 public:
+  struct Error : public std::runtime_error { inline Error(const boost::system::error_code &code, const std::string &msg); };
+  
   typedef ptr_lib::function<void (const Block &wire)> ReceiveCallback;
   typedef ptr_lib::function<void ()> ErrorCallback;
   
@@ -33,8 +36,7 @@
    */
   inline virtual void 
   connect(boost::asio::io_service &io_service,
-          const ReceiveCallback &receiveCallback,
-          const ErrorCallback &errorCallback);
+          const ReceiveCallback &receiveCallback);
   
   /**
    * Close the connection.
@@ -61,7 +63,6 @@
   boost::asio::io_service *ioService_;
   bool isConnected_;
   ReceiveCallback receiveCallback_;
-  ErrorCallback errorCallback_;
 };
 
 inline
@@ -71,6 +72,11 @@
 {
 }
 
+inline Transport::Error::Error(const boost::system::error_code &code, const std::string &msg)
+  : std::runtime_error(msg + (code.value() ? " (" + code.category().message(code.value()) + ")" : ""))
+{
+}
+
 inline
 Transport::~Transport()
 {
@@ -78,12 +84,10 @@
 
 inline void 
 Transport::connect(boost::asio::io_service &ioService,
-                   const ReceiveCallback &receiveCallback,
-                   const ErrorCallback &errorCallback)
+                   const ReceiveCallback &receiveCallback)
 {
   ioService_ = &ioService;
   receiveCallback_ = receiveCallback;
-  errorCallback_ = errorCallback;
 }
 
 inline bool 
diff --git a/include/ndn-cpp/transport/unix-transport.hpp b/include/ndn-cpp/transport/unix-transport.hpp
index 9692cff..b7b65a9 100644
--- a/include/ndn-cpp/transport/unix-transport.hpp
+++ b/include/ndn-cpp/transport/unix-transport.hpp
@@ -22,8 +22,7 @@
   // from Transport
   virtual void 
   connect(boost::asio::io_service &ioService,
-          const ReceiveCallback &receiveCallback,
-          const ErrorCallback &errorCallback);
+          const ReceiveCallback &receiveCallback);
   
   virtual void 
   close();
diff --git a/src/node.cpp b/src/node.cpp
index cec382c..5449a88 100644
--- a/src/node.cpp
+++ b/src/node.cpp
@@ -39,8 +39,7 @@
   // TODO: Properly check if we are already connected to the expected host.
   if (!transport_->isConnected())
     transport_->connect(ioService_,
-                        ptr_lib::bind(&Node::onReceiveElement, this, _1),
-                        ptr_lib::bind(&Node::onTransportError, this));
+                        ptr_lib::bind(&Node::onReceiveElement, this, _1));
   
   uint64_t pendingInterestId = PendingInterest::getNextPendingInterestId();
   pendingInterestTable_.push_back(ptr_lib::shared_ptr<PendingInterest>(new PendingInterest
@@ -57,8 +56,7 @@
   // TODO: Properly check if we are already connected to the expected host.
   if (!transport_->isConnected())
     transport_->connect(ioService_,
-                        ptr_lib::bind(&Node::onReceiveElement, this, _1),
-                        ptr_lib::bind(&Node::onTransportError, this));
+                        ptr_lib::bind(&Node::onReceiveElement, this, _1));
 
   transport_->send(data.wireEncode());
 }
@@ -212,21 +210,27 @@
   if (timeout > 0)
     {
       processEventsTimeoutTimer_.expires_from_now(boost::posix_time::milliseconds(timeout));
-      processEventsTimeoutTimer_.async_wait(func_lib::bind(&Node::shutdown, this));
+      processEventsTimeoutTimer_.async_wait(fireProcessEventsTimeout);
     }
   try
     {
       ioService_.run();
       ioService_.reset();
     }
-  catch(Node::Error &)
+  catch(Node::ProcessEventsTimeout &)
     {
-      ioService_.reset(); // this needed in order to call ioService_.run() again in the future
-      throw;
+      // break
     }
 }
 
 void
+Node::fireProcessEventsTimeout(const boost::system::error_code& error)
+{
+  if (!error) // can fire for some other reason, e.g., cancelled
+    throw Node::ProcessEventsTimeout();
+}
+
+void
 Node::checkPitExpire()
 {
   // Check for PIT entry timeouts.  Go backwards through the list so we can erase entries.
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index 4bf85d7..81a46cc 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -29,32 +29,63 @@
     : transport_(transport)
     , socket_(*transport_.ioService_)
     , partialDataSize_(0)
+    , connectionInProgress_(false)
   {
   }
 
   void
+  connectHandler(const boost::system::error_code& error)
+  {
+    connectionInProgress_ = false;
+
+    if (!error)
+      {
+        partialDataSize_ = 0;
+        socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
+                              boost::bind(&Impl::handle_async_receive, this, _1, _2));
+
+        transport_.isConnected_ = true;
+
+        for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
+          socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
+                             boost::bind(&Impl::handle_async_send, this, _1, *i));
+
+        sendQueue_.clear();
+      }
+    else
+      {
+        // may need to throw exception
+        transport_.isConnected_ = false;
+        throw Transport::Error(error, "error while connecting to the forwarder");
+      }
+  }
+  
+  void
   connect()
   {
-    socket_.open();
-    socket_.connect(protocol::endpoint(transport_.unixSocket_));
-    // socket_.async_connect(protocol::endpoint(unixSocket));
-
-    partialDataSize_ = 0;
-    socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
-                          boost::bind(&Impl::handle_async_receive, this, _1, _2));
+    if (!connectionInProgress_) {
+      connectionInProgress_ = true;
+      socket_.open();
+      socket_.async_connect(protocol::endpoint(transport_.unixSocket_),
+                            func_lib::bind(&Impl::connectHandler, this, _1));
+    }
   }
 
   void 
   close()
   {
     socket_.close();
+    transport_.isConnected_ = false;
   }
 
   void 
   send(const Block &wire)
   {
-    socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
-                       boost::bind(&Impl::handle_async_send, this, _1, wire));
+    if (!transport_.isConnected_)
+      sendQueue_.push_back(wire);
+    else
+      socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
+                         boost::bind(&Impl::handle_async_send, this, _1, wire));
   }
 
   inline void
@@ -77,81 +108,75 @@
     if (error)
       {
         socket_.close(); // closing at this point may not be that necessary
-        transport_.errorCallback_();
+        transport_.isConnected_ = true;
+        throw Transport::Error(error, "error while receiving data from socket");
       }
     
     if (!error && bytes_recvd > 0)
       {
-        try
+        // inputBuffer_ has bytes_recvd received bytes of data
+        if (partialDataSize_ > 0)
           {
-            // inputBuffer_ has bytes_recvd received bytes of data
-            if (partialDataSize_ > 0)
-              {
-                size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
-                ndn_memcpy(partialData_ + partialDataSize_, inputBuffer_, newDataSize);
-                partialDataSize_ += newDataSize;
+            size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
+            ndn_memcpy(partialData_ + partialDataSize_, inputBuffer_, newDataSize);
+            partialDataSize_ += newDataSize;
               
-                size_t offset = 0;
-                try
+            size_t offset = 0;
+            try
+              {
+                processAll(partialData_, offset, partialDataSize_);
+
+                // no exceptions => processed the whole thing
+                if (bytes_recvd - newDataSize > 0)
                   {
+                    // there is a little bit more data available
+                        
+                    offset = 0;
+                    partialDataSize_ = bytes_recvd - newDataSize;
+                    ndn_memcpy(partialData_, inputBuffer_ + newDataSize, partialDataSize_);
+
                     processAll(partialData_, offset, partialDataSize_);
 
                     // no exceptions => processed the whole thing
-                    if (bytes_recvd - newDataSize > 0)
-                      {
-                        // there is a little bit more data available
-                        
-                        offset = 0;
-                        partialDataSize_ = bytes_recvd - newDataSize;
-                        ndn_memcpy(partialData_, inputBuffer_ + newDataSize, partialDataSize_);
-
-                        processAll(partialData_, offset, partialDataSize_);
-
-                        // no exceptions => processed the whole thing
-                        partialDataSize_ = 0;
-                      }
-                    else
-                      {
-                        // done processing
-                        partialDataSize_ = 0;
-                      }
+                    partialDataSize_ = 0;
                   }
-                catch(Block::Error &)
+                else
                   {
-                    if (offset > 0)
-                      {
-                        partialDataSize_ -= offset;
-                        ndn_memcpy(partialData_, partialData_ + offset, partialDataSize_);
-                      }
-                    else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
-                      {
-                        // very bad... should close connection
-                        socket_.close();
-                        transport_.errorCallback_();
-                      }
+                    // done processing
+                    partialDataSize_ = 0;
                   }
               }
-            else
+            catch(Tlv::Error &)
               {
-                size_t offset = 0;
-                try
+                if (offset > 0)
                   {
-                    processAll(inputBuffer_, offset, bytes_recvd);
+                    partialDataSize_ -= offset;
+                    ndn_memcpy(partialData_, partialData_ + offset, partialDataSize_);
                   }
-                catch(Block::Error &error)
+                else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
                   {
-                    if (offset > 0)
-                      {
-                        partialDataSize_ = bytes_recvd - offset;
-                        ndn_memcpy(partialData_, inputBuffer_ + offset, partialDataSize_);
-                      }
+                    // very bad... should close connection
+                    socket_.close();
+                    transport_.isConnected_ = true;
+                    throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
                   }
               }
           }
-        catch(Tlv::Error &error)
+        else
           {
-            std::cerr << "[[handle_async_receive]] Tlv::Error: " << error.what() << std::endl;
-            // pass
+            size_t offset = 0;
+            try
+              {
+                processAll(inputBuffer_, offset, bytes_recvd);
+              }
+            catch(Tlv::Error &error)
+              {
+                if (offset > 0)
+                  {
+                    partialDataSize_ = bytes_recvd - offset;
+                    ndn_memcpy(partialData_, inputBuffer_ + offset, partialDataSize_);
+                  }
+              }
           }
       }
 
@@ -173,6 +198,9 @@
 
   uint8_t partialData_[MAX_LENGTH];
   size_t partialDataSize_;
+
+  std::list< Block > sendQueue_;
+  bool connectionInProgress_;
 };
 
 UnixTransport::UnixTransport(const std::string &unixSocket/* = "/tmp/.ndnd.sock"*/) 
@@ -186,15 +214,12 @@
 
 void 
 UnixTransport::connect(boost::asio::io_service &ioService,
-                       const ReceiveCallback &receiveCallback,
-                       const ErrorCallback &errorCallback)
+                       const ReceiveCallback &receiveCallback)
 {
-  Transport::connect(ioService, receiveCallback, errorCallback);
+  Transport::connect(ioService, receiveCallback);
   
   impl_ = std::auto_ptr<UnixTransport::Impl> (new UnixTransport::Impl(*this));
   impl_->connect();
-  
-  isConnected_ = true;
 }
 
 void