face/tcp: Implementing mock version of TcpFace

The real version should avoid all exceptions in the processing path,
enable lazy decoding of Interest/Data packets, and implement
LocalControlHeader processing and association with Interest/Data
packets.

Change-Id: Ied907b3d10b7110a1dcc05d0d6d3c0dd277df8e1
refs: #1132, #1133, #1134, #1135
diff --git a/daemon/common.hpp b/daemon/common.hpp
index 9b50fda..a98719d 100644
--- a/daemon/common.hpp
+++ b/daemon/common.hpp
@@ -14,6 +14,7 @@
 
 #include <boost/utility.hpp>
 #include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
 #include <boost/function.hpp>
 #include <boost/bind.hpp>
 #include <boost/ref.hpp>
@@ -28,6 +29,7 @@
 
 using boost::noncopyable;
 using boost::shared_ptr;
+using boost::enable_shared_from_this;
 using boost::make_shared;
 using boost::function;
 using boost::bind;
diff --git a/daemon/face/face.hpp b/daemon/face/face.hpp
index 8022e3d..f014545 100644
--- a/daemon/face/face.hpp
+++ b/daemon/face/face.hpp
@@ -17,10 +17,12 @@
  */
 typedef int FaceId;
 
+const std::size_t MAX_NDN_PACKET_SIZE = 8800;
+
 /** \class Face
  *  \brief represents a face
  */
-class Face : noncopyable
+class Face : noncopyable, public enable_shared_from_this<Face>
 {
 public:
   Face(FaceId id);
@@ -33,6 +35,9 @@
   
   /// fires when a Data is received
   EventEmitter<const Data&> onReceiveData;
+
+  /// fires when face disconnects or fails to perform properly
+  EventEmitter<const std::string& /*reason*/> onFail;
   
   /// send an Interest
   virtual void
diff --git a/daemon/face/stream-face.cpp b/daemon/face/stream-face.cpp
deleted file mode 100644
index e84634d..0000000
--- a/daemon/face/stream-face.cpp
+++ /dev/null
@@ -1,8 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (C) 2014 Named Data Networking Project
- * See COPYING for copyright and distribution information.
- */
-
-#include "stream-face.hpp"
-
diff --git a/daemon/face/stream-face.hpp b/daemon/face/stream-face.hpp
index 778eb71..8fd39a1 100644
--- a/daemon/face/stream-face.hpp
+++ b/daemon/face/stream-face.hpp
@@ -17,13 +17,133 @@
 public:
   typedef T protocol;
 
-  StreamFace(FaceId id)
-    : Face(id)
-  {
-  }
-  
+  StreamFace(FaceId id,
+             const shared_ptr<typename protocol::socket>& socket);
+
+protected:
+  void
+  handleSend(const boost::system::error_code& error,
+             const Block& wire);
+
+  void
+  handleReceive(const boost::system::error_code& error,
+                std::size_t bytes_recvd);
+
+protected:
+  shared_ptr<typename protocol::socket> m_socket;
+
+private:
+  uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
+  std::size_t m_inputBufferSize;
 };
 
+template <class T>
+inline
+StreamFace<T>::StreamFace(FaceId id,
+                          const shared_ptr<typename StreamFace::protocol::socket>& socket)
+  : Face(id)
+  , m_socket(socket)
+{
+  m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
+                          bind(&StreamFace<T>::handleReceive, this, _1, _2));
+}
+
+
+template <class T>
+inline void
+StreamFace<T>::handleSend(const boost::system::error_code& error,
+                          const Block& wire)
+{
+  if (error) {
+    if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
+      return;
+
+    onFail("Send operation failed: " + error.category().message(error.value()));
+    m_socket->close();
+    return;
+  }
+
+  // do nothing (needed to retain validity of wire memory block
+}
+
+template <class T>
+inline void
+StreamFace<T>::handleReceive(const boost::system::error_code& error,
+                             std::size_t bytes_recvd)
+{
+  if (error || bytes_recvd == 0) {
+    if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
+      return;
+
+    onFail("Receive operation failed: " + error.category().message(error.value()));
+    m_socket->close();
+    return;
+  }
+
+  m_inputBufferSize += bytes_recvd;
+  // do magic
+
+  std::size_t offset = 0;
+  /// @todo Eliminate reliance on exceptions in this path
+  try {
+    Block element(m_inputBuffer + offset, m_inputBufferSize - offset);
+    offset += element.size();
+
+    /// @todo Ensure lazy field decoding process
+    if (element.type() == Tlv::Interest)
+      {
+        shared_ptr<Interest> i = make_shared<Interest>();
+        i->wireDecode(element);
+        onReceiveInterest(*i);
+      }
+    else if (element.type() == Tlv::Data)
+      {
+        shared_ptr<Data> d = make_shared<Data>();
+        d->wireDecode(element);
+        onReceiveData(*d);
+      }
+    // @todo Add local header support
+    // else if (element.type() == Tlv::LocalHeader)
+    //   {
+    //     shared_ptr<Interest> i = make_shared<Interest>();
+    //     i->wireDecode(element);
+    //   }
+    else
+      {
+        /// @todo Add loggin
+
+        // ignore unknown packet and proceed
+      }
+  }
+  catch(const Tlv::Error&) {
+    if (m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
+      {
+        onFail("Received input is invalid or too large to process, closing down the face");
+        m_socket->close();
+        return;
+      }
+  }
+
+  if (offset > 0)
+    {
+      if (offset != m_inputBufferSize)
+        {
+          std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
+                    m_inputBuffer);
+          m_inputBufferSize -= offset;
+        }
+      else
+        {
+          m_inputBufferSize = 0;
+        }
+    }
+
+  m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
+                                              MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
+                          bind(&StreamFace<T>::handleReceive, this, _1, _2));
+}
+
+
 } // namespace ndn
 
 #endif // NFD_FACE_STREAM_FACE_HPP
diff --git a/daemon/face/tcp-face.cpp b/daemon/face/tcp-face.cpp
index 42d4fc7..eb3ac94 100644
--- a/daemon/face/tcp-face.cpp
+++ b/daemon/face/tcp-face.cpp
@@ -8,20 +8,30 @@
 
 namespace ndn {
 
-TcpFace::TcpFace(FaceId id, const shared_ptr<TcpFace::protocol::socket>& socket)
-  : StreamFace<protocol>(id)
-  , m_socket(socket)
+TcpFace::TcpFace(FaceId id,
+                 const shared_ptr<TcpFace::protocol::socket>& socket)
+  : StreamFace<protocol>(id, socket)
 {
 }
 
 void
 TcpFace::sendInterest(const Interest& interest)
 {
+  m_socket->async_send(boost::asio::buffer(interest.wireEncode().wire(),
+                                           interest.wireEncode().size()),
+                       bind(&TcpFace::handleSend, this, _1, interest.wireEncode()));
+
+  // anything else should be done here?
 }
 
 void
 TcpFace::sendData(const Data& data)
 {
+  m_socket->async_send(boost::asio::buffer(data.wireEncode().wire(),
+                                           data.wireEncode().size()),
+                       bind(&TcpFace::handleSend, this, _1, data.wireEncode()));
+
+  // anything else should be done here?
 }
 
 } // namespace ndn
diff --git a/daemon/face/tcp-face.hpp b/daemon/face/tcp-face.hpp
index 86ec086..3d2bb53 100644
--- a/daemon/face/tcp-face.hpp
+++ b/daemon/face/tcp-face.hpp
@@ -19,7 +19,10 @@
 class TcpFace : public StreamFace<boost::asio::ip::tcp>
 {
 public:
-  TcpFace(FaceId id, const shared_ptr<protocol::socket>& socket);
+  typedef boost::asio::ip::tcp protocol;
+
+  TcpFace(FaceId id,
+          const shared_ptr<protocol::socket>& socket);
 
   // from Face
   virtual void
@@ -27,9 +30,6 @@
 
   virtual void
   sendData(const Data& data);
-  
-private:
-  shared_ptr<protocol::socket> m_socket;
 };
 
 } // namespace ndn