ndn-handle: Implement TCP backdoor to inject data packets into repo

Change-Id: I74d0698f914a2e68d47ede427c7f36b3ba3b4f47
Refs: #1485
diff --git a/ndn-handle/ndn-handle-common.hpp b/ndn-handle/ndn-handle-common.hpp
index 73634b3..4f6ba67 100644
--- a/ndn-handle/ndn-handle-common.hpp
+++ b/ndn-handle/ndn-handle-common.hpp
@@ -38,7 +38,9 @@
 using ndn::CommandInterestValidator;
 using ndn::Scheduler;
 
-using boost::shared_ptr;
+using ndn::shared_ptr;
+using ndn::make_shared;
+using ndn::enable_shared_from_this;
 
 typedef uint64_t ProcessId;
 typedef uint64_t SegmentNo;
diff --git a/ndn-handle/tcp-bulk-insert-handle.cpp b/ndn-handle/tcp-bulk-insert-handle.cpp
new file mode 100644
index 0000000..5375169
--- /dev/null
+++ b/ndn-handle/tcp-bulk-insert-handle.cpp
@@ -0,0 +1,205 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2014 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#include "tcp-bulk-insert-handle.hpp"
+
+namespace repo {
+
+const size_t MAX_NDN_PACKET_SIZE = 8800;
+
+namespace detail {
+
+class TcpBulkInsertClient : noncopyable
+{
+public:
+  TcpBulkInsertClient(TcpBulkInsertHandle& writer,
+                      const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+    : m_writer(writer)
+    , m_socket(socket)
+    , m_hasStarted(false)
+    , m_inputBufferSize(0)
+  {
+  }
+
+  static void
+  startReceive(const shared_ptr<TcpBulkInsertClient>& client)
+  {
+    BOOST_ASSERT(!client->m_hasStarted);
+
+    client->m_socket->async_receive(
+      boost::asio::buffer(client->m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
+      bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
+
+    client->m_hasStarted = true;
+  }
+
+private:
+  void
+  handleReceive(const boost::system::error_code& error,
+                std::size_t nBytesReceived,
+                const shared_ptr<TcpBulkInsertClient>& client);
+
+private:
+  TcpBulkInsertHandle& m_writer;
+  shared_ptr<boost::asio::ip::tcp::socket> m_socket;
+  bool m_hasStarted;
+  uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
+  std::size_t m_inputBufferSize;
+};
+
+} // namespace detail
+
+TcpBulkInsertHandle::TcpBulkInsertHandle(boost::asio::io_service& ioService,
+                                         StorageHandle& storageHandle)
+  : m_acceptor(ioService)
+  , m_storageHandle(storageHandle)
+{
+}
+
+void
+TcpBulkInsertHandle::listen(const std::string& host, const std::string& port)
+{
+  using namespace boost::asio;
+
+  ip::tcp::resolver resolver(m_acceptor.get_io_service());
+  ip::tcp::resolver::query query(host, port);
+
+  ip::tcp::resolver::iterator endpoint = resolver.resolve(query);
+  ip::tcp::resolver::iterator end;
+
+  if (endpoint == end)
+    throw Error("Cannot listen on [" + host + ":" + port + "]");
+
+  m_localEndpoint = *endpoint;
+  std::cerr << "Start listening on " << m_localEndpoint  << std::endl;
+
+  m_acceptor.open(m_localEndpoint .protocol());
+  m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
+  if (m_localEndpoint.address().is_v6())
+    {
+      m_acceptor.set_option(ip::v6_only(true));
+    }
+  m_acceptor.bind(m_localEndpoint);
+  m_acceptor.listen(255);
+
+  shared_ptr<ip::tcp::socket> clientSocket =
+    make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
+  m_acceptor.async_accept(*clientSocket,
+                          bind(&TcpBulkInsertHandle::handleAccept, this, _1,
+                               clientSocket));
+}
+
+void
+TcpBulkInsertHandle::stop()
+{
+  m_acceptor.cancel();
+  m_acceptor.close();
+}
+
+void
+TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
+                                  const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+{
+  using namespace boost::asio;
+
+  if (error) {
+    // if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
+    //   return;
+    return;
+  }
+
+  std::cerr << "New connection from " << socket->remote_endpoint() << std::endl;
+
+  shared_ptr<detail::TcpBulkInsertClient> client =
+    make_shared<detail::TcpBulkInsertClient>(boost::ref(*this), socket);
+  detail::TcpBulkInsertClient::startReceive(client);
+
+  // prepare accepting the next connection
+  shared_ptr<ip::tcp::socket> clientSocket =
+    make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
+  m_acceptor.async_accept(*clientSocket,
+                          bind(&TcpBulkInsertHandle::handleAccept, this, _1,
+                               clientSocket));
+}
+
+void
+detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
+                                           std::size_t nBytesReceived,
+                                           const shared_ptr<detail::TcpBulkInsertClient>& client)
+{
+  if (error)
+    {
+      if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
+        return;
+
+      boost::system::error_code error;
+      m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
+      m_socket->close(error);
+      return;
+    }
+
+  m_inputBufferSize += nBytesReceived;
+
+  // do magic
+
+  std::size_t offset = 0;
+
+  bool isOk = true;
+  Block element;
+  while (m_inputBufferSize - offset > 0)
+    {
+      isOk = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset, element);
+      if (!isOk)
+        break;
+
+      offset += element.size();
+      BOOST_ASSERT(offset <= m_inputBufferSize);
+
+      if (element.type() == ndn::Tlv::Data)
+        {
+          try {
+            Data data(element);
+            bool isOk = m_writer.getStorageHandle().insertData(data);
+            if (isOk)
+              std::cerr << "Successfully injected " << data.getName() << std::endl;
+            else
+              std::cerr << "FAILED to inject " << data.getName() << std::endl;
+          }
+          catch (std::runtime_error& error) {
+            /// \todo Catch specific error after determining what wireDecode() can throw
+            std::cerr << "Error decoding received Data packet" << std::endl;
+          }
+        }
+    }
+  if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
+    {
+      boost::system::error_code error;
+      m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
+      m_socket->close(error);
+      return;
+    }
+
+  if (offset > 0)
+    {
+      if (offset != m_inputBufferSize)
+        {
+          std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
+                    m_inputBuffer);
+          m_inputBufferSize -= offset;
+        }
+      else
+        {
+          m_inputBufferSize = 0;
+        }
+    }
+
+  m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
+                                              MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
+                          bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
+}
+
+
+} // namespace repo
diff --git a/ndn-handle/tcp-bulk-insert-handle.hpp b/ndn-handle/tcp-bulk-insert-handle.hpp
new file mode 100644
index 0000000..1e3382f
--- /dev/null
+++ b/ndn-handle/tcp-bulk-insert-handle.hpp
@@ -0,0 +1,57 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2014 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#ifndef REPO_NDN_HANDLE_WRITE_TCP_BACKDOOR_HPP
+#define REPO_NDN_HANDLE_WRITE_TCP_BACKDOOR_HPP
+
+#include "ndn-handle-common.hpp"
+#include <boost/asio.hpp>
+
+namespace repo {
+
+class TcpBulkInsertHandle : noncopyable
+{
+public:
+  class Error : public std::runtime_error
+  {
+  public:
+    explicit
+    Error(const std::string& what)
+      : std::runtime_error(what)
+    {
+    }
+  };
+
+public:
+  TcpBulkInsertHandle(boost::asio::io_service& ioService,
+                      StorageHandle& storageHandle);
+
+  void
+  listen(const std::string& host, const std::string& port);
+
+  void
+  stop();
+
+  StorageHandle&
+  getStorageHandle()
+  {
+    return m_storageHandle;
+  }
+
+private:
+  void
+  handleAccept(const boost::system::error_code& error,
+               const shared_ptr<boost::asio::ip::tcp::socket>& socket);
+
+private:
+  boost::asio::ip::tcp::acceptor m_acceptor;
+  boost::asio::ip::tcp::endpoint m_localEndpoint;
+  StorageHandle& m_storageHandle;
+};
+
+} // namespace repo
+
+#endif // REPO_NDN_HANDLE_WRITE_TCP_BACKDOOR_HPP
diff --git a/server/server.cpp b/server/server.cpp
index 4ea011a..b577be0 100644
--- a/server/server.cpp
+++ b/server/server.cpp
@@ -13,6 +13,7 @@
 #include "../storage/sqlite/sqlite-handle.hpp"
 #include "../ndn-handle/read-handle.hpp"
 #include "../ndn-handle/write-handle.hpp"
+#include "../ndn-handle/tcp-bulk-insert-handle.hpp"
 #include "../ndn-handle/delete-handle.hpp"
 
 using namespace repo;
@@ -49,7 +50,7 @@
     confPath = "./repo.conf";
   }
 
-  Name dataPrefix("ndn:/example/data");
+  Name dataPrefix("ndn:/");
   Name repoPrefix("ndn:/example/repo");
   /// @todo read from configuration
 
@@ -67,10 +68,16 @@
 
   ReadHandle readHandle(face, sqliteHandle, keyChain, scheduler);
   readHandle.listen(dataPrefix);
+
   WriteHandle writeHandle(face, sqliteHandle, keyChain, scheduler, validator);
   writeHandle.listen(repoPrefix);
+
   DeleteHandle deleteHandle(face, sqliteHandle, keyChain, scheduler, validator);
   deleteHandle.listen(repoPrefix);
+
+  TcpBulkInsertHandle tcpBulkInsertHandle(*io, sqliteHandle);
+  tcpBulkInsertHandle.listen("localhost", "7376");
+
   face.processEvents();
   return 0;
 }
diff --git a/storage/sqlite/sqlite-handle.cpp b/storage/sqlite/sqlite-handle.cpp
index 732e9ab..2c872e4 100644
--- a/storage/sqlite/sqlite-handle.cpp
+++ b/storage/sqlite/sqlite-handle.cpp
@@ -33,7 +33,9 @@
                       "data BLOB, "
                       "parentName BLOB, "
                       "nChildren INTEGER);\n"
-                      "CREATE INDEX NdnRepoparentName ON NDN_REPO (parentName)", 0, 0, &errMsg);
+                      "CREATE INDEX NdnRepoParentName ON NDN_REPO (parentName);\n"
+                      "CREATE INDEX NdnRepoData ON NDN_REPO (data);\n"
+                 , 0, 0, &errMsg);
     // Ignore errors (when database already exists, errors are expected)
   }
   else {
@@ -504,7 +506,7 @@
           if (nChildren > 0) {
             internalNames.push(elementName);
           }
-          if (sqlite3_column_type(queryParentStmt, 3) != SQLITE_NULL) {
+          if (sqlite3_column_type(queryParentStmt, 1) != SQLITE_NULL) {
             names.push_back(elementName);
           }
         }
@@ -779,4 +781,29 @@
   return true;
 }
 
+size_t
+SqliteHandle::size()
+{
+  sqlite3_stmt* queryStmt = 0;
+  string sql("SELECT count(*) FROM NDN_REPO WHERE data IS NOT NULL");
+  int rc = sqlite3_prepare_v2(m_db, sql.c_str(), -1, &queryStmt, 0);
+  if (rc != SQLITE_OK)
+    {
+      std::cerr << "Database query failure rc:" << rc << std::endl;
+      sqlite3_finalize(queryStmt);
+      throw Error("Database query failure");
+    }
+
+  rc = sqlite3_step(queryStmt);
+  if (rc != SQLITE_ROW)
+    {
+      std::cerr << "Database query failure rc:" << rc << std::endl;
+      sqlite3_finalize(queryStmt);
+      throw Error("Database query failure");
+    }
+
+  size_t nDatas = static_cast<size_t>(sqlite3_column_int64(queryStmt, 0));
+  return nDatas;
+}
+
 } //namespace repo
diff --git a/storage/sqlite/sqlite-handle.hpp b/storage/sqlite/sqlite-handle.hpp
index a2091c8..a595242 100644
--- a/storage/sqlite/sqlite-handle.hpp
+++ b/storage/sqlite/sqlite-handle.hpp
@@ -60,6 +60,9 @@
   virtual bool
   readNameAny(const Name& name, const Selectors& selectors, vector<Name>& names);
 
+  virtual size_t
+  size();
+
 private:
   void
   initializeRepo();
diff --git a/storage/storage-handle.hpp b/storage/storage-handle.hpp
index 48359d4..767db6f 100644
--- a/storage/storage-handle.hpp
+++ b/storage/storage-handle.hpp
@@ -32,8 +32,8 @@
 using std::string;
 using boost::noncopyable;
 
-/**   
- * @brief this class defines handles to read, insert and delete data packets in storage media
+/**
+ * @brief this class defines handles to read, insert and delete data packets in storage media
  */
 
 class StorageHandle : noncopyable
@@ -101,6 +101,12 @@
   virtual bool
   readNameAny(const Name& name, const Selectors& selectors, vector<Name>& names) = 0;
 
+  /**
+   * @brief Get the number of Data packets stored
+   */
+  virtual size_t
+  size() = 0;
+
 private:
   StorageMethod m_storageMethod;
 };
diff --git a/tests/dataset-fixtures.hpp b/tests/dataset-fixtures.hpp
new file mode 100644
index 0000000..8dfeb78
--- /dev/null
+++ b/tests/dataset-fixtures.hpp
@@ -0,0 +1,210 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2014 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#ifndef REPO_TESTS_DATASET_FIXTURES_HPP
+#define REPO_TESTS_DATASET_FIXTURES_HPP
+
+#include <ndn-cpp-dev/security/key-chain.hpp>
+
+namespace repo {
+namespace tests {
+
+static inline ndn::shared_ptr<ndn::Data>
+createData(const ndn::Name& name)
+{
+  static ndn::KeyChain keyChain;
+  static std::vector<uint8_t> content(1500, '-');
+
+  ndn::shared_ptr<ndn::Data> data = ndn::make_shared<ndn::Data>();
+  data->setName(name);
+  data->setContent(&content[0], content.size());
+  keyChain.sign(*data);
+
+  return data;
+}
+
+
+class DatasetBase
+{
+public:
+  typedef std::list<ndn::shared_ptr<ndn::Data> > DataContainer;
+  DataContainer data;
+
+  typedef std::list<std::pair<ndn::Interest, ndn::shared_ptr<ndn::Data> > > InterestContainer;
+  InterestContainer interests;
+};
+
+
+template<size_t N>
+class BaseSmoketestFixture : public DatasetBase
+{
+public:
+  static const std::string&
+  getName()
+  {
+    static std::string name = "BaseSmoketest";
+    return name;
+  }
+
+  BaseSmoketestFixture()
+  {
+    ndn::Name baseName("/x/y/z/test/1");
+    for (size_t i = 0; i < N; i++) {
+      ndn::Name name(baseName);
+      name.appendSegment(i);
+      ndn::shared_ptr<Data> data = createData(name);
+      this->data.push_back(data);
+
+      this->interests.push_back(std::make_pair(Interest(name), data));
+    }
+  }
+};
+
+
+class BaseTestFixture : public DatasetBase
+{
+public:
+  static const std::string&
+  getName()
+  {
+    static std::string name = "BaseTest";
+    return name;
+  }
+
+  BaseTestFixture()
+  {
+    this->data.push_back(createData("/a"));
+    this->interests.push_back(std::make_pair(Interest("/a"), this->data.back()));
+
+    this->data.push_back(createData("/a/b"));
+    this->interests.push_back(std::make_pair(Interest("/a/b"), this->data.back()));
+
+    this->data.push_back(createData("/a/b/c"));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c"), this->data.back()));
+
+    this->data.push_back(createData("/a/b/c/d"));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d"), this->data.back()));
+  }
+};
+
+
+class FetchByPrefixTestFixture : public DatasetBase
+{
+public:
+  static const std::string&
+  getName()
+  {
+    static std::string name = "FetchByPrefix";
+    return name;
+  }
+
+  FetchByPrefixTestFixture()
+  {
+    this->data.push_back(createData("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z"));
+    this->interests.push_back(std::make_pair(Interest("/a"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t"),
+                                             this->data.back()));
+    this->interests.push_back(std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u"),
+                                             this->data.back()));
+    this->interests.push_back(
+       std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v"),
+                      this->data.back()));
+    this->interests.push_back(
+       std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w"),
+                      this->data.back()));
+    this->interests.push_back(
+      std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x"),
+                     this->data.back()));
+    this->interests.push_back(
+      std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y"),
+                     this->data.back()));
+    this->interests.push_back(
+      std::make_pair(Interest("/a/b/c/d/e/f/g/h/i/j/k/l/m/n/o/p/q/r/s/t/u/v/w/x/y/z"),
+                     this->data.back()));
+  }
+};
+
+
+class SelectorTestFixture : public DatasetBase
+{
+public:
+  static const std::string&
+  getName()
+  {
+    static std::string name = "SelectorTest";
+    return name;
+  }
+
+  SelectorTestFixture()
+  {
+    this->data.push_back(createData("/a/1"));
+    this->data.push_back(createData("/b/1"));
+    this->interests.push_back(std::make_pair(Interest()
+                                         .setName("/b")
+                                         .setSelectors(Selectors()
+                                                         .setChildSelector(0)),
+                                       this->data.back()));
+
+    this->data.push_back(createData("/c/1"));
+    this->data.push_back(createData("/b/99"));
+    this->interests.push_back(std::make_pair(Interest()
+                                         .setName("/b")
+                                         .setSelectors(Selectors()
+                                                         .setChildSelector(1)),
+                                       this->data.back()));
+    this->data.push_back(createData("/b/5"));
+    this->data.push_back(createData("/b/55"));
+  }
+};
+
+
+typedef boost::mpl::vector< BaseTestFixture,
+                            FetchByPrefixTestFixture,
+                            SelectorTestFixture,
+                            BaseSmoketestFixture<1>,
+                            BaseSmoketestFixture<100> > DatasetFixtures;
+
+} // namespace tests
+} // namespace repo
+
+#endif // REPO_TESTS_DATASET_FIXTURES_HPP
diff --git a/tests/test-repo-command-parameter.cpp b/tests/repo-command-parameter.cpp
similarity index 87%
rename from tests/test-repo-command-parameter.cpp
rename to tests/repo-command-parameter.cpp
index 76ca548..9b70636 100644
--- a/tests/test-repo-command-parameter.cpp
+++ b/tests/repo-command-parameter.cpp
@@ -10,12 +10,13 @@
 #include <boost/test/unit_test.hpp>
 
 namespace repo {
+namespace tests {
 
-BOOST_AUTO_TEST_SUITE(TestRepoCommandParameter)
+BOOST_AUTO_TEST_SUITE(RepoCommandParameter)
 
-BOOST_AUTO_TEST_CASE(TestRepoCommandParameter)
+BOOST_AUTO_TEST_CASE(EncodeDecode)
 {
-  RepoCommandParameter parameter;
+  repo::RepoCommandParameter parameter;
   parameter.setName("/a/b/c");
   parameter.setStartBlockId(1);
   parameter.setEndBlockId(100);
@@ -40,9 +41,9 @@
   BOOST_REQUIRE_EQUAL_COLLECTIONS(expected, expected + sizeof(expected),
                                   wire.begin(), wire.end());
 
-  BOOST_REQUIRE_NO_THROW(RepoCommandParameter(wire));
+  BOOST_REQUIRE_NO_THROW(repo::RepoCommandParameter(wire));
 
-  RepoCommandParameter decoded(wire);
+  repo::RepoCommandParameter decoded(wire);
   //std::cout << decoded << std::endl;
   BOOST_CHECK_EQUAL(decoded.getName(), parameter.getName());
   BOOST_CHECK_EQUAL(decoded.getStartBlockId(), parameter.getStartBlockId());
@@ -54,4 +55,5 @@
 
 BOOST_AUTO_TEST_SUITE_END()
 
+} // namespace tests
 } // namespace repo
diff --git a/tests/test-repo-command-response.cpp b/tests/repo-command-response.cpp
similarity index 86%
rename from tests/test-repo-command-response.cpp
rename to tests/repo-command-response.cpp
index c97fce8..f9d5226 100644
--- a/tests/test-repo-command-response.cpp
+++ b/tests/repo-command-response.cpp
@@ -9,12 +9,13 @@
 #include <boost/test/unit_test.hpp>
 
 namespace repo {
+namespace tests {
 
-BOOST_AUTO_TEST_SUITE(TestRepoCommandResponse)
+BOOST_AUTO_TEST_SUITE(RepoCommandResponse)
 
-BOOST_AUTO_TEST_CASE(TestRepoCommandResponse)
+BOOST_AUTO_TEST_CASE(EncodeDecode)
 {
-  RepoCommandResponse response;
+  repo::RepoCommandResponse response;
   response.setStatusCode(404);
   response.setStartBlockId(1);
   response.setEndBlockId(100);
@@ -38,9 +39,9 @@
   BOOST_REQUIRE_EQUAL_COLLECTIONS(expected, expected + sizeof(expected),
                                   wire.begin(), wire.end());
 
-  BOOST_REQUIRE_NO_THROW(RepoCommandResponse(wire));
+  BOOST_REQUIRE_NO_THROW(repo::RepoCommandResponse(wire));
 
-  RepoCommandResponse decoded(wire);
+  repo::RepoCommandResponse decoded(wire);
   BOOST_CHECK_EQUAL(decoded.getStatusCode(), response.getStatusCode());
   BOOST_CHECK_EQUAL(decoded.getStartBlockId(), response.getStartBlockId());
   BOOST_CHECK_EQUAL(decoded.getEndBlockId(), response.getEndBlockId());
@@ -51,4 +52,5 @@
 
 BOOST_AUTO_TEST_SUITE_END()
 
+} // namespace tests
 } // namespace repo
diff --git a/tests/sqlite-fixture.hpp b/tests/sqlite-fixture.hpp
new file mode 100644
index 0000000..b370f20
--- /dev/null
+++ b/tests/sqlite-fixture.hpp
@@ -0,0 +1,46 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2014 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#ifndef REPO_TESTS_SQLITE_FIXTURE_HPP
+#define REPO_TESTS_SQLITE_FIXTURE_HPP
+
+#include "../storage/sqlite/sqlite-handle.hpp"
+
+#include <boost/filesystem.hpp>
+#include <boost/test/unit_test.hpp>
+
+namespace repo {
+namespace tests {
+
+class SqliteFixture
+{
+public:
+  SqliteFixture()
+  {
+    boost::filesystem::path testPath("unittestdb");
+    boost::filesystem::file_status testPathStatus = boost::filesystem::status(testPath);
+    if (!boost::filesystem::is_directory(testPathStatus)) {
+      if (!boost::filesystem::create_directory(boost::filesystem::path(testPath))) {
+        BOOST_FAIL("Cannot create unittestdb folder");
+      }
+    }
+    handle = new SqliteHandle("unittestdb");
+  }
+
+  ~SqliteFixture()
+  {
+    delete handle;
+    boost::filesystem::remove_all(boost::filesystem::path("unittestdb"));
+  }
+
+public:
+  SqliteHandle* handle;
+};
+
+} // namespace tests
+} // namespace repo
+
+#endif // REPO_TESTS_SQLITE_FIXTURE_HPP
diff --git a/tests/sqlite-handle.cpp b/tests/sqlite-handle.cpp
new file mode 100644
index 0000000..5f3ef22
--- /dev/null
+++ b/tests/sqlite-handle.cpp
@@ -0,0 +1,61 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2014 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#include "../storage/sqlite/sqlite-handle.hpp"
+
+#include "sqlite-fixture.hpp"
+#include "dataset-fixtures.hpp"
+
+#include <boost/test/unit_test.hpp>
+
+namespace repo {
+namespace tests {
+
+BOOST_AUTO_TEST_SUITE(SqliteHandle)
+
+template<class Dataset>
+class Fixture : public SqliteFixture, public Dataset
+{
+};
+
+BOOST_FIXTURE_TEST_CASE_TEMPLATE(InsertReadDelete, T, DatasetFixtures, Fixture<T>)
+{
+  BOOST_TEST_MESSAGE(T::getName());
+
+  // Insert
+  for (typename T::DataContainer::iterator i = this->data.begin();
+       i != this->data.end(); ++i) {
+    BOOST_CHECK_EQUAL(this->handle->insertData(**i), true);
+  }
+
+  BOOST_CHECK_EQUAL(this->handle->size(), this->data.size());
+
+  // Read (all items should exist)
+  for (typename T::InterestContainer::iterator i = this->interests.begin();
+       i != this->interests.end(); ++i) {
+    ndn::Data retrievedData;
+    BOOST_REQUIRE_EQUAL(this->handle->readData(i->first, retrievedData), true);
+    BOOST_CHECK_EQUAL(retrievedData, *i->second);
+  }
+
+  // Delete
+  for (typename T::DataContainer::iterator i = this->data.begin();
+       i != this->data.end(); ++i) {
+    BOOST_CHECK_EQUAL(this->handle->deleteData((*i)->getName()), true);
+  }
+
+  // Read (none of the items should exist)
+  for (typename T::InterestContainer::iterator i = this->interests.begin();
+       i != this->interests.end(); ++i) {
+    ndn::Data retrievedData;
+    BOOST_REQUIRE_EQUAL(this->handle->readData(i->first, retrievedData), false);
+  }
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+} // namespace tests
+} // namespace repo
diff --git a/tests/tcp-bulk-insert-handle.cpp b/tests/tcp-bulk-insert-handle.cpp
new file mode 100644
index 0000000..ffe002b
--- /dev/null
+++ b/tests/tcp-bulk-insert-handle.cpp
@@ -0,0 +1,172 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2014 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#include "../ndn-handle/tcp-bulk-insert-handle.hpp"
+
+#include "sqlite-fixture.hpp"
+#include "dataset-fixtures.hpp"
+
+#include <boost/test/unit_test.hpp>
+
+namespace repo {
+namespace tests {
+
+BOOST_AUTO_TEST_SUITE(TcpBulkInsertHandle)
+
+class TcpClient
+{
+public:
+  TcpClient()
+    : socket(ioService)
+  {
+  }
+
+  void
+  start(const std::string& host, const std::string& port)
+  {
+    using namespace boost::asio;
+
+    ip::tcp::resolver resolver(ioService);
+    ip::tcp::resolver::query query(host, port);
+
+    ip::tcp::resolver::iterator endpoint = resolver.resolve(query);
+    ip::tcp::resolver::iterator end;
+
+    if (endpoint == end)
+      BOOST_FAIL("Cannot resolve [" + host + ":" + port + "]");
+
+    ip::tcp::endpoint serverEndpoint = *endpoint;
+
+    socket.async_connect(serverEndpoint,
+                         bind(&TcpClient::onSuccessfullConnect, this, _1));
+  }
+
+  virtual void
+  onSuccessfullConnect(const boost::system::error_code& error)
+  {
+    if (error)
+      {
+        BOOST_FAIL("TCP connection aborted");
+        return;
+      }
+  }
+
+public:
+  boost::asio::io_service ioService;
+  boost::asio::ip::tcp::socket socket;
+};
+
+template<class Dataset>
+class TcpBulkInsertFixture : public TcpClient,
+                             public SqliteFixture,
+                             public Dataset
+{
+public:
+  TcpBulkInsertFixture()
+    : scheduler(ioService)
+    , bulkInserter(ioService, *handle)
+  {
+    guardEvent = scheduler.scheduleEvent(ndn::time::seconds(2),
+                                         bind(&TcpBulkInsertFixture::fail, this, "Test timed out"));
+  }
+
+  virtual void
+  onSuccessfullConnect(const boost::system::error_code& error)
+  {
+    TcpClient::onSuccessfullConnect(error);
+
+    // This value may need to be adjusted if some dataset exceeds 100k
+    socket.set_option(boost::asio::socket_base::send_buffer_size(100000));
+
+    // Initially I wrote the following to use scatter-gather approach (using
+    // std::vector<const_buffer> and a single socket.async_send operation). Unfortunately, as
+    // described in http://www.boost.org/doc/libs/1_48_0/doc/html/boost_asio/overview/implementation.html,
+    // scatter-gather is limited to at most `min(64,IOV_MAX)` buffers to be transmitted
+    // in a single operation
+    for (typename Dataset::DataContainer::iterator i = this->data.begin();
+         i != this->data.end(); ++i) {
+
+      socket.async_send(boost::asio::buffer((*i)->wireEncode().wire(),  (*i)->wireEncode().size()),
+                        bind(&TcpBulkInsertFixture::onSendFinished, this, _1, false));
+    }
+
+    socket.async_send(boost::asio::buffer(static_cast<const uint8_t*>(0), 0),
+                      bind(&TcpBulkInsertFixture::onSendFinished, this, _1, true));
+  }
+
+  void
+  onSendFinished(const boost::system::error_code& error, bool isFinal)
+  {
+    if (error) {
+      BOOST_FAIL("TCP connection aborted");
+      return;
+    }
+
+    if (isFinal) {
+      scheduler.cancelEvent(guardEvent);
+
+      // In case there are some outstanding handlers
+      // ioService.post(bind(&TcpBulkInsertFixture::stop, this));
+      scheduler.scheduleEvent(ndn::time::seconds(1),
+                              bind(&TcpBulkInsertFixture::stop, this));
+    }
+  }
+
+  void
+  fail(const std::string& info)
+  {
+    ioService.stop();
+    BOOST_FAIL(info);
+  }
+
+  void
+  stop()
+  {
+    // Terminate test
+    socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
+    socket.close();
+
+    bulkInserter.stop();
+    // may be ioService.stop() as well
+  }
+
+public:
+  Scheduler scheduler;
+  ndn::EventId guardEvent;
+  repo::TcpBulkInsertHandle bulkInserter;
+};
+
+
+BOOST_FIXTURE_TEST_CASE_TEMPLATE(BulkInsertAndRead, T, DatasetFixtures, TcpBulkInsertFixture<T>)
+{
+  BOOST_TEST_MESSAGE(T::getName());
+  // BOOST_CHECK_EQUAL(this->handle->size(), 1);
+
+  // start bulk inserter
+  this->bulkInserter.listen("localhost", "17376");
+
+  // start test
+  this->start("localhost", "17376");
+
+  // actually run the test
+  this->ioService.run();
+
+  BOOST_CHECK_EQUAL(this->handle->size(), this->data.size());
+
+  // Read (all items should exist)
+  for (typename T::InterestContainer::iterator i = this->interests.begin();
+       i != this->interests.end(); ++i) {
+    ndn::Data retrievedData;
+    BOOST_REQUIRE_EQUAL(this->handle->readData(i->first, retrievedData), true);
+    BOOST_CHECK_EQUAL(retrievedData, *i->second);
+  }
+}
+
+
+BOOST_AUTO_TEST_SUITE_END()
+
+} // namespace repo
+} // namespace tests
diff --git a/tests/test-sqlite.cpp b/tests/test-sqlite.cpp
deleted file mode 100644
index 1d9793c..0000000
--- a/tests/test-sqlite.cpp
+++ /dev/null
@@ -1,167 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (C) 2014 Regents of the University of California.
- * See COPYING for copyright and distribution information.
- */
-
-#include "../storage/sqlite/sqlite-handle.hpp"
-
-#include <ndn-cpp-dev/security/key-chain.hpp>
-
-#include <boost/filesystem.hpp>
-#include <boost/test/unit_test.hpp>
-#include <boost/test/output_test_stream.hpp>
-
-namespace repo {
-
-class TestDbCreateDestroy
-{
-public:
-  TestDbCreateDestroy()
-  {
-    boost::filesystem::path testPath("unittestdb");
-    boost::filesystem::file_status testPathStatus = boost::filesystem::status(testPath);
-    if (!boost::filesystem::is_directory(testPathStatus)) {
-      if (!boost::filesystem::create_directory(boost::filesystem::path(testPath))) {
-        BOOST_FAIL("Cannot create unittestdb folder");
-      }
-    }
-    handle = new SqliteHandle("unittestdb");
-  }
-
-  ~TestDbCreateDestroy()
-  {
-    delete handle;
-    boost::filesystem::remove_all(boost::filesystem::path("unittestdb"));
-  }
-
-public:
-  SqliteHandle* handle;
-};
-
-BOOST_FIXTURE_TEST_SUITE(TestSqlite, TestDbCreateDestroy)
-
-static inline ndn::shared_ptr<ndn::Data>
-createData(const ndn::Name& name)
-{
-  static ndn::KeyChain keyChain;
-  static std::vector<uint8_t> content(1500, '-');
-
-  ndn::shared_ptr<ndn::Data> data = ndn::make_shared<ndn::Data>();
-  data->setName(name);
-  data->setContent(&content[0], content.size());
-  keyChain.sign(*data);
-
-  return data;
-}
-
-class FixtureBase : public TestDbCreateDestroy
-{
-public:
-  typedef std::list<ndn::shared_ptr<ndn::Data> > DataContainer;
-  DataContainer datas;
-
-  typedef std::list<std::pair<ndn::Interest, ndn::shared_ptr<ndn::Data> > > InterestContainer;
-  InterestContainer interests;
-};
-
-template<size_t N>
-class BaseSmoketestFixture : public FixtureBase
-{
-public:
-  BaseSmoketestFixture()
-  {
-    ndn::Name baseName("/x/y/z/test/1");
-    for (size_t i = 0; i < N; i++) {
-      ndn::Name name(baseName);
-      name.appendSegment(i);
-      ndn::shared_ptr<Data> data = createData(name);
-      this->datas.push_back(data);
-
-      this->interests.push_back(std::make_pair(Interest(name), data));
-    }
-  }
-};
-
-class BaseTestFixture : public FixtureBase
-{
-public:
-  BaseTestFixture()
-  {
-    datas.push_back(createData("/a"));
-    interests.push_back(std::make_pair(Interest("/a"), datas.back()));
-
-    datas.push_back(createData("/a/b"));
-    interests.push_back(std::make_pair(Interest("/a/b"), datas.back()));
-
-    datas.push_back(createData("/a/b/c"));
-    interests.push_back(std::make_pair(Interest("/a/b/c"), datas.back()));
-
-    datas.push_back(createData("/a/b/c/d"));
-    interests.push_back(std::make_pair(Interest("/a/b/c/d"), datas.back()));
-  }
-};
-
-class SelectorTestFixture : public FixtureBase
-{
-public:
-  SelectorTestFixture()
-  {
-    datas.push_back(createData("/a/1"));
-    datas.push_back(createData("/b/1"));
-    interests.push_back(std::make_pair(Interest()
-                                         .setName("/b")
-                                         .setSelectors(Selectors()
-                                                         .setChildSelector(0)),
-                                       datas.back()));
-
-    datas.push_back(createData("/c/1"));
-    datas.push_back(createData("/b/99"));
-    interests.push_back(std::make_pair(Interest()
-                                         .setName("/b")
-                                         .setSelectors(Selectors()
-                                                         .setChildSelector(1)),
-                                       datas.back()));
-    datas.push_back(createData("/b/5"));
-    datas.push_back(createData("/b/55"));
-  }
-};
-
-typedef boost::mpl::vector< BaseTestFixture,
-                            SelectorTestFixture,
-                            BaseSmoketestFixture<1>,
-                            BaseSmoketestFixture<100> > Fixtures;
-
-BOOST_FIXTURE_TEST_CASE_TEMPLATE(InsertReadDelete, T, Fixtures, T)
-{
-  // Insert
-  for (typename T::DataContainer::iterator i = this->datas.begin();
-       i != this->datas.end(); ++i) {
-    BOOST_CHECK_EQUAL(this->handle->insertData(**i), true);
-  }
-
-  // Read (all items should exist)
-  for (typename T::InterestContainer::iterator i = this->interests.begin();
-       i != this->interests.end(); ++i) {
-    ndn::Data retrievedData;
-    BOOST_REQUIRE_EQUAL(this->handle->readData(i->first, retrievedData), true);
-    BOOST_CHECK_EQUAL(retrievedData, *i->second);
-  }
-
-  // Delete
-  for (typename T::DataContainer::iterator i = this->datas.begin();
-       i != this->datas.end(); ++i) {
-    BOOST_CHECK_EQUAL(this->handle->deleteData((*i)->getName()), true);
-  }
-
-  // Read (none of the items should exist)
-  for (typename T::InterestContainer::iterator i = this->interests.begin();
-       i != this->interests.end(); ++i) {
-    ndn::Data retrievedData;
-    BOOST_REQUIRE_EQUAL(this->handle->readData(i->first, retrievedData), false);
-  }
-}
-
-BOOST_AUTO_TEST_SUITE_END()
-
-}// namespace repo