ndn-handle: Implement TCP backdoor to inject data packets into repo

Change-Id: I74d0698f914a2e68d47ede427c7f36b3ba3b4f47
Refs: #1485
diff --git a/ndn-handle/ndn-handle-common.hpp b/ndn-handle/ndn-handle-common.hpp
index 73634b3..4f6ba67 100644
--- a/ndn-handle/ndn-handle-common.hpp
+++ b/ndn-handle/ndn-handle-common.hpp
@@ -38,7 +38,9 @@
 using ndn::CommandInterestValidator;
 using ndn::Scheduler;
 
-using boost::shared_ptr;
+using ndn::shared_ptr;
+using ndn::make_shared;
+using ndn::enable_shared_from_this;
 
 typedef uint64_t ProcessId;
 typedef uint64_t SegmentNo;
diff --git a/ndn-handle/tcp-bulk-insert-handle.cpp b/ndn-handle/tcp-bulk-insert-handle.cpp
new file mode 100644
index 0000000..5375169
--- /dev/null
+++ b/ndn-handle/tcp-bulk-insert-handle.cpp
@@ -0,0 +1,205 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2014 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#include "tcp-bulk-insert-handle.hpp"
+
+namespace repo {
+
+const size_t MAX_NDN_PACKET_SIZE = 8800; // capacity in bytes of each client's receive buffer
+
+namespace detail {
+
+/// State of one accepted TCP connection: its socket and a buffer of bytes received so far
+class TcpBulkInsertClient : noncopyable
+{
+public:
+  TcpBulkInsertClient(TcpBulkInsertHandle& writer,
+                      const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+    : m_writer(writer), m_socket(socket), m_hasStarted(false), m_inputBufferSize(0)
+  {
+  }
+
+  /// Schedule the first receive (at most once); the bound \p client keeps the object alive
+  static void
+  startReceive(const shared_ptr<TcpBulkInsertClient>& client)
+  {
+    TcpBulkInsertClient& self = *client;
+    BOOST_ASSERT(!self.m_hasStarted);
+
+    self.m_socket->async_receive(
+      boost::asio::buffer(self.m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
+      bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
+
+    self.m_hasStarted = true;
+  }
+
+private:
+  /// Completion handler of async_receive
+  void
+  handleReceive(const boost::system::error_code& error,
+                std::size_t nBytesReceived,
+                const shared_ptr<TcpBulkInsertClient>& client);
+
+  TcpBulkInsertHandle& m_writer;
+  shared_ptr<boost::asio::ip::tcp::socket> m_socket;
+  bool m_hasStarted;
+  uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
+  std::size_t m_inputBufferSize; ///< number of received bytes currently held in m_inputBuffer
+};
+
+} // namespace detail
+
+// The acceptor is only associated with ioService here; it is opened later by listen().
+TcpBulkInsertHandle::TcpBulkInsertHandle(boost::asio::io_service& ioService,
+                                         StorageHandle& storageHandle)
+  : m_acceptor(ioService), m_storageHandle(storageHandle)
+{
+}
+
+// Resolves host:port, binds the acceptor to the first resolved endpoint and arms the first
+// accept. Throws Error if the resolver yields no endpoint.
+void
+TcpBulkInsertHandle::listen(const std::string& host, const std::string& port)
+{
+  using namespace boost::asio;
+
+  ip::tcp::resolver resolver(m_acceptor.get_io_service());
+  ip::tcp::resolver::iterator firstMatch = resolver.resolve(ip::tcp::resolver::query(host, port));
+
+  // a default-constructed resolver iterator is the end marker
+  if (firstMatch == ip::tcp::resolver::iterator())
+    throw Error("Cannot listen on [" + host + ":" + port + "]");
+
+  m_localEndpoint = *firstMatch;
+  std::cerr << "Start listening on " << m_localEndpoint << std::endl;
+
+  m_acceptor.open(m_localEndpoint.protocol());
+  m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
+  if (m_localEndpoint.address().is_v6())
+    m_acceptor.set_option(ip::v6_only(true));
+
+  m_acceptor.bind(m_localEndpoint);
+  m_acceptor.listen(255);
+
+  // the handler holds a copy of the shared_ptr, which keeps the socket alive while accepting
+  shared_ptr<ip::tcp::socket> firstClient =
+    make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
+  m_acceptor.async_accept(*firstClient,
+                          bind(&TcpBulkInsertHandle::handleAccept, this, _1,
+                               firstClient));
+}
+
+void
+TcpBulkInsertHandle::stop()
+{
+  boost::system::error_code ignored; // never throw: stop() may precede listen() or be repeated
+  m_acceptor.close(ignored);         // closing also cancels any pending async_accept
+}
+
+// Completion handler of async_accept: starts a client for the new socket and re-arms the accept.
+void
+TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
+                                  const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+{
+  using namespace boost::asio;
+  if (error) // any failure, including cancellation when the acceptor is closed, ends accepting
+    return;
+
+  // the peer may be gone already; the throwing overload would escape into io_service::run()
+  boost::system::error_code endpointError;
+  const ip::tcp::endpoint remote = socket->remote_endpoint(endpointError);
+  if (!endpointError)
+    std::cerr << "New connection from " << remote << std::endl;
+
+  shared_ptr<detail::TcpBulkInsertClient> client =
+    make_shared<detail::TcpBulkInsertClient>(boost::ref(*this), socket);
+  detail::TcpBulkInsertClient::startReceive(client);
+
+  // prepare accepting the next connection
+  shared_ptr<ip::tcp::socket> clientSocket =
+    make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
+  m_acceptor.async_accept(*clientSocket,
+                          bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
+}
+
+// Shuts down and closes \p socket, ignoring failures
+static void
+closeClientSocket(boost::asio::ip::tcp::socket& socket)
+{
+  boost::system::error_code ignored;
+  socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored);
+  socket.close(ignored);
+}
+
+// Completion handler of async_receive: decodes the buffered elements, inserts every Data
+// packet into the storage and schedules the next receive. \p client is handed on to that
+// receive, so the bound handler keeps holding a shared pointer to this object.
+void
+detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
+                                           std::size_t nBytesReceived,
+                                           const shared_ptr<detail::TcpBulkInsertClient>& client)
+{
+  if (error)
+    {
+      // a cancelled receive means the socket was closed by someone: leave it alone
+      if (error != boost::system::errc::operation_canceled)
+        closeClientSocket(*m_socket);
+      return;
+    }
+
+  m_inputBufferSize += nBytesReceived;
+
+  // decode as many elements as the buffered bytes allow
+  std::size_t nConsumed = 0;
+  bool hasDecoded = true;
+  Block element;
+  while (nConsumed != m_inputBufferSize)
+    {
+      hasDecoded = Block::fromBuffer(m_inputBuffer + nConsumed,
+                                     m_inputBufferSize - nConsumed, element);
+      if (!hasDecoded)
+        break;
+
+      nConsumed += element.size();
+      BOOST_ASSERT(nConsumed <= m_inputBufferSize);
+
+      if (element.type() != ndn::Tlv::Data)
+        continue; // only Data packets are inserted
+
+      try {
+        Data data(element);
+        if (m_writer.getStorageHandle().insertData(data))
+          std::cerr << "Successfully injected " << data.getName() << std::endl;
+        else
+          std::cerr << "FAILED to inject " << data.getName() << std::endl;
+      }
+      catch (std::runtime_error&) {
+        /// \todo Catch specific error after determining what wireDecode() can throw
+        std::cerr << "Error decoding received Data packet" << std::endl;
+      }
+    }
+
+  // the buffer is full but nothing could be decoded from its start: give up on this client
+  if (!hasDecoded && nConsumed == 0 && m_inputBufferSize == MAX_NDN_PACKET_SIZE)
+    {
+      closeClientSocket(*m_socket);
+      return;
+    }
+
+  // drop the consumed prefix by moving the remaining bytes (if any) to the front
+  if (nConsumed > 0)
+    {
+      std::copy(m_inputBuffer + nConsumed, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
+      m_inputBufferSize -= nConsumed;
+    }
+
+  m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
+                                              MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
+                          bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
+}
+
+
+} // namespace repo
diff --git a/ndn-handle/tcp-bulk-insert-handle.hpp b/ndn-handle/tcp-bulk-insert-handle.hpp
new file mode 100644
index 0000000..1e3382f
--- /dev/null
+++ b/ndn-handle/tcp-bulk-insert-handle.hpp
@@ -0,0 +1,57 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2014 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#ifndef REPO_NDN_HANDLE_WRITE_TCP_BACKDOOR_HPP
+#define REPO_NDN_HANDLE_WRITE_TCP_BACKDOOR_HPP
+
+#include "ndn-handle-common.hpp"
+#include <boost/asio.hpp>
+
+namespace repo {
+
+/// Accepts TCP connections and inserts the Data packets received over them into the storage
+class TcpBulkInsertHandle : noncopyable
+{
+public:
+  /// Thrown by listen() when host and port resolve to no endpoint
+  class Error : public std::runtime_error
+  {
+  public:
+    explicit
+    Error(const std::string& what) : std::runtime_error(what) {}
+  };
+
+  /// Associate the acceptor with \p ioService; no socket is opened until listen()
+  TcpBulkInsertHandle(boost::asio::io_service& ioService,
+                      StorageHandle& storageHandle);
+
+  /// Resolve \p host and \p port, bind to the first result and start accepting connections
+  void
+  listen(const std::string& host, const std::string& port);
+
+  /// Cancel the pending accept and close the acceptor
+  void
+  stop();
+
+  /// Storage handle that was passed to the constructor
+  StorageHandle&
+  getStorageHandle() { return m_storageHandle; }
+
+private:
+  /// Completion handler of async_accept
+  void
+  handleAccept(const boost::system::error_code& error,
+               const shared_ptr<boost::asio::ip::tcp::socket>& socket);
+
+  boost::asio::ip::tcp::acceptor m_acceptor;
+  /// Endpoint chosen by listen()
+  boost::asio::ip::tcp::endpoint m_localEndpoint;
+  StorageHandle& m_storageHandle;
+};
+
+} // namespace repo
+
+#endif // REPO_NDN_HANDLE_WRITE_TCP_BACKDOOR_HPP