ndn-handle: Implement TCP backdoor to inject data packets into repo
Change-Id: I74d0698f914a2e68d47ede427c7f36b3ba3b4f47
Refs: #1485
diff --git a/ndn-handle/ndn-handle-common.hpp b/ndn-handle/ndn-handle-common.hpp
index 73634b3..4f6ba67 100644
--- a/ndn-handle/ndn-handle-common.hpp
+++ b/ndn-handle/ndn-handle-common.hpp
@@ -38,7 +38,9 @@
using ndn::CommandInterestValidator;
using ndn::Scheduler;
-using boost::shared_ptr;
+using ndn::shared_ptr;
+using ndn::make_shared;
+using ndn::enable_shared_from_this;
typedef uint64_t ProcessId;
typedef uint64_t SegmentNo;
diff --git a/ndn-handle/tcp-bulk-insert-handle.cpp b/ndn-handle/tcp-bulk-insert-handle.cpp
new file mode 100644
index 0000000..5375169
--- /dev/null
+++ b/ndn-handle/tcp-bulk-insert-handle.cpp
@@ -0,0 +1,205 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2014 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#include "tcp-bulk-insert-handle.hpp"
+
+namespace repo {
+
+const size_t MAX_NDN_PACKET_SIZE = 8800;
+
+namespace detail {
+
+class TcpBulkInsertClient : noncopyable
+{
+public:
+ TcpBulkInsertClient(TcpBulkInsertHandle& writer,
+ const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+ : m_writer(writer)
+ , m_socket(socket)
+ , m_hasStarted(false)
+ , m_inputBufferSize(0)
+ {
+ }
+
+ static void
+ startReceive(const shared_ptr<TcpBulkInsertClient>& client)
+ {
+ BOOST_ASSERT(!client->m_hasStarted);
+
+ client->m_socket->async_receive(
+ boost::asio::buffer(client->m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
+ bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
+
+ client->m_hasStarted = true;
+ }
+
+private:
+ void
+ handleReceive(const boost::system::error_code& error,
+ std::size_t nBytesReceived,
+ const shared_ptr<TcpBulkInsertClient>& client);
+
+private:
+ TcpBulkInsertHandle& m_writer;
+ shared_ptr<boost::asio::ip::tcp::socket> m_socket;
+ bool m_hasStarted;
+ uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
+ std::size_t m_inputBufferSize;
+};
+
+} // namespace detail
+
+TcpBulkInsertHandle::TcpBulkInsertHandle(boost::asio::io_service& ioService,
+ StorageHandle& storageHandle)
+ : m_acceptor(ioService)
+ , m_storageHandle(storageHandle)
+{
+}
+
+void
+TcpBulkInsertHandle::listen(const std::string& host, const std::string& port)
+{
+ using namespace boost::asio;
+
+ ip::tcp::resolver resolver(m_acceptor.get_io_service());
+ ip::tcp::resolver::query query(host, port);
+
+ ip::tcp::resolver::iterator endpoint = resolver.resolve(query);
+ ip::tcp::resolver::iterator end;
+
+ if (endpoint == end)
+ throw Error("Cannot listen on [" + host + ":" + port + "]");
+
+ m_localEndpoint = *endpoint;
+ std::cerr << "Start listening on " << m_localEndpoint << std::endl;
+
+ m_acceptor.open(m_localEndpoint .protocol());
+ m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
+ if (m_localEndpoint.address().is_v6())
+ {
+ m_acceptor.set_option(ip::v6_only(true));
+ }
+ m_acceptor.bind(m_localEndpoint);
+ m_acceptor.listen(255);
+
+ shared_ptr<ip::tcp::socket> clientSocket =
+ make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
+ m_acceptor.async_accept(*clientSocket,
+ bind(&TcpBulkInsertHandle::handleAccept, this, _1,
+ clientSocket));
+}
+
+void
+TcpBulkInsertHandle::stop()
+{
+ m_acceptor.cancel();
+ m_acceptor.close();
+}
+
+void
+TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
+ const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+{
+ using namespace boost::asio;
+
+ if (error) {
+ // if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
+ // return;
+ return;
+ }
+
+ std::cerr << "New connection from " << socket->remote_endpoint() << std::endl;
+
+ shared_ptr<detail::TcpBulkInsertClient> client =
+ make_shared<detail::TcpBulkInsertClient>(boost::ref(*this), socket);
+ detail::TcpBulkInsertClient::startReceive(client);
+
+ // prepare accepting the next connection
+ shared_ptr<ip::tcp::socket> clientSocket =
+ make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
+ m_acceptor.async_accept(*clientSocket,
+ bind(&TcpBulkInsertHandle::handleAccept, this, _1,
+ clientSocket));
+}
+
+void
+detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
+ std::size_t nBytesReceived,
+ const shared_ptr<detail::TcpBulkInsertClient>& client)
+{
+ if (error)
+ {
+ if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
+ return;
+
+ boost::system::error_code error;
+ m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
+ m_socket->close(error);
+ return;
+ }
+
+ m_inputBufferSize += nBytesReceived;
+
+ // do magic
+
+ std::size_t offset = 0;
+
+ bool isOk = true;
+ Block element;
+ while (m_inputBufferSize - offset > 0)
+ {
+ isOk = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset, element);
+ if (!isOk)
+ break;
+
+ offset += element.size();
+ BOOST_ASSERT(offset <= m_inputBufferSize);
+
+ if (element.type() == ndn::Tlv::Data)
+ {
+ try {
+ Data data(element);
+ bool isOk = m_writer.getStorageHandle().insertData(data);
+ if (isOk)
+ std::cerr << "Successfully injected " << data.getName() << std::endl;
+ else
+ std::cerr << "FAILED to inject " << data.getName() << std::endl;
+ }
+ catch (std::runtime_error& error) {
+ /// \todo Catch specific error after determining what wireDecode() can throw
+ std::cerr << "Error decoding received Data packet" << std::endl;
+ }
+ }
+ }
+ if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
+ {
+ boost::system::error_code error;
+ m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
+ m_socket->close(error);
+ return;
+ }
+
+ if (offset > 0)
+ {
+ if (offset != m_inputBufferSize)
+ {
+ std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
+ m_inputBuffer);
+ m_inputBufferSize -= offset;
+ }
+ else
+ {
+ m_inputBufferSize = 0;
+ }
+ }
+
+ m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
+ MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
+ bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
+}
+
+
+} // namespace repo
diff --git a/ndn-handle/tcp-bulk-insert-handle.hpp b/ndn-handle/tcp-bulk-insert-handle.hpp
new file mode 100644
index 0000000..1e3382f
--- /dev/null
+++ b/ndn-handle/tcp-bulk-insert-handle.hpp
@@ -0,0 +1,57 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2014 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#ifndef REPO_NDN_HANDLE_WRITE_TCP_BACKDOOR_HPP
+#define REPO_NDN_HANDLE_WRITE_TCP_BACKDOOR_HPP
+
+#include "ndn-handle-common.hpp"
+#include <boost/asio.hpp>
+
+namespace repo {
+
+class TcpBulkInsertHandle : noncopyable
+{
+public:
+ class Error : public std::runtime_error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : std::runtime_error(what)
+ {
+ }
+ };
+
+public:
+ TcpBulkInsertHandle(boost::asio::io_service& ioService,
+ StorageHandle& storageHandle);
+
+ void
+ listen(const std::string& host, const std::string& port);
+
+ void
+ stop();
+
+ StorageHandle&
+ getStorageHandle()
+ {
+ return m_storageHandle;
+ }
+
+private:
+ void
+ handleAccept(const boost::system::error_code& error,
+ const shared_ptr<boost::asio::ip::tcp::socket>& socket);
+
+private:
+ boost::asio::ip::tcp::acceptor m_acceptor;
+ boost::asio::ip::tcp::endpoint m_localEndpoint;
+ StorageHandle& m_storageHandle;
+};
+
+} // namespace repo
+
+#endif // REPO_NDN_HANDLE_WRITE_TCP_BACKDOOR_HPP