face: Implementation of encode/decode of LocalControlHeader

LocalControlHeader can only be used on faces that are derived from
LocalFace.  UnixStreamFace inherits directly from LocalFace, while the
TCP face has two specializations: the generic TcpFace (strictly not
local) and LocalTcpFace.

refs #1213

Change-Id: I8a158c3bc4bb929eedd15757cfddecc0d1049f9f
diff --git a/daemon/face/stream-face.hpp b/daemon/face/stream-face.hpp
index ac08421..9baef1d 100644
--- a/daemon/face/stream-face.hpp
+++ b/daemon/face/stream-face.hpp
@@ -8,14 +8,18 @@
 #define NFD_FACE_STREAM_FACE_HPP
 
 #include "face.hpp"
+#include "local-face.hpp"
 
 namespace nfd {
 
-template <class T>
-class StreamFace : public Face
+// forward declaration
+template<class T, class U, class V> struct StreamFaceSenderImpl;
+
+template<class Protocol, class FaceBase = Face>
+class StreamFace : public FaceBase
 {
 public:
-  typedef T protocol;
+  typedef Protocol protocol;
 
   /**
    * \brief Create instance of StreamFace
@@ -37,6 +41,12 @@
 
 protected:
   void
+  processErrorCode(const boost::system::error_code& error);
+
+  void
+  handleSend(const boost::system::error_code& error,
+             const Block& header, const Block& payload);
+  void
   handleSend(const boost::system::error_code& error,
              const Block& wire);
 
@@ -56,159 +66,201 @@
 private:
   uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
   std::size_t m_inputBufferSize;
-  
+
+  friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
+  friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
+
   NFD_LOG_INCLASS_DECLARE();
 };
 
 // All inherited classes must use
 // NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
 
-template <class T>
+
+/** \brief Validates the socket that a StreamFace is constructed with
+ *
+ *  For example, a partial specialization for boost::asio::ip::tcp should check
+ *  that the local endpoint is a loopback address.
+ *
+ *  \throws Face::Error if validation fails
+ */
+template<class Protocol, class U>
+struct StreamFaceValidator
+{
+  static void
+  validateSocket(typename Protocol::socket& socket)
+  {
+  }
+};
+
+
+template<class T, class U>
 inline
-StreamFace<T>::StreamFace(const shared_ptr<typename StreamFace::protocol::socket>& socket)
+StreamFace<T, U>::StreamFace(const shared_ptr<typename StreamFace::protocol::socket>& socket)
   : m_socket(socket)
   , m_inputBufferSize(0)
 {
+  StreamFaceValidator<T, U>::validateSocket(*socket);
   m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
-                          bind(&StreamFace<T>::handleReceive, this, _1, _2));
+                          bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
 }
 
-template <class T>
+template<class T, class U>
 inline
-StreamFace<T>::~StreamFace()
+StreamFace<T, U>::~StreamFace()
 {
 }
 
-
-template <class T>
-inline void
-StreamFace<T>::sendInterest(const Interest& interest)
+template<class Protocol, class FaceBase, class Packet>
+struct StreamFaceSenderImpl
 {
-  m_socket->async_send(boost::asio::buffer(interest.wireEncode().wire(),
-                                           interest.wireEncode().size()),
-                       bind(&StreamFace<T>::handleSend, this, _1, interest.wireEncode()));
-  
-  // anything else should be done here?
+  static void
+  send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
+  {
+    face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
+                                                  packet.wireEncode().size()),
+                              bind(&StreamFace<Protocol, FaceBase>::handleSend,
+                                   &face, _1, packet.wireEncode()));
+  }
+};
+
+// partial specialization for LocalFace (a struct is used because functions cannot be partially specialized)
+template<class Protocol, class Packet>
+struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
+{
+  static void
+  send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
+  {
+    using namespace boost::asio;
+
+    if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
+      {
+        const Block& payload = packet.wireEncode();
+        face.m_socket->async_send(buffer(payload.wire(), payload.size()),
+                                  bind(&StreamFace<Protocol, LocalFace>::handleSend,
+                                       &face, _1, packet.wireEncode()));
+      }
+    else
+      {
+        Block header = face.filterAndEncodeLocalControlHeader(packet);
+        const Block& payload = packet.wireEncode();
+
+        std::vector<const_buffer> buffers;
+        buffers.reserve(2);
+        buffers.push_back(buffer(header.wire(),  header.size()));
+        buffers.push_back(buffer(payload.wire(), payload.size()));
+
+        face.m_socket->async_send(buffers,
+                                  bind(&StreamFace<Protocol, LocalFace>::handleSend,
+                                       &face, _1, header, payload));
+      }
+  }
+};
+
+
+template<class T, class U>
+inline void
+StreamFace<T, U>::sendInterest(const Interest& interest)
+{
+  StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
 }
 
-template <class T>
+template<class T, class U>
 inline void
-StreamFace<T>::sendData(const Data& data)
+StreamFace<T, U>::sendData(const Data& data)
 {
-  m_socket->async_send(boost::asio::buffer(data.wireEncode().wire(),
-                                           data.wireEncode().size()),
-                       bind(&StreamFace<T>::handleSend, this, _1, data.wireEncode()));
-  
-  // anything else should be done here?
+  StreamFaceSenderImpl<T, U, Data>::send(*this, data);
 }
 
-template <class T>
+template<class T, class U>
 inline void
-StreamFace<T>::close()
+StreamFace<T, U>::close()
 {
   if (!m_socket->is_open())
     return;
-  
+
   NFD_LOG_INFO("[id:" << this->getId()
                << ",endpoint:" << m_socket->local_endpoint()
                << "] Close connection");
 
   closeSocket();
-  onFail("Close connection");
+  this->onFail("Close connection");
 }
 
-template <class T>
+template<class T, class U>
 inline void
-StreamFace<T>::handleSend(const boost::system::error_code& error,
-                          const Block& wire)
+StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
 {
-  if (error) {
-    if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
-      return;
-
-    if (!m_socket->is_open())
-      {
-        onFail("Connection closed");
-        return;
-      }
-
-    if (error == boost::asio::error::eof)
-      {
-        NFD_LOG_INFO("[id:" << this->getId()
-                     << ",endpoint:" << m_socket->local_endpoint()
-                     << "] Connection closed");
-      }
-    else
-      {
-        NFD_LOG_WARN("[id:" << this->getId()
-                     << ",endpoint:" << m_socket->local_endpoint()
-                     << "] Send operation failed, closing socket: "
-                     << error.category().message(error.value()));
-      }
-
-    closeSocket();
-
-    if (error == boost::asio::error::eof)
-      {
-        onFail("Connection closed");
-      }
-    else
-      {
-        onFail("Send operation failed, closing socket: " +
-               error.category().message(error.value()));
-      }
+  if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
     return;
-  }
+
+  if (!m_socket->is_open())
+    {
+      this->onFail("Connection closed");
+      return;
+    }
+
+  if (error == boost::asio::error::eof)
+    {
+      NFD_LOG_INFO("[id:" << this->getId()
+                   << ",endpoint:" << m_socket->local_endpoint()
+                   << "] Connection closed");
+    }
+  else
+    {
+      NFD_LOG_WARN("[id:" << this->getId()
+                   << ",endpoint:" << m_socket->local_endpoint()
+                   << "] Send or receive operation failed, closing socket: "
+                   << error.category().message(error.value()));
+    }
+
+  closeSocket();
+
+  if (error == boost::asio::error::eof)
+    {
+      this->onFail("Connection closed");
+    }
+  else
+    {
+      this->onFail("Send or receive operation failed, closing socket: " +
+                   error.category().message(error.value()));
+    }
+}
+
+
+template<class T, class U>
+inline void
+StreamFace<T, U>::handleSend(const boost::system::error_code& error,
+                             const Block& wire)
+{
+  if (error)
+    return processErrorCode(error);
 
   NFD_LOG_TRACE("[id:" << this->getId()
                 << ",endpoint:" << m_socket->local_endpoint()
                 << "] Successfully sent: " << wire.size() << " bytes");
-  // do nothing (needed to retain validity of wire memory block
 }
 
-template <class T>
+template<class T, class U>
 inline void
-StreamFace<T>::handleReceive(const boost::system::error_code& error,
+StreamFace<T, U>::handleSend(const boost::system::error_code& error,
+                             const Block& header, const Block& payload)
+{
+  if (error)
+    return processErrorCode(error);
+
+  NFD_LOG_TRACE("[id:" << this->getId()
+                << ",endpoint:" << m_socket->local_endpoint()
+                << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
+}
+
+template<class T, class U>
+inline void
+StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
                              std::size_t bytes_recvd)
 {
-  if (error || bytes_recvd == 0) {
-    if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
-      return;
-
-    // this should be unnecessary, but just in case
-    if (!m_socket->is_open())
-      {
-        onFail("Connection closed");
-        return;
-      }
-
-    if (error == boost::asio::error::eof)
-      {
-        NFD_LOG_INFO("[id:" << this->getId()
-                     << ",endpoint:" << m_socket->local_endpoint()
-                     << "] Connection closed");
-      }
-    else
-      {
-        NFD_LOG_WARN("[id:" << this->getId()
-                     << ",endpoint:" << m_socket->local_endpoint()
-                     << "] Receive operation failed: "
-                     << error.category().message(error.value()));
-      }
-
-    closeSocket();
-
-    if (error == boost::asio::error::eof)
-      {
-        onFail("Connection closed");
-      }
-    else
-      {
-        onFail("Receive operation failed, closing socket: " +
-               error.category().message(error.value()));
-      }
-    return;
-  }
+  if (error)
+    return processErrorCode(error);
 
   NFD_LOG_TRACE("[id:" << this->getId()
                 << ",endpoint:" << m_socket->local_endpoint()
@@ -226,27 +278,8 @@
         offset += element.size();
 
         BOOST_ASSERT(offset <= m_inputBufferSize);
-        
-        /// @todo Ensure lazy field decoding process
-        if (element.type() == tlv::Interest)
-          {
-            shared_ptr<Interest> i = make_shared<Interest>();
-            i->wireDecode(element);
-            onReceiveInterest(*i);
-          }
-        else if (element.type() == tlv::Data)
-          {
-            shared_ptr<Data> d = make_shared<Data>();
-            d->wireDecode(element);
-            onReceiveData(*d);
-          }
-        // @todo Add local header support
-        // else if (element.type() == tlv::LocalHeader)
-        //   {
-        //     shared_ptr<Interest> i = make_shared<Interest>();
-        //     i->wireDecode(element);
-        //   }
-        else
+
+        if (!this->decodeAndDispatchInput(element))
           {
             NFD_LOG_WARN("[id:" << this->getId()
                          << ",endpoint:" << m_socket->local_endpoint()
@@ -265,7 +298,7 @@
                      << "closing down the face");
 
         closeSocket();
-        onFail("Received input is invalid or too large to process, closing down the face");
+        this->onFail("Received input is invalid or too large to process, closing down the face");
         return;
       }
   }
@@ -286,18 +319,18 @@
 
   m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
                                               MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
-                          bind(&StreamFace<T>::handleReceive, this, _1, _2));
+                          bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
 }
 
-template <class T>
+template<class T, class U>
 inline void
-StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
+StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
 {
 }
 
-template <class T>
+template<class T, class U>
 inline void
-StreamFace<T>::closeSocket()
+StreamFace<T, U>::closeSocket()
 {
   boost::asio::io_service& io = m_socket->get_io_service();
 
@@ -309,7 +342,7 @@
 
   // ensure that the Face object is alive at least until all pending
   // handlers are dispatched
-  io.post(bind(&StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted,
+  io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
                this, this->shared_from_this()));
 }