face: Extending Face interface to support ``put`` call

Also, this commit includes Face/Node/Transport enhancement in regard to
the error processing: io_service thread will be stopped and Node::Error
exception will be thrown)

Change-Id: Ibec2f75e7b7d2e9a221a857fdc63295dd7da16e0
diff --git a/include/ndn-cpp/face.hpp b/include/ndn-cpp/face.hpp
index c7e0826..4071501 100644
--- a/include/ndn-cpp/face.hpp
+++ b/include/ndn-cpp/face.hpp
@@ -142,19 +142,36 @@
   {
     node_.removeRegisteredPrefix(registeredPrefixId);
   }
+
+  /**
+   * @brief Publish data packet
+   *
+   * This method can be called to satisfy the incoming Interest or to put Data packet into the cache
+   * of the local NDN forwarder
+   */
+  void
+  put(const Data &data)
+  {
+    node_.put(data);
+  }
   
   /**
    * Process any data to receive or call timeout callbacks.
-   * This is non-blocking and will return immediately if there is no data to receive.
-   * You should repeatedly call this from an event loop, with calls to sleep as needed so that the loop doesn't use 100% of the CPU.
+   *
+   * This call will block forever (default timeout == 0) to process IO on the face.
+   * To exit, one expected to call face.shutdown() from one of the callback methods.
+   *
+   * If timeout is specified, then processEvents will exit after this timeout, if not stopped earlier with face.shutdown().
+   * The call can be called repeatedly, if desired.
+   *
    * @throw This may throw an exception for reading data or in the callback for processing the data.  If you
    * call this from an main event loop, you may want to catch and log/disregard all exceptions.
    */
   void 
-  processEvents()
+  processEvents(Milliseconds timeout = 0)
   {
     // Just call Node's processEvents.
-    node_.processEvents();
+    node_.processEvents(timeout);
   }
 
   /**
diff --git a/include/ndn-cpp/node.hpp b/include/ndn-cpp/node.hpp
index b880c06..ef1d40e 100644
--- a/include/ndn-cpp/node.hpp
+++ b/include/ndn-cpp/node.hpp
@@ -42,6 +42,8 @@
     
 class Node {
 public:
+  struct Error : public std::runtime_error { Error(const std::string &what) : std::runtime_error(what) {} };
+
   /**
    * Create a new Node for communication with an NDN hub with the given Transport object and connectionInfo.
    * @param transport A shared_ptr to a Transport object used for communication.
@@ -95,15 +97,29 @@
   void
   removeRegisteredPrefix(uint64_t registeredPrefixId);
 
+   /**
+   * @brief Publish data packet
+   *
+   * This method can be called to satisfy the incoming Interest or to put Data packet into the cache
+   * of the local NDN forwarder
+   */
+  void
+  put(const Data &data);
+ 
   /**
-   * Process any data to receive.  For each element received, call onReceivedElement.
-   * This is non-blocking and will return immediately if there is no data to receive.
-   * You should repeatedly call this from an event loop, with calls to sleep as needed so that the loop doesn't use 100% of the CPU.
+   * Process any data to receive or call timeout callbacks.
+   *
+   * This call will block forever (default timeout == 0) to process IO on the face.
+   * To exit, one expected to call face.shutdown() from one of the callback methods.
+   *
+   * If timeout is specified, then processEvents will exit after this timeout, if not stopped earlier with face.shutdown().
+   * The call can be called repeatedly, if desired.
+   *
    * @throw This may throw an exception for reading data or in the callback for processing the data.  If you
    * call this from an main event loop, you may want to catch and log/disregard all exceptions.
    */
   void 
-  processEvents();
+  processEvents(Milliseconds timeout = 0);
   
   const ptr_lib::shared_ptr<Transport>& 
   getTransport() { return transport_; }
@@ -115,6 +131,9 @@
   void 
   onReceiveElement(const Block &wire);
 
+  void
+  onTransportError();
+
 private:
   class PendingInterest {
   public:
@@ -283,6 +302,7 @@
 private:
   boost::asio::io_service ioService_;
   boost::asio::deadline_timer timer_;
+  boost::asio::deadline_timer processEventsTimeoutTimer_;
   
   ptr_lib::shared_ptr<Transport> transport_;
 
diff --git a/include/ndn-cpp/transport/transport.hpp b/include/ndn-cpp/transport/transport.hpp
index 8bfa355..59af6d1 100644
--- a/include/ndn-cpp/transport/transport.hpp
+++ b/include/ndn-cpp/transport/transport.hpp
@@ -18,6 +18,7 @@
 class Transport {
 public:
   typedef ptr_lib::function<void (const Block &wire)> ReceiveCallback;
+  typedef ptr_lib::function<void ()> ErrorCallback;
   
   inline
   Transport();
@@ -26,11 +27,14 @@
   ~Transport();
 
   /**
-   * Connect according to the info in ConnectionInfo, and processEvents() will use elementListener.
-   * @param connectionInfo A reference to an object of a subclass of ConnectionInfo.
+   * Connect transport
+   *
+   * @throws If connection cannot be established
    */
   inline virtual void 
-  connect(boost::asio::io_service &io_service, const ReceiveCallback &receiveCallback);
+  connect(boost::asio::io_service &io_service,
+          const ReceiveCallback &receiveCallback,
+          const ErrorCallback &errorCallback);
   
   /**
    * Close the connection.
@@ -57,6 +61,7 @@
   boost::asio::io_service *ioService_;
   bool isConnected_;
   ReceiveCallback receiveCallback_;
+  ErrorCallback errorCallback_;
 };
 
 inline
@@ -72,10 +77,13 @@
 }
 
 inline void 
-Transport::connect(boost::asio::io_service &ioService, const ReceiveCallback &receiveCallback)
+Transport::connect(boost::asio::io_service &ioService,
+                   const ReceiveCallback &receiveCallback,
+                   const ErrorCallback &errorCallback)
 {
   ioService_ = &ioService;
   receiveCallback_ = receiveCallback;
+  errorCallback_ = errorCallback;
 }
 
 inline bool 
diff --git a/include/ndn-cpp/transport/unix-transport.hpp b/include/ndn-cpp/transport/unix-transport.hpp
index 1cc4758..9692cff 100644
--- a/include/ndn-cpp/transport/unix-transport.hpp
+++ b/include/ndn-cpp/transport/unix-transport.hpp
@@ -21,7 +21,9 @@
 
   // from Transport
   virtual void 
-  connect(boost::asio::io_service &ioService, const ReceiveCallback &receiveCallback);
+  connect(boost::asio::io_service &ioService,
+          const ReceiveCallback &receiveCallback,
+          const ErrorCallback &errorCallback);
   
   virtual void 
   close();
diff --git a/src/node.cpp b/src/node.cpp
index 3610344..cec382c 100644
--- a/src/node.cpp
+++ b/src/node.cpp
@@ -25,6 +25,7 @@
 
 Node::Node(const ptr_lib::shared_ptr<Transport>& transport)
   : timer_ (ioService_)
+  , processEventsTimeoutTimer_(ioService_)
   , transport_(transport)
   , ndndIdFetcherInterest_(Name("/%C1.M.S.localhost/%C1.M.SRV/ndnd/KEY"), 4000.0)
 {
@@ -37,7 +38,9 @@
 {
   // TODO: Properly check if we are already connected to the expected host.
   if (!transport_->isConnected())
-    transport_->connect(ioService_, ptr_lib::bind(&Node::onReceiveElement, this, _1));
+    transport_->connect(ioService_,
+                        ptr_lib::bind(&Node::onReceiveElement, this, _1),
+                        ptr_lib::bind(&Node::onTransportError, this));
   
   uint64_t pendingInterestId = PendingInterest::getNextPendingInterestId();
   pendingInterestTable_.push_back(ptr_lib::shared_ptr<PendingInterest>(new PendingInterest
@@ -49,6 +52,19 @@
 }
 
 void
+Node::put(const Data &data)
+{
+  // TODO: Properly check if we are already connected to the expected host.
+  if (!transport_->isConnected())
+    transport_->connect(ioService_,
+                        ptr_lib::bind(&Node::onReceiveElement, this, _1),
+                        ptr_lib::bind(&Node::onTransportError, this));
+
+  transport_->send(data.wireEncode());
+}
+
+
+void
 Node::removePendingInterest(uint64_t pendingInterestId)
 {
   // Go backwards through the list so we can erase entries.
@@ -191,12 +207,23 @@
 }
 
 void 
-Node::processEvents()
+Node::processEvents(Milliseconds timeout/* = 0 */)
 {
-  ioService_.run();
-
-  // auto_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(ioService_));
-  // work.reset(); // Allow run() to exit.   
+  if (timeout > 0)
+    {
+      processEventsTimeoutTimer_.expires_from_now(boost::posix_time::milliseconds(timeout));
+      processEventsTimeoutTimer_.async_wait(func_lib::bind(&Node::shutdown, this));
+    }
+  try
+    {
+      ioService_.run();
+      ioService_.reset();
+    }
+  catch(Node::Error &)
+    {
+      ioService_.reset(); // this needed in order to call ioService_.run() again in the future
+      throw;
+    }
 }
 
 void
@@ -250,6 +277,15 @@
     }
 }
 
+void
+Node::onTransportError()
+{
+  /// @todo Set some error code
+  
+  ioService_.stop();
+  throw Error("TransportError");
+}
+
 void 
 Node::shutdown()
 {
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index 3e01943..4bf85d7 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -74,6 +74,12 @@
   {
     /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
 
+    if (error)
+      {
+        socket_.close(); // closing at this point may not be that necessary
+        transport_.errorCallback_();
+      }
+    
     if (!error && bytes_recvd > 0)
       {
         try
@@ -120,8 +126,8 @@
                     else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
                       {
                         // very bad... should close connection
-                        /// @todo Notify somebody 
                         socket_.close();
+                        transport_.errorCallback_();
                       }
                   }
               }
@@ -179,9 +185,11 @@
 }
 
 void 
-UnixTransport::connect(boost::asio::io_service &ioService, const ReceiveCallback &receiveCallback)
+UnixTransport::connect(boost::asio::io_service &ioService,
+                       const ReceiveCallback &receiveCallback,
+                       const ErrorCallback &errorCallback)
 {
-  Transport::connect(ioService, receiveCallback);
+  Transport::connect(ioService, receiveCallback, errorCallback);
   
   impl_ = std::auto_ptr<UnixTransport::Impl> (new UnixTransport::Impl(*this));
   impl_->connect();