face: UnixStreamTransport

Change-Id: Iaab3c09887d7167ab9024ba871162367bf835197
Refs: #3165
diff --git a/daemon/face/stream-transport.hpp b/daemon/face/stream-transport.hpp
new file mode 100644
index 0000000..e9ec78d
--- /dev/null
+++ b/daemon/face/stream-transport.hpp
@@ -0,0 +1,271 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014-2015,  Regents of the University of California,
+ *                           Arizona Board of Regents,
+ *                           Colorado State University,
+ *                           University Pierre & Marie Curie, Sorbonne University,
+ *                           Washington University in St. Louis,
+ *                           Beijing Institute of Technology,
+ *                           The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
+#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
+
+#include "transport.hpp"
+#include "core/global-io.hpp"
+
+#include <queue>
+
+namespace nfd {
+namespace face {
+
+/** \brief Implements Transport for stream-based protocols.
+ *
+ *  \tparam Protocol a stream-based protocol in Boost.Asio
+ */
+template<class Protocol>
+class StreamTransport : public Transport
+{
+public:
+  typedef Protocol protocol;
+
+  /** \brief Construct stream transport.
+   *
+   *  \param socket Protocol-specific socket for the created transport
+   */
+  explicit
+  StreamTransport(typename protocol::socket&& socket);
+
+  virtual void
+  doSend(Transport::Packet&& packet) DECL_OVERRIDE;
+
+  virtual void
+  doClose() DECL_OVERRIDE;
+
+protected:
+  void
+  deferredClose();
+
+  void
+  sendFromQueue();
+
+  void
+  handleSend(const boost::system::error_code& error,
+             size_t nBytesSent);
+
+  void
+  handleReceive(const boost::system::error_code& error,
+                size_t nBytesReceived);
+
+  void
+  processErrorCode(const boost::system::error_code& error);
+
+protected:
+  typename protocol::socket m_socket;
+
+  NFD_LOG_INCLASS_DECLARE();
+
+private:
+  uint8_t m_inputBuffer[ndn::MAX_NDN_PACKET_SIZE];
+  size_t m_inputBufferSize;
+  std::queue<Block> m_sendQueue;
+};
+
+// All derived classes must use
+// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamTransport, <specialization-parameter>, "Name");
+
+
+template<class T>
+inline
+StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
+  : m_socket(std::move(socket))
+  , m_inputBufferSize(0)
+{
+  m_socket.async_receive(boost::asio::buffer(m_inputBuffer, ndn::MAX_NDN_PACKET_SIZE),
+                         bind(&StreamTransport<T>::handleReceive, this,
+                              boost::asio::placeholders::error,
+                              boost::asio::placeholders::bytes_transferred));
+}
+
+template<class T>
+inline void
+StreamTransport<T>::doSend(Transport::Packet&& packet)
+{
+  NFD_LOG_FACE_TRACE(__func__);
+
+  bool wasQueueEmpty = m_sendQueue.empty();
+  m_sendQueue.push(packet.packet);
+
+  if (wasQueueEmpty)
+    sendFromQueue();
+}
+
+template<class T>
+inline void
+StreamTransport<T>::doClose()
+{
+  NFD_LOG_FACE_TRACE(__func__);
+
+  if (m_socket.is_open()) {
+    // Cancel all outstanding operations and shutdown the socket
+    // so that no further sends or receives are possible.
+    // Use the non-throwing variants and ignore errors, if any.
+    boost::system::error_code error;
+    m_socket.cancel(error);
+    m_socket.shutdown(protocol::socket::shutdown_both, error);
+  }
+
+  // Ensure that the Transport stays alive at least until
+  // all pending handlers are dispatched
+  getGlobalIoService().post(bind(&StreamTransport<T>::deferredClose, this));
+
+  // Some bug or feature of Boost.Asio (see http://redmine.named-data.net/issues/1856):
+  //
+  // When doClose is called from a socket event handler (e.g., from handleReceive),
+  // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
+  // Instead, handleSend is invoked as nothing bad happened.
+  //
+  // In order to prevent the assertion in handleSend from failing, we clear the queue
+  // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
+  // this point have been executed.  If more send operations are scheduled after this
+  // point, they will fail because the socket has been shutdown, and their callbacks
+  // will be invoked with error code == asio::error::shut_down.
+}
+
+template<class T>
+inline void
+StreamTransport<T>::deferredClose()
+{
+  NFD_LOG_FACE_TRACE(__func__);
+
+  // clear send queue
+  std::queue<Block> emptyQueue;
+  std::swap(emptyQueue, m_sendQueue);
+
+  // use the non-throwing variant and ignore errors, if any
+  boost::system::error_code error;
+  m_socket.close(error);
+
+  this->setState(TransportState::CLOSED);
+}
+
+template<class T>
+inline void
+StreamTransport<T>::sendFromQueue()
+{
+  boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
+                           bind(&StreamTransport<T>::handleSend, this,
+                                boost::asio::placeholders::error,
+                                boost::asio::placeholders::bytes_transferred));
+}
+
+template<class T>
+inline void
+StreamTransport<T>::handleSend(const boost::system::error_code& error,
+                               size_t nBytesSent)
+{
+  if (error)
+    return processErrorCode(error);
+
+  NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
+
+  BOOST_ASSERT(!m_sendQueue.empty());
+  m_sendQueue.pop();
+
+  if (!m_sendQueue.empty())
+    sendFromQueue();
+}
+
+template<class T>
+inline void
+StreamTransport<T>::handleReceive(const boost::system::error_code& error,
+                                  size_t nBytesReceived)
+{
+  if (error)
+    return processErrorCode(error);
+
+  NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
+
+  m_inputBufferSize += nBytesReceived;
+
+  size_t offset = 0;
+
+  bool isOk = true;
+  Block element;
+  while (m_inputBufferSize - offset > 0) {
+    std::tie(isOk, element) = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset);
+    if (!isOk)
+      break;
+
+    offset += element.size();
+
+    BOOST_ASSERT(offset <= m_inputBufferSize);
+
+    Transport::Packet packet(std::move(element));
+    this->receive(std::move(packet));
+  }
+
+  if (!isOk && m_inputBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
+    NFD_LOG_FACE_WARN("Failed to parse incoming packet or packet too large to process");
+    this->setState(TransportState::FAILED);
+    doClose();
+    return;
+  }
+
+  if (offset > 0) {
+    if (offset != m_inputBufferSize) {
+      std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
+      m_inputBufferSize -= offset;
+    }
+    else {
+      m_inputBufferSize = 0;
+    }
+  }
+
+  m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
+                                             ndn::MAX_NDN_PACKET_SIZE - m_inputBufferSize),
+                         bind(&StreamTransport<T>::handleReceive, this,
+                              boost::asio::placeholders::error,
+                              boost::asio::placeholders::bytes_transferred));
+}
+
+template<class T>
+inline void
+StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
+{
+  NFD_LOG_FACE_TRACE(__func__);
+
+  if (getState() == TransportState::CLOSING ||
+      getState() == TransportState::FAILED ||
+      getState() == TransportState::CLOSED ||
+      error == boost::asio::error::operation_aborted || // when cancel() is called
+      error == boost::asio::error::shut_down)           // after shutdown() is called
+    // transport is shutting down, ignore any errors
+    return;
+
+  if (error != boost::asio::error::eof)
+    NFD_LOG_FACE_WARN("Send or receive operation failed: " << error.message());
+
+  this->setState(TransportState::FAILED);
+  doClose();
+}
+
+} // namespace face
+} // namespace nfd
+
+#endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
diff --git a/daemon/face/unix-stream-channel.cpp b/daemon/face/unix-stream-channel.cpp
index c883163..3de9850 100644
--- a/daemon/face/unix-stream-channel.cpp
+++ b/daemon/face/unix-stream-channel.cpp
@@ -24,7 +24,9 @@
  */
 
 #include "unix-stream-channel.hpp"
-#include "unix-stream-face.hpp"
+#include "generic-link-service.hpp"
+#include "lp-face-wrapper.hpp"
+#include "unix-stream-transport.hpp"
 #include "core/global-io.hpp"
 
 #include <boost/filesystem.hpp>
@@ -131,9 +133,10 @@
 
   NFD_LOG_DEBUG("[" << m_endpoint << "] Incoming connection");
 
-  auto remoteUri = FaceUri::fromFd(m_socket.native_handle());
-  auto localUri = FaceUri(m_socket.local_endpoint());
-  auto face = make_shared<UnixStreamFace>(remoteUri, localUri, std::move(m_socket));
+  auto linkService = make_unique<face::GenericLinkService>();
+  auto transport = make_unique<face::UnixStreamTransport>(std::move(m_socket));
+  auto lpFace = make_unique<face::LpFace>(std::move(linkService), std::move(transport));
+  auto face = make_shared<face::LpFaceWrapper>(std::move(lpFace));
   onFaceCreated(face);
 
   // prepare accepting the next connection
diff --git a/daemon/face/unix-stream-face.cpp b/daemon/face/unix-stream-face.cpp
deleted file mode 100644
index 41ef68b..0000000
--- a/daemon/face/unix-stream-face.cpp
+++ /dev/null
@@ -1,46 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014  Regents of the University of California,
- *                     Arizona Board of Regents,
- *                     Colorado State University,
- *                     University Pierre & Marie Curie, Sorbonne University,
- *                     Washington University in St. Louis,
- *                     Beijing Institute of Technology
- *
- * This file is part of NFD (Named Data Networking Forwarding Daemon).
- * See AUTHORS.md for complete list of NFD authors and contributors.
- *
- * NFD is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE.  See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
- **/
-
-#include "unix-stream-face.hpp"
-
-namespace nfd {
-
-// The whole purpose of this file is to specialize the logger,
-// otherwise, everything could be put into the header file.
-
-NFD_LOG_INCLASS_2TEMPLATE_SPECIALIZATION_DEFINE(StreamFace,
-                                                UnixStreamFace::protocol, LocalFace,
-                                                "UnixStreamFace");
-
-UnixStreamFace::UnixStreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
-                               protocol::socket socket)
-  : StreamFace<protocol, LocalFace>(remoteUri, localUri, std::move(socket), true)
-{
-  static_assert(
-    std::is_same<std::remove_cv<protocol::socket::native_handle_type>::type, int>::value,
-    "The native handle type for UnixStreamFace sockets must be 'int'"
-  );
-}
-
-} // namespace nfd
diff --git a/daemon/face/unix-stream-face.hpp b/daemon/face/unix-stream-face.hpp
deleted file mode 100644
index c377819..0000000
--- a/daemon/face/unix-stream-face.hpp
+++ /dev/null
@@ -1,49 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014  Regents of the University of California,
- *                     Arizona Board of Regents,
- *                     Colorado State University,
- *                     University Pierre & Marie Curie, Sorbonne University,
- *                     Washington University in St. Louis,
- *                     Beijing Institute of Technology
- *
- * This file is part of NFD (Named Data Networking Forwarding Daemon).
- * See AUTHORS.md for complete list of NFD authors and contributors.
- *
- * NFD is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE.  See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
- **/
-
-#ifndef NFD_DAEMON_FACE_UNIX_STREAM_FACE_HPP
-#define NFD_DAEMON_FACE_UNIX_STREAM_FACE_HPP
-
-#include "stream-face.hpp"
-
-#ifndef HAVE_UNIX_SOCKETS
-#error "Cannot include this file when UNIX sockets are not available"
-#endif
-
-namespace nfd {
-
-/**
- * \brief Implementation of Face abstraction that uses stream-oriented
- *        Unix domain sockets as underlying transport mechanism
- */
-class UnixStreamFace : public StreamFace<boost::asio::local::stream_protocol, LocalFace>
-{
-public:
-  UnixStreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
-                 protocol::socket socket);
-};
-
-} // namespace nfd
-
-#endif // NFD_DAEMON_FACE_UNIX_STREAM_FACE_HPP
diff --git a/daemon/face/unix-stream-transport.cpp b/daemon/face/unix-stream-transport.cpp
new file mode 100644
index 0000000..17351f0
--- /dev/null
+++ b/daemon/face/unix-stream-transport.cpp
@@ -0,0 +1,51 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014-2015,  Regents of the University of California,
+ *                           Arizona Board of Regents,
+ *                           Colorado State University,
+ *                           University Pierre & Marie Curie, Sorbonne University,
+ *                           Washington University in St. Louis,
+ *                           Beijing Institute of Technology,
+ *                           The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "unix-stream-transport.hpp"
+
+namespace nfd {
+namespace face {
+
+NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamTransport, UnixStreamTransport::protocol,
+                                               "UnixStreamTransport");
+
+UnixStreamTransport::UnixStreamTransport(protocol::socket&& socket)
+  : StreamTransport(std::move(socket))
+{
+  static_assert(
+    std::is_same<std::remove_cv<protocol::socket::native_handle_type>::type, int>::value,
+    "The native handle type for UnixStreamFace sockets must be 'int'"
+  );
+
+  this->setLocalUri(FaceUri(m_socket.local_endpoint()));
+  this->setRemoteUri(FaceUri::fromFd(m_socket.native_handle()));
+  this->setScope(ndn::nfd::FACE_SCOPE_LOCAL);
+  this->setPersistency(ndn::nfd::FACE_PERSISTENCY_ON_DEMAND);
+
+  NFD_LOG_FACE_INFO("Creating Transport");
+}
+
+} // namespace face
+} // namespace nfd
diff --git a/daemon/face/unix-stream-transport.hpp b/daemon/face/unix-stream-transport.hpp
new file mode 100644
index 0000000..27f4eaa
--- /dev/null
+++ b/daemon/face/unix-stream-transport.hpp
@@ -0,0 +1,51 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014-2015,  Regents of the University of California,
+ *                           Arizona Board of Regents,
+ *                           Colorado State University,
+ *                           University Pierre & Marie Curie, Sorbonne University,
+ *                           Washington University in St. Louis,
+ *                           Beijing Institute of Technology,
+ *                           The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef NFD_DAEMON_FACE_UNIX_STREAM_TRANSPORT_HPP
+#define NFD_DAEMON_FACE_UNIX_STREAM_TRANSPORT_HPP
+
+#include "stream-transport.hpp"
+
+#ifndef HAVE_UNIX_SOCKETS
+#error "Cannot include this file when UNIX sockets are not available"
+#endif
+
+namespace nfd {
+namespace face {
+
+/**
+ * \brief A Transport that communicates on a stream-oriented Unix domain socket
+ */
+class UnixStreamTransport : public StreamTransport<boost::asio::local::stream_protocol>
+{
+public:
+  explicit
+  UnixStreamTransport(protocol::socket&& socket);
+};
+
+} // namespace face
+} // namespace nfd
+
+#endif // NFD_DAEMON_FACE_UNIX_STREAM_TRANSPORT_HPP