src: Small source reorganization
Change-Id: I442f3ad0392ef4220e6435fc48e6d69acbd02076
diff --git a/src/common.hpp b/src/common.hpp
new file mode 100644
index 0000000..70d959f
--- /dev/null
+++ b/src/common.hpp
@@ -0,0 +1,72 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_COMMON_HPP
+#define REPO_COMMON_HPP
+
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/interest.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/data.hpp>
+#include <ndn-cxx/selectors.hpp>
+#include <ndn-cxx/key-locator.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/util/command-interest-validator.hpp>
+#include <ndn-cxx/util/time.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
+
+#include <boost/utility.hpp>
+#include <boost/random/mersenne_twister.hpp>
+#include <boost/random/uniform_int_distribution.hpp>
+
+#include <map>
+#include <string>
+#include <vector>
+#include <algorithm>
+
+namespace repo {
+// Names from ndn-cxx, std and boost that repo code uses without qualification
+using ndn::Face;
+using ndn::Block;
+using ndn::Name;
+using ndn::Interest;
+using ndn::Selectors;
+using ndn::Exclude;
+using ndn::Data;
+using ndn::KeyLocator;
+using ndn::KeyChain;
+using ndn::CommandInterestValidator;
+using ndn::Scheduler;
+
+using ndn::bind; // the handles bind their member-function callbacks with this
+using ndn::shared_ptr;
+using ndn::make_shared;
+using ndn::enable_shared_from_this;
+
+using std::vector;
+using std::string;
+
+using boost::noncopyable;
+
+typedef uint64_t ProcessId; // identifier of a command process (see WriteHandle::deleteProcess)
+typedef uint64_t SegmentNo; // segment number, e.g. the block IDs of a segmented delete
+
+} // namespace repo
+
+#endif // REPO_COMMON_HPP
diff --git a/src/handles/base-handle.cpp b/src/handles/base-handle.cpp
new file mode 100644
index 0000000..76fa36c
--- /dev/null
+++ b/src/handles/base-handle.cpp
@@ -0,0 +1,32 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "base-handle.hpp"
+
+namespace repo {
+
+uint64_t
+BaseHandle::generateProcessId()
+{
+  static boost::random::mt19937_64 engine; // default-seeded: same ID sequence on every run
+  static boost::random::uniform_int_distribution<uint64_t> anyId(0, 0xFFFFFFFFFFFFFFFFLL);
+  return anyId(engine); // both statics are shared by every handle in the process
+}
+
+}
diff --git a/src/handles/base-handle.hpp b/src/handles/base-handle.hpp
new file mode 100644
index 0000000..3223940
--- /dev/null
+++ b/src/handles/base-handle.hpp
@@ -0,0 +1,120 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_HANDLES_BASE_HANDLE_HPP
+#define REPO_HANDLES_BASE_HANDLE_HPP
+
+#include "common.hpp"
+
+#include "storage/storage-handle.hpp"
+#include "repo-command-response.hpp"
+#include "repo-command-parameter.hpp"
+
+namespace repo {
+
+class BaseHandle : noncopyable // common state of the repo handles; stores references only
+{
+public:
+  class Error : public std::runtime_error // public base: catchable as std::exception
+  {
+  public:
+    explicit
+    Error(const std::string& what)
+      : std::runtime_error(what)
+    {
+    }
+  };
+
+  BaseHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain, Scheduler& scheduler)
+    : m_face(face)
+    , m_storageHandle(storageHandle)
+    , m_keyChain(keyChain)
+    , m_scheduler(scheduler)
+  {
+  }
+
+  virtual
+  ~BaseHandle() // virtual: a handle may be destroyed through a BaseHandle pointer
+  {
+  }
+
+  virtual void
+  listen(const Name& prefix) = 0; // implemented by every concrete handle
+
+protected:
+  Face&
+  getFace()
+  {
+    return m_face;
+  }
+
+  StorageHandle&
+  getStorageHandle()
+  {
+    return m_storageHandle;
+  }
+
+  Scheduler&
+  getScheduler()
+  {
+    return m_scheduler;
+  }
+
+  uint64_t
+  generateProcessId(); // next value of a pseudo-random sequence shared by all handles
+
+  void
+  reply(const Interest& commandInterest, const RepoCommandResponse& response); // signed Data
+
+  /**
+   * @brief extract RepoCommandParameter from a command Interest.
+   * @param interest command Interest
+   * @param prefix Name prefix up to command-verb
+   * @param[out] parameter parsed parameter
+   * @throw RepoCommandParameter::Error parse error
+   */
+  void
+  extractParameter(const Interest& interest, const Name& prefix, RepoCommandParameter& parameter);
+
+private:
+  Face& m_face;
+  StorageHandle& m_storageHandle;
+  KeyChain& m_keyChain; // signs the Data sent by reply()
+  Scheduler& m_scheduler;
+};
+
+inline void
+BaseHandle::reply(const Interest& commandInterest, const RepoCommandResponse& response)
+{
+  Data responseData(commandInterest.getName()); // named exactly like the command Interest
+  responseData.setContent(response.wireEncode());
+  m_keyChain.sign(responseData); // signed before it is handed to the face
+  m_face.put(responseData);
+}
+
+inline void // decodes the name component that directly follows @p prefix into @p parameter
+BaseHandle::extractParameter(const Interest& interest, const Name& prefix,
+ RepoCommandParameter& parameter)
+{
+ parameter.wireDecode(interest.getName().get(prefix.size()).blockFromValue());
+}
+
+} // namespace repo
+
+#endif // REPO_HANDLES_BASE_HANDLE_HPP
diff --git a/src/handles/delete-handle.cpp b/src/handles/delete-handle.cpp
new file mode 100644
index 0000000..49e4d98
--- /dev/null
+++ b/src/handles/delete-handle.cpp
@@ -0,0 +1,197 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "delete-handle.hpp"
+
+namespace repo {
+
+DeleteHandle::DeleteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
+ Scheduler& scheduler, CommandInterestValidator& validator)
+ : BaseHandle(face, storageHandle, keyChain, scheduler)
+ , m_validator(validator) // stored as a reference; validates every incoming delete command
+{
+}
+
+void
+DeleteHandle::onInterest(const Name& prefix, const Interest& interest)
+{
+ // Validate first; the validator then calls onValidated() or onValidationFailed()
+ m_validator.validate(interest, bind(&DeleteHandle::onValidated, this, _1, prefix),
+ bind(&DeleteHandle::onValidationFailed, this, _1, prefix));
+}
+
+
+void
+DeleteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
+{
+  throw Error("Delete prefix registration failed: " + reason); // keep the reported cause
+}
+
+
+void
+DeleteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
+{
+ BOOST_ASSERT(false); // Deletion progress check, not implemented; listen() sets no filter for it
+}
+
+
+void
+DeleteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
+{
+  throw Error("Delete check prefix registration failed: " + reason); // keep the reported cause
+}
+
+
+void
+DeleteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
+{
+  RepoCommandParameter parameter;
+
+  try {
+    extractParameter(*interest, prefix, parameter);
+  }
+  catch (const RepoCommandParameter::Error&) { // by const reference: no copy, no slicing
+    negativeReply(*interest, 403);
+    return;
+  }
+
+  if (parameter.hasSelectors()) {
+    // Selectors cannot be combined with a block range: answer 402
+    if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
+      negativeReply(*interest, 402);
+      return;
+    }
+
+    // choose data with selector and delete it
+    processSelectorDeleteCommand(*interest, parameter);
+    return;
+  }
+
+  if (!parameter.hasStartBlockId() && !parameter.hasEndBlockId()) {
+    processSingleDeleteCommand(*interest, parameter);
+    return;
+  }
+
+  processSegmentDeleteCommand(*interest, parameter);
+}
+
+void
+DeleteHandle::onValidationFailed(const shared_ptr<const Interest>& interest, const Name& prefix)
+{
+ std::cout << "invalidated" << std::endl;
+ negativeReply(*interest, 401); // 401 is the status sent when validation of the command fails
+}
+
+void
+DeleteHandle::listen(const Name& prefix)
+{
+  const Name deletePrefix = Name(prefix).append("delete"); // i.e. <prefix>/delete
+  getFace().setInterestFilter(deletePrefix, bind(&DeleteHandle::onInterest, this, _1, _2),
+                              bind(&DeleteHandle::onRegisterFailed, this, _1, _2));
+}
+
+void
+DeleteHandle::positiveReply(const Interest& interest, const RepoCommandParameter& parameter,
+                            uint64_t statusCode, uint64_t nDeletedDatas)
+{
+  RepoCommandResponse response;
+  if (!parameter.hasProcessId()) {
+    response.setStatusCode(403); // NOTE(review): the callers have already deleted by now
+  }
+  else {
+    response.setProcessId(parameter.getProcessId());
+    response.setStatusCode(statusCode);
+    response.setDeleteNum(nDeletedDatas);
+  }
+  reply(interest, response);
+}
+
+void
+DeleteHandle::negativeReply(const Interest& interest, uint64_t statusCode)
+{
+  RepoCommandResponse failure; // only the status code is set: no process ID, no delete count
+  failure.setStatusCode(statusCode);
+  reply(interest, failure);
+}
+
+void
+DeleteHandle::processSingleDeleteCommand(const Interest& interest,
+                                         RepoCommandParameter& parameter)
+{
+  // One name is targeted, so the reported count is 1 when deleteData() returns a true value
+  // and 0 otherwise; status 200 is requested in both cases
+  const bool isDeleted = getStorageHandle().deleteData(parameter.getName());
+  const uint64_t nDeletedDatas = isDeleted ? 1 : 0;
+  positiveReply(interest, parameter, 200, nDeletedDatas);
+}
+
+void
+DeleteHandle::processSelectorDeleteCommand(const Interest& interest,
+                                           RepoCommandParameter& parameter)
+{
+  // readNameAny() fills `matches` for the command's name and selectors
+  Name queryName = parameter.getName();
+  Selectors querySelectors = parameter.getSelectors();
+  vector<Name> matches;
+  getStorageHandle().readNameAny(queryName, querySelectors, matches);
+  uint64_t deletedCount = 0;
+  for (vector<Name>::size_type i = 0; i < matches.size(); ++i) {
+    if (getStorageHandle().deleteData(matches[i])) {
+      ++deletedCount;
+    }
+  }
+
+  // Status 200 is requested whether or not every match could be deleted
+  positiveReply(interest, parameter, 200, deletedCount);
+}
+
+void
+DeleteHandle::processSegmentDeleteCommand(const Interest& interest,
+                                          RepoCommandParameter& parameter)
+{
+  if (!parameter.hasStartBlockId())
+    parameter.setStartBlockId(0);
+  if (!parameter.hasEndBlockId()) {
+    BOOST_ASSERT(false); // segmented deletion without EndBlockId, not implemented
+    return;
+  }
+
+  const SegmentNo startBlockId = parameter.getStartBlockId();
+  const SegmentNo endBlockId = parameter.getEndBlockId();
+  if (startBlockId > endBlockId) {
+    negativeReply(interest, 403);
+    return;
+  }
+
+  const Name prefix = parameter.getName();
+  uint64_t nDeletedDatas = 0;
+  // Exit by equality: "i <= endBlockId" is always true when endBlockId is the largest SegmentNo
+  for (SegmentNo i = startBlockId; ; ++i) {
+    Name name = prefix;
+    name.appendSegment(i);
+    if (getStorageHandle().deleteData(name)) {
+      nDeletedDatas++;
+    }
+    if (i == endBlockId)
+      break;
+  }
+  positiveReply(interest, parameter, 200, nDeletedDatas); // all segments processed: 200
+}
+
+} //namespace repo
diff --git a/src/handles/delete-handle.hpp b/src/handles/delete-handle.hpp
new file mode 100644
index 0000000..680e8dc
--- /dev/null
+++ b/src/handles/delete-handle.hpp
@@ -0,0 +1,95 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_HANDLES_DELETE_HANDLE_HPP
+#define REPO_HANDLES_DELETE_HANDLE_HPP
+
+#include "base-handle.hpp"
+
+namespace repo {
+
+using std::vector;
+
+class DeleteHandle : public BaseHandle
+{
+ // Handles delete commands: a validated command removes Data through the StorageHandle
+public:
+ class Error : public BaseHandle::Error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : BaseHandle::Error(what)
+ {
+ }
+ };
+
+public:
+ DeleteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
+ Scheduler& scheduler, CommandInterestValidator& validator);
+
+ virtual void
+ listen(const Name& prefix); // registers the Interest filter for <prefix>/delete
+
+private:
+ void
+ onInterest(const Name& prefix, const Interest& interest); // passes the command to m_validator
+
+ void
+ onRegisterFailed(const Name& prefix, const std::string& reason); // throws Error
+
+ void // decodes the command parameter and dispatches to one of the process*DeleteCommand()
+ onValidated(const shared_ptr<const Interest>& interest, const Name& prefix);
+
+ void // replies with status 401
+ onValidationFailed(const shared_ptr<const Interest>& interest, const Name& prefix);
+
+ /**
+ * @todo Not implemented yet; this is attributed to the loop that deletes segmented data.
+ */
+ void
+ onCheckInterest(const Name& prefix, const Interest& interest);
+
+ void
+ onCheckRegisterFailed(const Name& prefix, const std::string& reason); // throws Error
+
+ void
+ positiveReply(const Interest& interest, const RepoCommandParameter& parameter,
+ uint64_t statusCode, uint64_t nDeletedDatas); // status becomes 403 without a ProcessId
+
+ void
+ negativeReply(const Interest& interest, uint64_t statusCode); // reply with a status code only
+
+ void // deletes the single Data named by the parameter
+ processSingleDeleteCommand(const Interest& interest, RepoCommandParameter& parameter);
+
+ void // deletes every name that readNameAny() returns for the parameter's name and selectors
+ processSelectorDeleteCommand(const Interest& interest, RepoCommandParameter& parameter);
+
+ void // deletes the segments StartBlockId..EndBlockId under the parameter's name
+ processSegmentDeleteCommand(const Interest& interest, RepoCommandParameter& parameter);
+
+private:
+ CommandInterestValidator& m_validator; // not owned by the handle
+
+};
+
+} // namespace repo
+
+#endif // REPO_HANDLES_DELETE_HANDLE_HPP
diff --git a/src/handles/read-handle.cpp b/src/handles/read-handle.cpp
new file mode 100644
index 0000000..d0e80b3
--- /dev/null
+++ b/src/handles/read-handle.cpp
@@ -0,0 +1,48 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "read-handle.hpp"
+
+namespace repo {
+
+void
+ReadHandle::onInterest(const Name& prefix, const Interest& interest)
+{
+  Data stored; // filled in by readData() when the lookup succeeds
+  if (!getStorageHandle().readData(interest, stored))
+    return; // nothing is sent when the lookup fails
+  getFace().put(stored);
+}
+
+void
+ReadHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
+{
+  std::cerr << "ERROR: Failed to register prefix in local hub's daemon: " << reason << std::endl;
+  getFace().shutdown(); // a failed registration is treated as fatal for this face
+}
+
+void
+ReadHandle::listen(const Name& prefix)
+{
+  // No command verb is appended: Interests are matched against @p prefix itself
+  getFace().setInterestFilter(prefix, bind(&ReadHandle::onInterest, this, _1, _2),
+                              bind(&ReadHandle::onRegisterFailed, this, _1, _2));
+}
+
+} //namespace repo
diff --git a/src/handles/read-handle.hpp b/src/handles/read-handle.hpp
new file mode 100644
index 0000000..93468c0
--- /dev/null
+++ b/src/handles/read-handle.hpp
@@ -0,0 +1,52 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_HANDLES_READ_HANDLE_HPP
+#define REPO_HANDLES_READ_HANDLE_HPP
+
+#include "base-handle.hpp"
+
+namespace repo {
+
+class ReadHandle : public BaseHandle
+{
+ // Answers Interests under the listened prefix with Data read from the StorageHandle
+public:
+ ReadHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain, Scheduler& scheduler)
+ : BaseHandle(face, storageHandle, keyChain, scheduler)
+ {
+ }
+
+ virtual void
+ listen(const Name& prefix); // registers the Interest filter for @p prefix
+
+private:
+ /**
+ * @brief Read data from backend storage and, if the read succeeds, put it on the face
+ */
+ void
+ onInterest(const Name& prefix, const Interest& interest);
+
+ void
+ onRegisterFailed(const Name& prefix, const std::string& reason); // logs, shuts the face down
+};
+
+} // namespace repo
+
+#endif // REPO_HANDLES_READ_HANDLE_HPP
diff --git a/src/handles/tcp-bulk-insert-handle.cpp b/src/handles/tcp-bulk-insert-handle.cpp
new file mode 100644
index 0000000..a69dfe2
--- /dev/null
+++ b/src/handles/tcp-bulk-insert-handle.cpp
@@ -0,0 +1,218 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "tcp-bulk-insert-handle.hpp"
+
+namespace repo {
+
+const size_t MAX_NDN_PACKET_SIZE = 8800; // size in bytes of each client's input buffer
+
+namespace detail {
+
+class TcpBulkInsertClient : noncopyable // state of one accepted bulk-insert connection
+{
+public:
+ TcpBulkInsertClient(TcpBulkInsertHandle& writer,
+ const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+ : m_writer(writer)
+ , m_socket(socket)
+ , m_hasStarted(false)
+ , m_inputBufferSize(0)
+ {
+ }
+
+ static void
+ startReceive(const shared_ptr<TcpBulkInsertClient>& client)
+ {
+ BOOST_ASSERT(!client->m_hasStarted);
+ // A copy of `client` is bound into the handler, so the object lives while a read is pending
+ client->m_socket->async_receive(
+ boost::asio::buffer(client->m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
+ bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
+
+ client->m_hasStarted = true;
+ }
+
+private:
+ void
+ handleReceive(const boost::system::error_code& error,
+ std::size_t nBytesReceived,
+ const shared_ptr<TcpBulkInsertClient>& client);
+
+private:
+ TcpBulkInsertHandle& m_writer; // provides the StorageHandle that received Data is inserted into
+ shared_ptr<boost::asio::ip::tcp::socket> m_socket;
+ bool m_hasStarted; // set once startReceive() has queued the first read
+ uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE]; // bytes received but not yet decoded
+ std::size_t m_inputBufferSize; // number of valid bytes at the start of m_inputBuffer
+};
+
+} // namespace detail
+
+TcpBulkInsertHandle::TcpBulkInsertHandle(boost::asio::io_service& ioService,
+ StorageHandle& storageHandle)
+ : m_acceptor(ioService) // not opened here; listen() opens and binds it
+ , m_storageHandle(storageHandle)
+{
+}
+
+void
+TcpBulkInsertHandle::listen(const std::string& host, const std::string& port)
+{
+  using namespace boost::asio;
+
+  // Resolve host:port; of the resolved endpoints only the first one is used
+  ip::tcp::resolver resolver(m_acceptor.get_io_service());
+  ip::tcp::resolver::query query(host, port);
+  ip::tcp::resolver::iterator firstEndpoint = resolver.resolve(query);
+  if (firstEndpoint == ip::tcp::resolver::iterator())
+    throw Error("Cannot listen on [" + host + ":" + port + "]");
+
+  m_localEndpoint = *firstEndpoint;
+  std::cerr << "Start listening on " << m_localEndpoint << std::endl;
+
+  // Open, configure and bind the acceptor before it starts listening
+  m_acceptor.open(m_localEndpoint.protocol());
+  m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
+  if (m_localEndpoint.address().is_v6()) {
+    m_acceptor.set_option(ip::v6_only(true));
+  }
+  m_acceptor.bind(m_localEndpoint);
+  // 255 is the backlog value handed to the acceptor
+  m_acceptor.listen(255);
+
+  // Queue the first accept. The socket is bound into the handler so that it stays alive until
+  // handleAccept() runs, which then queues the accept for the next connection
+  shared_ptr<ip::tcp::socket> pendingSocket =
+    make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
+  m_acceptor.async_accept(*pendingSocket,
+                          bind(&TcpBulkInsertHandle::handleAccept, this, _1, pendingSocket));
+}
+
+void
+TcpBulkInsertHandle::stop()
+{
+ m_acceptor.cancel(); // the pending accept completes with an error; handleAccept() returns on it
+ m_acceptor.close();
+}
+
+void
+TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
+                                  const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+{
+  using namespace boost::asio;
+
+  if (error) // e.g. the pending accept was cancelled by stop(); no further accept is queued
+    return;
+
+  // Non-throwing overload: the peer may be gone already, and a throw here would escape the handler
+  boost::system::error_code endpointError;
+  const ip::tcp::endpoint remote = socket->remote_endpoint(endpointError);
+  if (!endpointError)
+    std::cerr << "New connection from " << remote << std::endl;
+
+  shared_ptr<detail::TcpBulkInsertClient> client =
+    make_shared<detail::TcpBulkInsertClient>(boost::ref(*this), socket);
+  detail::TcpBulkInsertClient::startReceive(client);
+
+  // prepare accepting the next connection
+  shared_ptr<ip::tcp::socket> clientSocket =
+    make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
+  m_acceptor.async_accept(*clientSocket,
+                          bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
+}
+
+/**
+ * @brief Receive handler: decodes the TLV elements buffered so far, stores the Data packets
+ *        among them and queues the next read
+ * @param error result of the finished receive operation
+ * @param nBytesReceived number of bytes that operation appended to m_inputBuffer
+ * @param client shared pointer to this client; it is bound into the next handler again
+ */
+void
+detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
+                                           std::size_t nBytesReceived,
+                                           const shared_ptr<detail::TcpBulkInsertClient>& client)
+{
+  if (error) {
+    if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
+      return;
+
+    // Any other receive error ends the connection; errors while closing are ignored
+    boost::system::error_code closeError;
+    m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, closeError);
+    m_socket->close(closeError);
+    return;
+  }
+
+  // The read was queued right behind the bytes kept from earlier calls
+  m_inputBufferSize += nBytesReceived;
+
+  // Decode every complete element in the buffer; nConsumed counts the decoded bytes
+  std::size_t nConsumed = 0;
+  bool isDecoded = true;
+  Block element;
+  while (nConsumed != m_inputBufferSize) {
+    isDecoded = Block::fromBuffer(m_inputBuffer + nConsumed, m_inputBufferSize - nConsumed,
+                                  element);
+    if (!isDecoded)
+      break;
+
+    nConsumed += element.size();
+    BOOST_ASSERT(nConsumed <= m_inputBufferSize);
+
+    if (element.type() != ndn::Tlv::Data)
+      continue; // elements of any other type are dropped
+
+    try {
+      Data data(element);
+      if (m_writer.getStorageHandle().insertData(data))
+        std::cerr << "Successfully injected " << data.getName() << std::endl;
+      else
+        std::cerr << "FAILED to inject " << data.getName() << std::endl;
+    }
+    catch (std::runtime_error&) {
+      /// \todo Catch specific error after determining what wireDecode() can throw
+      std::cerr << "Error decoding received Data packet" << std::endl;
+    }
+  }
+
+  // The buffer is full and still starts with an undecodable element: there is no room left
+  // to receive the rest of it, so the connection is closed
+  if (!isDecoded && m_inputBufferSize == MAX_NDN_PACKET_SIZE && nConsumed == 0) {
+    boost::system::error_code closeError;
+    m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, closeError);
+    m_socket->close(closeError);
+    return;
+  }
+
+  // Move the undecoded tail to the front of the buffer (copies nothing if there is no tail)
+  if (nConsumed > 0) {
+    std::copy(m_inputBuffer + nConsumed, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
+    m_inputBufferSize -= nConsumed;
+  }
+
+  // Read on into the free space behind the tail
+  m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
+                                              MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
+                          bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
+}
+
+
+} // namespace repo
diff --git a/src/handles/tcp-bulk-insert-handle.hpp b/src/handles/tcp-bulk-insert-handle.hpp
new file mode 100644
index 0000000..17233f4
--- /dev/null
+++ b/src/handles/tcp-bulk-insert-handle.hpp
@@ -0,0 +1,72 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_HANDLES_TCP_BULK_INSERT_HANDLE_HPP
+#define REPO_HANDLES_TCP_BULK_INSERT_HANDLE_HPP
+
+#include "common.hpp"
+#include "storage/storage-handle.hpp"
+
+#include <boost/asio.hpp>
+
+namespace repo {
+
+class TcpBulkInsertHandle : noncopyable // accepts TCP connections and inserts the Data they send
+{
+public:
+ class Error : public std::runtime_error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : std::runtime_error(what)
+ {
+ }
+ };
+
+public:
+ TcpBulkInsertHandle(boost::asio::io_service& ioService,
+ StorageHandle& storageHandle);
+
+ void
+ listen(const std::string& host, const std::string& port); // throws Error if nothing resolves
+
+ void
+ stop(); // cancels the pending accept and closes the acceptor
+
+ StorageHandle&
+ getStorageHandle()
+ {
+ return m_storageHandle;
+ }
+
+private:
+ void
+ handleAccept(const boost::system::error_code& error,
+ const shared_ptr<boost::asio::ip::tcp::socket>& socket);
+
+private:
+ boost::asio::ip::tcp::acceptor m_acceptor;
+ boost::asio::ip::tcp::endpoint m_localEndpoint; // assigned by listen()
+ StorageHandle& m_storageHandle; // not owned by the handle
+};
+
+} // namespace repo
+
+#endif // REPO_HANDLES_TCP_BULK_INSERT_HANDLE_HPP
diff --git a/src/handles/write-handle.cpp b/src/handles/write-handle.cpp
new file mode 100644
index 0000000..0ea17ae
--- /dev/null
+++ b/src/handles/write-handle.cpp
@@ -0,0 +1,546 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "write-handle.hpp"
+
+namespace repo {
+
+// Default for m_retryTime: a segment Interest that has already been re-expressed this
+// many times fails the whole process on its next timeout (a count, despite the name).
+static const int RETRY_TIMEOUT = 3;
+// Default for m_credit: upper bound on the number of Interests segInit() sends at once.
+static const int DEFAULT_CREDIT = 12;
+// Default for m_noEndTimeout: how long a segmented fetch may run without an EndBlockId.
+static const ndn::time::milliseconds NOEND_TIMEOUT(10000);
+// Delay used by deferredDeleteProcess() before a process entry is erased.
+static const ndn::time::milliseconds PROCESS_DELETE_TIME(10000);
+
+/**
+ * @brief Construct a write handle that uses the file-level defaults for the retry
+ *        limit, the credit and the no-EndBlockId timeout.
+ */
+WriteHandle::WriteHandle(Face& face,
+                         StorageHandle& storageHandle,
+                         KeyChain& keyChain,
+                         Scheduler& scheduler,
+                         CommandInterestValidator& validator)
+  : BaseHandle(face, storageHandle, keyChain, scheduler),
+    m_validator(validator),
+    m_retryTime(RETRY_TIMEOUT),
+    m_credit(DEFAULT_CREDIT),
+    m_noEndTimeout(NOEND_TIMEOUT)
+{
+}
+
+/// Forget the bookkeeping entry of @p processId; does nothing if there is no such entry.
+void
+WriteHandle::deleteProcess(ProcessId processId)
+{
+  m_processes.erase(processId);
+}
+
+/**
+ * @brief Entry point for insert command Interests.
+ *
+ * The Interest is handed to the command validator, which reports the outcome through
+ * onValidated() (with @p prefix bound) or onValidationFailed().
+ */
+void
+WriteHandle::onInterest(const Name& prefix, const Interest& interest)
+{
+  m_validator.validate(interest,
+                       bind(&WriteHandle::onValidated, this, _1, prefix),
+                       bind(&WriteHandle::onValidationFailed, this, _1));
+}
+
+/**
+ * @brief Called when registering the insert command prefix fails.
+ * @param prefix the prefix that could not be registered
+ * @param reason failure description handed to this callback
+ * @throw Error always; the message names the prefix and the reason so that the
+ *        failure can be diagnosed
+ */
+void
+WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
+{
+  throw Error("Insert prefix registration failed (" + prefix.toUri() + "): " + reason);
+}
+
+/**
+ * @brief Called when registering the insert check command prefix fails.
+ * @param prefix the prefix that could not be registered
+ * @param reason failure description handed to this callback
+ * @throw Error always; the message names the prefix and the reason so that the
+ *        failure can be diagnosed
+ */
+void
+WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
+{
+  throw Error("Insert check prefix registration failed (" + prefix.toUri() + "): " + reason);
+}
+
+/**
+ * @brief Dispatch an insert command that passed validation.
+ *
+ * Replies 403 if the command parameter cannot be extracted, and 402 if a segmented
+ * command (StartBlockId and/or EndBlockId present) also carries selectors. Otherwise
+ * the command is handed to the segmented or to the single insert path.
+ *
+ * @param interest the validated command Interest
+ * @param prefix   the prefix the command was received under
+ */
+void
+WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
+{
+  RepoCommandParameter parameter;
+  try {
+    extractParameter(*interest, prefix, parameter);
+  }
+  catch (const RepoCommandParameter::Error&) {
+    // caught by const reference: no copy and no slicing of derived error types
+    negativeReply(*interest, 403);
+    return;
+  }
+
+  const bool isSegmented = parameter.hasStartBlockId() || parameter.hasEndBlockId();
+  if (!isSegmented) {
+    processSingleInsertCommand(*interest, parameter);
+    return;
+  }
+
+  if (parameter.hasSelectors()) {
+    negativeReply(*interest, 402);
+    return;
+  }
+  processSegmentedInsertCommand(*interest, parameter);
+}
+
+/// Validation of an insert command failed: log it and answer with status code 401.
+void
+WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest)
+{
+  std::cout << "invalidated" << std::endl;
+  negativeReply(*interest, 401);
+}
+
+/**
+ * @brief Data callback of a single (non-segmented) insert.
+ *
+ * Ignored when the process is no longer known. The first Data of a process is handed
+ * to the storage and the insert count is set to 1; in every case the process entry is
+ * then scheduled for deferred deletion.
+ */
+void
+WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
+{
+  map<ProcessId, ProcessInfo>::iterator entry = m_processes.find(processId);
+  if (entry == m_processes.end()) {
+    return;
+  }
+
+  RepoCommandResponse& response = entry->second.response;
+  if (response.getInsertNum() == 0) {
+    getStorageHandle().insertData(data);
+    response.setInsertNum(1);
+  }
+
+  deferredDeleteProcess(processId);
+}
+
+/**
+ * @brief Data callback of a segmented insert.
+ *
+ * Ignored when the process is no longer known. A FinalBlockId carried by the Data
+ * sets the EndBlockId of the response if there is none yet, or lowers it if the new
+ * value is smaller. The Data is then stored (the insert count grows only if the
+ * storage reports success) and flow control continues in onSegmentDataControl().
+ */
+void
+WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
+{
+  map<ProcessId, ProcessInfo>::iterator entry = m_processes.find(processId);
+  if (entry == m_processes.end()) {
+    return;
+  }
+  RepoCommandResponse& response = entry->second.response;
+
+  // refresh EndBlockId from the FinalBlockId, never raising an existing value
+  Name::Component finalBlockId = data.getFinalBlockId();
+  if (!finalBlockId.empty()) {
+    SegmentNo finalSegment = finalBlockId.toSegment();
+    if (!response.hasEndBlockId() || finalSegment < response.getEndBlockId()) {
+      response.setEndBlockId(finalSegment);
+    }
+  }
+
+  if (getStorageHandle().insertData(data)) {
+    response.setInsertNum(response.getInsertNum() + 1);
+  }
+
+  onSegmentDataControl(processId, interest);
+}
+
+/// Timeout of a single (non-segmented) fetch: log it and drop the process at once.
+void
+WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
+{
+  std::cout << "Timeout" << std::endl;
+  m_processes.erase(processId);
+}
+
+/// Timeout of a segment Interest: log it and let onSegmentTimeoutControl() decide
+/// between retrying and failing the process.
+void
+WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
+{
+  std::cout << "SegTimeout" << std::endl;
+  onSegmentTimeoutControl(processId, interest);
+}
+
+/**
+ * @brief Register Interest filters for the insert and the insert check command
+ *        under @p prefix.
+ *
+ * Insert commands are dispatched to onInterest(), insert check commands to
+ * onCheckInterest(). A failed registration is reported through onRegisterFailed()
+ * or onCheckRegisterFailed() respectively, so the thrown error names the command
+ * whose prefix could not be registered.
+ */
+void
+WriteHandle::listen(const Name& prefix)
+{
+  Name insertPrefix;
+  insertPrefix.append(prefix).append("insert");
+  getFace().setInterestFilter(insertPrefix,
+                              bind(&WriteHandle::onInterest, this, _1, _2),
+                              bind(&WriteHandle::onRegisterFailed, this, _1, _2));
+
+  // NOTE(review): the verb component contains a space; kept verbatim -- confirm
+  // against the protocol specification.
+  Name insertCheckPrefix;
+  insertCheckPrefix.append(prefix).append("insert check");
+  getFace().setInterestFilter(insertCheckPrefix,
+                              bind(&WriteHandle::onCheckInterest, this, _1, _2),
+                              bind(&WriteHandle::onCheckRegisterFailed, this, _1, _2));
+}
+
+/**
+ * @brief Start a segmented fetch: send the initial window of segment Interests.
+ *
+ * The window holds m_credit segments, or fewer when the command's block range is
+ * shorter. Without an EndBlockId the no-end deadline of the process is armed instead.
+ * Every Interest sent consumes one credit and gets a retry count of 0. The first
+ * segment that was not requested becomes nextSegment and is queued.
+ *
+ * @param processId id of the process; its entry is created if it does not exist yet
+ * @param parameter command parameter; must carry a StartBlockId
+ */
+void
+WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
+{
+  ProcessInfo& process = m_processes[processId];
+  process.credit = 0;
+
+  const Name name = parameter.getName();
+  const SegmentNo startBlockId = parameter.getStartBlockId();
+
+  uint64_t initialCredit = m_credit;
+  if (parameter.hasEndBlockId()) {
+    // never request more segments than the command asks for
+    const uint64_t nSegments = parameter.getEndBlockId() - parameter.getStartBlockId() + 1;
+    initialCredit = std::min(initialCredit, nSegments);
+  }
+  else {
+    // set noEndTimeout timer
+    process.noEndTime = ndn::time::steady_clock::now() + m_noEndTimeout;
+  }
+  process.credit = initialCredit;
+
+  SegmentNo segment = startBlockId;
+  while (segment < startBlockId + initialCredit) {
+    Name fetchName = name;
+    fetchName.appendSegment(segment);
+    Interest interest(fetchName);
+    getFace().expressInterest(interest,
+                              bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+                              bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+    process.credit--;
+    process.retryCounts[segment] = 0;
+    ++segment;
+  }
+
+  process.nextSegment = segment;
+  process.nextSegmentQueue.push(segment);
+}
+
+/**
+ * @brief Flow control step run by onSegmentData() after a segment has been handled.
+ *
+ * Returns one credit to the process and then, in this order:
+ * - without an EndBlockId: if the no-end deadline has passed, sets status 405 and
+ *   schedules the deferred deletion of the process;
+ * - with an EndBlockId: if at least EndBlockId - StartBlockId + 1 segments have been
+ *   inserted, sets status 200 and schedules the deferred deletion;
+ * - otherwise, if there is credit and a queued segment that does not exceed the
+ *   EndBlockId, expresses the Interest for it and queues the following segment number.
+ *
+ * @param processId id of the fetch process; unknown ids are ignored
+ * @param interest the Interest whose Data has just arrived; its last name component
+ *        must be the segment number
+ */
+void
+WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
+{
+ //std::cout << "onSegmentDataControl: " << processId << std::endl;
+
+ if (m_processes.count(processId) == 0) {
+ return;
+ }
+ ProcessInfo& process = m_processes[processId];
+ RepoCommandResponse& response = process.response;
+ int& processCredit = process.credit;
+ //onSegmentDataControl is called when a data returns.
+ //When data returns, processCredit++
+ processCredit++;
+ SegmentNo& nextSegment = process.nextSegment;
+ queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
+ map<SegmentNo, int>& retryCounts = process.retryCounts;
+
+ //check whether the no-end deadline has passed (only relevant while EndBlockId is unknown)
+ if (!response.hasEndBlockId()) {
+
+ ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
+ ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
+
+ if (now > noEndTime) {
+ std::cout << "noEndtimeout: " << processId << std::endl;
+ //m_processes.erase(processId);
+ //StatusCode should be refreshed as 405
+ response.setStatusCode(405);
+ //schedule a delete event
+ deferredDeleteProcess(processId);
+ return;
+ }
+ }
+
+ //check whether the whole range has been inserted; if so, finish the process
+ if (response.hasEndBlockId()) {
+ uint64_t nSegments =
+ response.getEndBlockId() - response.getStartBlockId() + 1;
+ if (response.getInsertNum() >= nSegments) {
+ //m_processes.erase(processId);
+ //All the data has been inserted, StatusCode is refreshed as 200
+ response.setStatusCode(200);
+ deferredDeleteProcess(processId);
+ return;
+ }
+ }
+
+ //check whether there is any credit
+ if (processCredit == 0)
+ return;
+
+
+ //check whether sent queue empty
+ if (nextSegmentQueue.empty()) {
+ //do not do anything
+ return;
+ }
+
+ //pop the queue
+ SegmentNo sendingSegment = nextSegmentQueue.front();
+ nextSegmentQueue.pop();
+
+ //check whether sendingSegment exceeds the EndBlockId
+ if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
+ //do not do anything
+ return;
+ }
+
+ //segment number of the Interest that has just been answered
+ SegmentNo fetchedSegment =
+ interest.getName().get(interest.getName().size() - 1).toSegment();
+
+ BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
+
+ //the fetched segment needs no retry bookkeeping any more, remove it from the map
+ //rit->second.erase(oit);
+ retryCounts.erase(fetchedSegment);
+ //express the interest of the top of the queue
+ Name fetchName(interest.getName().getPrefix(-1));
+ fetchName.appendSegment(sendingSegment);
+ Interest fetchInterest(fetchName);
+ getFace().expressInterest(fetchInterest,
+ bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+ bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+ //When an interest is expressed, processCredit--
+ processCredit--;
+ //std::cout << "sent seg: " << sendingSegment << std::endl;
+ if (retryCounts.count(sendingSegment) == 0) {
+ //not found
+ retryCounts[sendingSegment] = 0;
+ }
+ else {
+ //found
+ retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
+ }
+ //increase the next seg and put it into the queue
+ if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
+ nextSegment++;
+ nextSegmentQueue.push(nextSegment);
+ }
+}
+
+/**
+ * @brief Decide what to do after a segment Interest timed out.
+ *
+ * Unknown processes are ignored. If the segment has already been retried m_retryTime
+ * times, the whole process is erased; otherwise its retry count is incremented and
+ * an Interest with the same name is expressed again.
+ *
+ * @param processId id of the fetch process
+ * @param interest the Interest that timed out; its last name component must be the
+ *        segment number
+ */
+void
+WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
+{
+  if (m_processes.count(processId) == 0) {
+    return;
+  }
+  map<SegmentNo, int>& retryCounts = m_processes[processId].retryCounts;
+
+  const SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
+  std::cout << "timeoutSegment: " << timeoutSegment << std::endl;
+
+  BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
+
+  int& retryTime = retryCounts[timeoutSegment];
+  if (retryTime >= m_retryTime) {
+    // retry budget exhausted: fail this process
+    std::cout << "Retry timeout: " << processId << std::endl;
+    m_processes.erase(processId);
+    return;
+  }
+
+  // still within budget: count the retry and ask for the same segment again
+  ++retryTime;
+  Interest retryInterest(interest.getName());
+  getFace().expressInterest(retryInterest,
+                            bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+                            bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+}
+
+/**
+ * @brief Entry point for insert check command Interests.
+ *
+ * The Interest is handed to the command validator, which reports the outcome through
+ * onCheckValidated() (with @p prefix bound) or onCheckValidationFailed().
+ */
+void
+WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
+{
+  m_validator.validate(interest,
+                       bind(&WriteHandle::onCheckValidated, this, _1, prefix),
+                       bind(&WriteHandle::onCheckValidationFailed, this, _1));
+}
+
+/**
+ * @brief Answer an insert check command that passed validation.
+ *
+ * Replies 403 if the parameter cannot be extracted or carries no ProcessId, and 404
+ * if the process is unknown. Otherwise the current response of the process is sent.
+ * For a segmented fetch whose EndBlockId is still unknown, extendNoEndTime() is
+ * called first, which either pushes the no-end deadline back or marks the response
+ * with status 405.
+ *
+ * @param interest the validated command Interest
+ * @param prefix   the prefix the command was received under
+ */
+void
+WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
+{
+  RepoCommandParameter parameter;
+  try {
+    extractParameter(*interest, prefix, parameter);
+  }
+  catch (const RepoCommandParameter::Error&) {
+    // caught by const reference: no copy and no slicing of derived error types
+    negativeReply(*interest, 403);
+    return;
+  }
+
+  if (!parameter.hasProcessId()) {
+    negativeReply(*interest, 403);
+    return;
+  }
+
+  // check whether this process exists
+  const ProcessId processId = parameter.getProcessId();
+  map<ProcessId, ProcessInfo>::iterator entry = m_processes.find(processId);
+  if (entry == m_processes.end()) {
+    std::cout << "no such processId: " << processId << std::endl;
+    negativeReply(*interest, 404);
+    return;
+  }
+
+  ProcessInfo& process = entry->second;
+  RepoCommandResponse& response = process.response;
+
+  // Only a segmented fetch (StartBlockId set) without a known EndBlockId runs against
+  // the no-end deadline; single fetches and bounded fetches are answered as they are.
+  if (response.hasStartBlockId() && !response.hasEndBlockId()) {
+    extendNoEndTime(process);
+  }
+  reply(*interest, response);
+}
+
+/// Validation of an insert check command failed: answer with status code 401.
+void
+WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest)
+{
+  negativeReply(*interest, 401);
+}
+
+/// Schedule deleteProcess(@p processId) to run after PROCESS_DELETE_TIME has elapsed.
+void
+WriteHandle::deferredDeleteProcess(ProcessId processId)
+{
+  getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
+                               ndn::bind(&WriteHandle::deleteProcess, this, processId));
+}
+
+/**
+ * @brief Start a single (non-segmented) insert.
+ *
+ * Creates a new process, replies to the command with status 100, the new ProcessId
+ * and an insert count of 0, then moves the stored response to status 300 and
+ * expresses one Interest for the requested name (with the command's selectors, if
+ * any). The outcome arrives in onData() or onTimeout().
+ */
+void
+WriteHandle::processSingleInsertCommand(const Interest& interest,
+                                        RepoCommandParameter& parameter)
+{
+  const ProcessId processId = generateProcessId();
+  RepoCommandResponse& response = m_processes[processId].response;
+
+  response.setStatusCode(100);
+  response.setProcessId(processId);
+  response.setInsertNum(0);
+  reply(interest, response);
+
+  // from now on status checks report the fetch as in progress
+  response.setStatusCode(300);
+
+  Interest fetchInterest(parameter.getName());
+  if (parameter.hasSelectors()) {
+    fetchInterest.setSelectors(parameter.getSelectors());
+  }
+  getFace().expressInterest(fetchInterest,
+                            bind(&WriteHandle::onData, this, _1, _2, processId),
+                            bind(&WriteHandle::onTimeout, this, _1, processId));
+}
+
+/**
+ * @brief Start a segmented insert.
+ *
+ * With an EndBlockId, a missing StartBlockId defaults to 0 and a range whose start
+ * lies behind its end is rejected with status 403. Without an EndBlockId the end is
+ * left open. In both cases a new process is created, the command is answered with
+ * status 100, the stored response is moved to status 300 (fetching in progress) and
+ * segInit() sends the first Interests.
+ */
+void
+WriteHandle::processSegmentedInsertCommand(const Interest& interest,
+                                           RepoCommandParameter& parameter)
+{
+  const bool hasEnd = parameter.hasEndBlockId();
+
+  if (hasEnd) {
+    if (!parameter.hasStartBlockId()) {
+      parameter.setStartBlockId(0);
+    }
+    if (parameter.getStartBlockId() > parameter.getEndBlockId()) {
+      negativeReply(interest, 403);
+      return;
+    }
+  }
+
+  const ProcessId processId = generateProcessId();
+  RepoCommandResponse& response = m_processes[processId].response;
+
+  response.setStatusCode(100);
+  response.setProcessId(processId);
+  response.setInsertNum(0);
+  response.setStartBlockId(parameter.getStartBlockId());
+  if (hasEnd) {
+    response.setEndBlockId(parameter.getEndBlockId());
+  }
+  reply(interest, response);
+
+  // 300 means data fetching is in progress
+  response.setStatusCode(300);
+
+  segInit(processId, parameter);
+}
+
+/**
+ * @brief Push the no-end deadline of @p process back, unless it has already passed.
+ *
+ * If the deadline has passed, the response is marked with status 405 and the
+ * deadline is left alone; otherwise it becomes now() + m_noEndTimeout.
+ */
+void
+WriteHandle::extendNoEndTime(ProcessInfo& process)
+{
+  if (ndn::time::steady_clock::now() > process.noEndTime) {
+    process.response.setStatusCode(405);
+    return;
+  }
+
+  process.noEndTime = ndn::time::steady_clock::now() + m_noEndTimeout;
+}
+
+/// Answer @p interest with a response that carries nothing but @p statusCode.
+void
+WriteHandle::negativeReply(const Interest& interest, int statusCode)
+{
+  RepoCommandResponse errorResponse;
+  errorResponse.setStatusCode(statusCode);
+  reply(interest, errorResponse);
+}
+
+} //namespace repo
diff --git a/src/handles/write-handle.hpp b/src/handles/write-handle.hpp
new file mode 100644
index 0000000..075e78c
--- /dev/null
+++ b/src/handles/write-handle.hpp
@@ -0,0 +1,225 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_HANDLES_WRITE_HANDLE_HPP
+#define REPO_HANDLES_WRITE_HANDLE_HPP
+
+#include "base-handle.hpp"
+
+#include <queue>
+
+namespace repo {
+
+using std::map;
+using std::pair;
+using std::queue;
+
+/**
+ * @brief WriteHandle processes insert commands and fetches the requested data with a
+ * basic credit based congestion control.
+ *
+ * A segmented fetch starts by sending up to "credit" Interests (fewer if the requested
+ * block range is shorter), which leaves the process with a credit of 0.
+ *
+ * Each Data that comes back returns one credit, which is spent on the Interest for
+ * the next queued segment.
+ *
+ * A segment Interest that times out is expressed again. Once a segment has been
+ * retried the maximum number of times, its next timeout terminates the fetching
+ * process.
+ *
+ * A command may also ask for segmented data without an EndBlockId. In that case the
+ * repo keeps fetching until the noEndTimeout deadline of the process.
+ *
+ * If a Data carries a FinalBlockId, it becomes the EndBlockId of the process and the
+ * deadline no longer applies.
+ *
+ * Each insert check command processed before the deadline restarts the deadline at
+ * now() + noEndTimeout.
+ *
+ * If the deadline passes while the EndBlockId is still unknown, the process is marked
+ * with status code 405.
+ */
+class WriteHandle : public BaseHandle
+{
+
+public:
+ /// Exception type of WriteHandle; thrown when a command prefix cannot be registered.
+ class Error : public BaseHandle::Error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : BaseHandle::Error(what)
+ {
+ }
+ };
+
+
+public:
+ WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
+ Scheduler& scheduler, CommandInterestValidator& validator);
+
+ /**
+ * @brief register the Interest filters for the insert and insert check commands
+ * under @p prefix
+ */
+ virtual void
+ listen(const Name& prefix);
+
+private:
+ /**
+ * @brief Information of insert process including variables for response
+ * and credit based flow control
+ */
+ struct ProcessInfo
+ {
+ //ProcessId id;
+ RepoCommandResponse response;
+ queue<SegmentNo> nextSegmentQueue; ///< queue of waiting segment
+ /// to be sent when having credits
+ SegmentNo nextSegment; ///< last segment put into the nextSegmentQueue
+ map<SegmentNo, int> retryCounts; ///< to store retrying times of timeout segment
+ int credit; ///< congestion control credits of process
+
+ /**
+ * @brief the latest time point at which EndBlockId must be determined
+ *
+ * Segmented fetch process will terminate if EndBlockId cannot be
+ * determined before this time point.
+ * It is initialized to now()+noEndTimeout when segmented fetch process begins,
+ * and reset to now()+noEndTimeout each time an insert status check command is processed.
+ */
+ ndn::time::steady_clock::TimePoint noEndTime;
+ };
+
+private: // insert command
+ /**
+ * @brief handle insert commands
+ */
+ void
+ onInterest(const Name& prefix, const Interest& interest);
+
+ /**
+ * @brief dispatch a validated insert command to the single or segmented insert path
+ */
+ void
+ onValidated(const shared_ptr<const Interest>& interest, const Name& prefix);
+
+ /**
+ * @brief reply with status code 401 to an insert command that failed validation
+ */
+ void
+ onValidationFailed(const shared_ptr<const Interest>& interest);
+
+ /**
+ * @brief insert command prefix register failed
+ */
+ void
+ onRegisterFailed(const Name& prefix, const std::string& reason);
+
+private: // single data fetching
+ /**
+ * @brief fetch one data
+ */
+ void
+ onData(const Interest& interest, Data& data, ProcessId processId);
+
+ /**
+ * @brief handle when fetching one data timeout
+ */
+ void
+ onTimeout(const Interest& interest, ProcessId processId);
+
+ /**
+ * @brief create a process for a single insert command, reply to it and express
+ * the Interest for the requested data
+ */
+ void
+ processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter);
+
+private: // segmented data fetching
+ /**
+ * @brief fetch segmented data
+ */
+ void
+ onSegmentData(const Interest& interest, Data& data, ProcessId processId);
+
+ /**
+ * @brief Timeout when fetching segmented data. Data can be fetched RETRY_TIMEOUT times.
+ */
+ void
+ onSegmentTimeout(const Interest& interest, ProcessId processId);
+
+ /**
+ * @brief initiate fetching segmented data
+ */
+ void
+ segInit(ProcessId processId, const RepoCommandParameter& parameter);
+
+ /**
+ * @brief control for sending interests in function onSegmentData()
+ */
+ void
+ onSegmentDataControl(ProcessId processId, const Interest& interest);
+
+ /**
+ * @brief control for sending interest in function onSegmentTimeout
+ */
+ void
+ onSegmentTimeoutControl(ProcessId processId, const Interest& interest);
+
+ /**
+ * @brief create a process for a segmented insert command, reply to it and start
+ * fetching through segInit()
+ */
+ void
+ processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter);
+
+ /**
+ * @brief extends noEndTime of process if not noEndTimeout, set StatusCode 405
+ *
+ * called by onCheckValidated() if there is no EndBlockId. If not noEndTimeout,
+ * extends noEndTime of process. If noEndTimeout, set status code 405 as noEndTimeout.
+ */
+ void
+ extendNoEndTime(ProcessInfo& process);
+
+private: // insert state check command
+ /**
+ * @brief handle insert check command
+ */
+ void
+ onCheckInterest(const Name& prefix, const Interest& interest);
+
+ /**
+ * @brief insert check command prefix register failed
+ */
+ void
+ onCheckRegisterFailed(const Name& prefix, const std::string& reason);
+
+ /**
+ * @brief reply to a validated insert check command with the state of its process
+ */
+ void
+ onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix);
+
+ /**
+ * @brief reply with status code 401 to an insert check command that failed validation
+ */
+ void
+ onCheckValidationFailed(const shared_ptr<const Interest>& interest);
+
+private:
+ /**
+ * @brief erase the process from m_processes
+ */
+ void
+ deleteProcess(ProcessId processId);
+
+ /**
+ * @brief schedule a event to delete the process
+ */
+ void
+ deferredDeleteProcess(ProcessId processId);
+
+ /**
+ * @brief reply with a response that carries only the given status code
+ */
+ void
+ negativeReply(const Interest& interest, int statusCode);
+
+private:
+
+ CommandInterestValidator& m_validator; // non-owning reference
+
+ map<ProcessId, ProcessInfo> m_processes; // state of the known insert processes
+
+ int m_retryTime; // retry limit per segment, see onSegmentTimeoutControl()
+ int m_credit; // size of the initial Interest window, see segInit()
+ ndn::time::milliseconds m_noEndTimeout; // length of the no-EndBlockId deadline
+};
+
+} // namespace repo
+
+#endif // REPO_HANDLES_WRITE_HANDLE_HPP
diff --git a/src/main.cpp b/src/main.cpp
new file mode 100644
index 0000000..e69c32d
--- /dev/null
+++ b/src/main.cpp
@@ -0,0 +1,95 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "config.hpp"
+#include "repo.hpp"
+
+using namespace repo;
+
+// Usage text printed for -h; main() prints argv[0] directly in front of it, which is
+// why the text starts with " - ".
+static const string ndnRepoUsageMessage =
+ /* argv[0] */ " - Next generation of NDN repository\n"
+ "-h: show help message\n"
+ "-c: set config file path\n"
+ ;
+
+/**
+ * @brief Signal handler installed on the signal set in main().
+ *
+ * Does nothing when the wait completed with an error. SIGINT and SIGTERM stop the
+ * io_service and log the signal; any other signal only re-arms the wait.
+ */
+void
+terminate(boost::asio::io_service& ioService,
+          const boost::system::error_code& error,
+          int signalNo,
+          boost::asio::signal_set& signalSet)
+{
+  if (error) {
+    return;
+  }
+
+  const bool isExitSignal = (signalNo == SIGINT || signalNo == SIGTERM);
+  if (!isExitSignal) {
+    /// \todo May be try to reload config file
+    signalSet.async_wait(bind(&terminate, boost::ref(ioService), _1, _2,
+                              boost::ref(signalSet)));
+    return;
+  }
+
+  ioService.stop();
+  std::cout << "Caught signal '" << strsignal(signalNo) << "', exiting..." << std::endl;
+}
+
+/**
+ * @brief Program entry point.
+ *
+ * Options: -h prints the usage text and returns 1; -c <path> replaces the default
+ * configuration file path; any other option is ignored. The repo is then created
+ * from the parsed configuration, signal handling is installed, listening is enabled
+ * and the io_service runs until it is stopped.
+ *
+ * @return 0 after a regular shutdown, 1 after -h, 2 if a std::exception escapes
+ */
+int
+main(int argc, char** argv)
+{
+  string configPath = DEFAULT_CONFIG_FILE;
+
+  int opt;
+  while ((opt = getopt(argc, argv, "hc:")) != -1) {
+    if (opt == 'h') {
+      std::cout << argv[0] << ndnRepoUsageMessage << std::endl;
+      return 1;
+    }
+    if (opt == 'c') {
+      configPath = string(optarg);
+    }
+    // every other option is ignored
+  }
+
+  try {
+    boost::asio::io_service ioService;
+    Repo repoInstance(ioService, parseConfig(configPath));
+
+    boost::asio::signal_set signalSet(ioService);
+    signalSet.add(SIGINT);
+    signalSet.add(SIGTERM);
+    signalSet.add(SIGHUP);
+    signalSet.add(SIGUSR1);
+    signalSet.add(SIGUSR2);
+    signalSet.async_wait(bind(&terminate, boost::ref(ioService), _1, _2,
+                              boost::ref(signalSet)));
+
+    repoInstance.enableListening();
+
+    ioService.run();
+  }
+  catch (const std::exception& e) {
+    std::cerr << "ERROR: " << e.what() << std::endl;
+    return 2;
+  }
+
+  return 0;
+}
diff --git a/src/repo-command-parameter.hpp b/src/repo-command-parameter.hpp
new file mode 100644
index 0000000..8a8c4bb
--- /dev/null
+++ b/src/repo-command-parameter.hpp
@@ -0,0 +1,344 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_REPO_COMMAND_PARAMETER_HPP
+#define REPO_REPO_COMMAND_PARAMETER_HPP
+
+#include <ndn-cxx/encoding/encoding-buffer.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/selectors.hpp>
+#include "repo-tlv.hpp"
+
+namespace repo {
+
+using ndn::Name;
+using ndn::Block;
+using ndn::EncodingImpl;
+using ndn::Selectors;
+using ndn::EncodingEstimator;
+using ndn::EncodingBuffer;
+
+/**
+ * @brief Class defining abstraction of parameter of command for NDN Repo Protocol
+ *
+ * Name, StartBlockId, EndBlockId and ProcessId are optional fields; each has a has*()
+ * query, and the numeric getters assert that the field is present. Every setter
+ * drops the cached wire encoding and returns *this so that calls can be chained.
+ *
+ * @sa link http://redmine.named-data.net/projects/repo-ng/wiki/Repo_Protocol_Specification#RepoCommandParameter
+ */
+class RepoCommandParameter
+{
+public:
+  /// Error raised when a Block cannot be decoded as a RepoCommandParameter.
+  class Error : public ndn::Tlv::Error
+  {
+  public:
+    explicit
+    Error(const std::string& what)
+      : ndn::Tlv::Error(what)
+    {
+    }
+  };
+
+  /// Create a parameter with no field set.
+  RepoCommandParameter()
+    : m_startBlockId(0) // numeric fields get a defined value so that copying an
+    , m_endBlockId(0)   // object with absent fields never reads indeterminate data
+    , m_processId(0)
+    , m_hasName(false)
+    , m_hasStartBlockId(false)
+    , m_hasEndBlockId(false)
+    , m_hasProcessId(false)
+  {
+  }
+
+  /**
+   * @brief Create a parameter from its wire encoding.
+   * @throw Error if @p block is not a RepoCommandParameter
+   */
+  explicit
+  RepoCommandParameter(const Block& block)
+    : m_startBlockId(0)
+    , m_endBlockId(0)
+    , m_processId(0)
+    , m_hasName(false)
+    , m_hasStartBlockId(false)
+    , m_hasEndBlockId(false)
+    , m_hasProcessId(false)
+  {
+    wireDecode(block);
+  }
+
+  const Name&
+  getName() const
+  {
+    return m_name;
+  }
+
+  RepoCommandParameter&
+  setName(const Name& name)
+  {
+    m_name = name;
+    m_hasName = true;
+    m_wire.reset();
+    return *this;
+  }
+
+  bool
+  hasName() const
+  {
+    return m_hasName;
+  }
+
+  const Selectors&
+  getSelectors() const
+  {
+    return m_selectors;
+  }
+
+  RepoCommandParameter&
+  setSelectors(const Selectors& selectors)
+  {
+    m_selectors = selectors;
+    m_wire.reset();
+    return *this;
+  }
+
+  /// Selectors have no separate flag: they count as present when they are not empty.
+  bool
+  hasSelectors() const
+  {
+    return !m_selectors.empty();
+  }
+
+  uint64_t
+  getStartBlockId() const
+  {
+    assert(hasStartBlockId());
+    return m_startBlockId;
+  }
+
+  RepoCommandParameter&
+  setStartBlockId(uint64_t startBlockId)
+  {
+    m_startBlockId = startBlockId;
+    m_hasStartBlockId = true;
+    m_wire.reset();
+    return *this;
+  }
+
+  bool
+  hasStartBlockId() const
+  {
+    return m_hasStartBlockId;
+  }
+
+  uint64_t
+  getEndBlockId() const
+  {
+    assert(hasEndBlockId());
+    return m_endBlockId;
+  }
+
+  RepoCommandParameter&
+  setEndBlockId(uint64_t endBlockId)
+  {
+    m_endBlockId = endBlockId;
+    m_hasEndBlockId = true;
+    m_wire.reset();
+    return *this;
+  }
+
+  bool
+  hasEndBlockId() const
+  {
+    return m_hasEndBlockId;
+  }
+
+  uint64_t
+  getProcessId() const
+  {
+    assert(hasProcessId());
+    return m_processId;
+  }
+
+  bool
+  hasProcessId() const
+  {
+    return m_hasProcessId;
+  }
+
+  RepoCommandParameter&
+  setProcessId(uint64_t processId)
+  {
+    m_processId = processId;
+    m_hasProcessId = true;
+    m_wire.reset();
+    return *this;
+  }
+
+  /// Prepend the TLV encoding to @p block and return the number of bytes written.
+  template<bool T>
+  size_t
+  wireEncode(EncodingImpl<T>& block) const;
+
+  /// Return the wire encoding, building and caching it on first use.
+  const Block&
+  wireEncode() const;
+
+  /// Replace the content of this object with the fields decoded from @p wire.
+  void
+  wireDecode(const Block& wire);
+
+private:
+
+  Name m_name;
+  Selectors m_selectors;
+  uint64_t m_startBlockId;
+  uint64_t m_endBlockId;
+  uint64_t m_processId;
+
+  bool m_hasName;
+  bool m_hasStartBlockId;
+  bool m_hasEndBlockId;
+  bool m_hasProcessId;
+
+  mutable Block m_wire; // cached encoding; reset by every setter
+};
+
+/**
+ * @brief Prepend the TLV encoding of this parameter to @p encoder.
+ *
+ * The encoder prepends, so the fields are written back to front: ProcessId,
+ * EndBlockId, StartBlockId, Selectors, Name, and finally the outer length and type.
+ * Absent fields are skipped.
+ *
+ * @return number of bytes prepended
+ */
+template<bool T>
+inline size_t
+RepoCommandParameter::wireEncode(EncodingImpl<T>& encoder) const
+{
+  size_t totalLength = 0;
+
+  if (m_hasProcessId) {
+    const size_t valueLength = encoder.prependNonNegativeInteger(m_processId);
+    totalLength += valueLength;
+    totalLength += encoder.prependVarNumber(valueLength);
+    totalLength += encoder.prependVarNumber(tlv::ProcessId);
+  }
+
+  if (m_hasEndBlockId) {
+    const size_t valueLength = encoder.prependNonNegativeInteger(m_endBlockId);
+    totalLength += valueLength;
+    totalLength += encoder.prependVarNumber(valueLength);
+    totalLength += encoder.prependVarNumber(tlv::EndBlockId);
+  }
+
+  if (m_hasStartBlockId) {
+    const size_t valueLength = encoder.prependNonNegativeInteger(m_startBlockId);
+    totalLength += valueLength;
+    totalLength += encoder.prependVarNumber(valueLength);
+    totalLength += encoder.prependVarNumber(tlv::StartBlockId);
+  }
+
+  if (hasSelectors()) {
+    totalLength += m_selectors.wireEncode(encoder);
+  }
+
+  if (m_hasName) {
+    totalLength += m_name.wireEncode(encoder);
+  }
+
+  // outer TLV header: length first, then type, because everything is prepended
+  totalLength += encoder.prependVarNumber(totalLength);
+  totalLength += encoder.prependVarNumber(tlv::RepoCommandParameter);
+  return totalLength;
+}
+
+/**
+ * @brief Return the wire encoding of this parameter.
+ *
+ * A cached encoding is reused. Otherwise the size is estimated in a first pass, the
+ * fields are encoded into a buffer of that size, and the result is cached.
+ */
+inline const Block&
+RepoCommandParameter::wireEncode() const
+{
+  if (!m_wire.hasWire()) {
+    EncodingEstimator estimator;
+    const size_t estimatedSize = wireEncode(estimator);
+
+    EncodingBuffer buffer(estimatedSize, 0);
+    wireEncode(buffer);
+
+    m_wire = buffer.block();
+  }
+  return m_wire;
+}
+
+/**
+ * @brief Replace the content of this object with the fields found in @p wire.
+ *
+ * All presence flags are cleared first. Name and Selectors are reset to empty values
+ * when the block does not carry them, so nothing from an earlier decode or setter
+ * call survives. The numeric fields are only meaningful while their flag is set.
+ *
+ * @throw Error if @p wire is not of type RepoCommandParameter
+ */
+inline void
+RepoCommandParameter::wireDecode(const Block& wire)
+{
+  m_hasName = false;
+  m_hasStartBlockId = false;
+  m_hasEndBlockId = false;
+  m_hasProcessId = false;
+
+  m_wire = wire;
+
+  m_wire.parse();
+
+  if (m_wire.type() != tlv::RepoCommandParameter)
+    throw Error("Requested decoding of RepoCommandParameter, but Block is of different type");
+
+  // Name
+  Block::element_const_iterator val = m_wire.find(tlv::Name);
+  if (val != m_wire.elements_end()) {
+    m_hasName = true;
+    m_name.wireDecode(*val); // reuse the element that was just found
+  }
+  else {
+    m_name = Name(); // getName() has no presence check, so do not keep a stale name
+  }
+
+  // Selectors
+  val = m_wire.find(tlv::Selectors);
+  if (val != m_wire.elements_end()) {
+    m_selectors.wireDecode(*val);
+  }
+  else {
+    m_selectors = Selectors();
+  }
+
+  // StartBlockId
+  val = m_wire.find(tlv::StartBlockId);
+  if (val != m_wire.elements_end()) {
+    m_hasStartBlockId = true;
+    m_startBlockId = readNonNegativeInteger(*val);
+  }
+
+  // EndBlockId
+  val = m_wire.find(tlv::EndBlockId);
+  if (val != m_wire.elements_end()) {
+    m_hasEndBlockId = true;
+    m_endBlockId = readNonNegativeInteger(*val);
+  }
+
+  // ProcessId
+  val = m_wire.find(tlv::ProcessId);
+  if (val != m_wire.elements_end()) {
+    m_hasProcessId = true;
+    m_processId = readNonNegativeInteger(*val);
+  }
+}
+
+/**
+ * @brief Print the fields that are present in @p repoCommandParameter.
+ *
+ * Name, StartBlockId, EndBlockId and ProcessId are printed in that order, each only
+ * if it is set. Selectors are not printed.
+ */
+inline std::ostream&
+operator<<(std::ostream& os, const RepoCommandParameter& repoCommandParameter)
+{
+  const RepoCommandParameter& param = repoCommandParameter;
+
+  os << "RepoCommandParameter(";
+
+  if (param.hasName()) {
+    os << " Name: " << param.getName();
+  }
+  if (param.hasStartBlockId()) {
+    os << " StartBlockId: " << param.getStartBlockId();
+  }
+  if (param.hasEndBlockId()) {
+    os << " EndBlockId: " << param.getEndBlockId();
+  }
+  if (param.hasProcessId()) {
+    os << " ProcessId: " << param.getProcessId();
+  }
+
+  os << " )";
+  return os;
+}
+
+} // namespace repo
+
+#endif // REPO_REPO_COMMAND_PARAMETER_HPP
diff --git a/src/repo-command-response.hpp b/src/repo-command-response.hpp
new file mode 100644
index 0000000..0eb4f1d
--- /dev/null
+++ b/src/repo-command-response.hpp
@@ -0,0 +1,397 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_REPO_COMMAND_RESPONSE_HPP
+#define REPO_REPO_COMMAND_RESPONSE_HPP
+
+#include <ndn-cxx/encoding/block.hpp>
+#include <ndn-cxx/encoding/encoding-buffer.hpp>
+#include <ndn-cxx/encoding/tlv-nfd.hpp>
+#include "repo-tlv.hpp"
+
+namespace repo {
+
+using ndn::Block;
+using ndn::EncodingImpl;
+using ndn::EncodingEstimator;
+using ndn::EncodingBuffer;
+
+/**
+* @brief Class defining abstraction of Response for NDN Repo Protocol
+* @sa link http://redmine.named-data.net/projects/repo-ng/wiki/Repo_Protocol_Specification#Repo-Command-Response
+*/
+class RepoCommandResponse
+{
+public:
+  /// Exception thrown when a response cannot be encoded or decoded
+  class Error : public ndn::Tlv::Error
+  {
+  public:
+    explicit
+    Error(const std::string& what)
+      : ndn::Tlv::Error(what)
+    {
+    }
+  };
+
+  /**
+   * @brief Create a response in which no field is present
+   *
+   * All value members start at zero so that a getter called before the
+   * matching setter never reads an indeterminate value.
+   */
+  RepoCommandResponse()
+    : m_statusCode(0)
+    , m_startBlockId(0)
+    , m_endBlockId(0)
+    , m_processId(0)
+    , m_insertNum(0)
+    , m_deleteNum(0)
+    , m_hasStartBlockId(false)
+    , m_hasEndBlockId(false)
+    , m_hasProcessId(false)
+    , m_hasInsertNum(false)
+    , m_hasDeleteNum(false)
+    , m_hasStatusCode(false)
+  {
+  }
+
+  /**
+   * @brief Create a response by decoding @p block
+   *
+   * Value members are zeroed first because wireDecode() assigns only the
+   * fields that are present in @p block.
+   *
+   * @throw Error as thrown by wireDecode()
+   */
+  explicit
+  RepoCommandResponse(const Block& block)
+    : m_statusCode(0)
+    , m_startBlockId(0)
+    , m_endBlockId(0)
+    , m_processId(0)
+    , m_insertNum(0)
+    , m_deleteNum(0)
+    , m_hasStartBlockId(false)
+    , m_hasEndBlockId(false)
+    , m_hasProcessId(false)
+    , m_hasInsertNum(false)
+    , m_hasDeleteNum(false)
+    , m_hasStatusCode(false)
+  {
+    wireDecode(block);
+  }
+
+  // Each field below follows the same pattern: getX() returns the stored
+  // value, setX() stores a value, marks the field present and drops the
+  // cached wire encoding, and hasX() reports whether the field is present.
+
+  uint64_t
+  getStartBlockId() const
+  {
+    return m_startBlockId;
+  }
+
+  RepoCommandResponse&
+  setStartBlockId(uint64_t startBlockId)
+  {
+    m_startBlockId = startBlockId;
+    m_hasStartBlockId = true;
+    m_wire.reset();
+    return *this;
+  }
+
+  bool
+  hasStartBlockId() const
+  {
+    return m_hasStartBlockId;
+  }
+
+  uint64_t
+  getEndBlockId() const
+  {
+    // The only getter that asserts presence; kept as in the original.
+    assert(hasEndBlockId());
+    return m_endBlockId;
+  }
+
+  RepoCommandResponse&
+  setEndBlockId(uint64_t endBlockId)
+  {
+    m_endBlockId = endBlockId;
+    m_hasEndBlockId = true;
+    m_wire.reset();
+    return *this;
+  }
+
+  bool
+  hasEndBlockId() const
+  {
+    return m_hasEndBlockId;
+  }
+
+  uint64_t
+  getProcessId() const
+  {
+    return m_processId;
+  }
+
+  RepoCommandResponse&
+  setProcessId(uint64_t processId)
+  {
+    m_processId = processId;
+    m_hasProcessId = true;
+    m_wire.reset();
+    return *this;
+  }
+
+  bool
+  hasProcessId() const
+  {
+    return m_hasProcessId;
+  }
+
+  uint64_t
+  getStatusCode() const
+  {
+    return m_statusCode;
+  }
+
+  RepoCommandResponse&
+  setStatusCode(uint64_t statusCode)
+  {
+    m_statusCode = statusCode;
+    m_hasStatusCode = true;
+    m_wire.reset();
+    return *this;
+  }
+
+  bool
+  hasStatusCode() const
+  {
+    return m_hasStatusCode;
+  }
+
+  uint64_t
+  getInsertNum() const
+  {
+    return m_insertNum;
+  }
+
+  RepoCommandResponse&
+  setInsertNum(uint64_t insertNum)
+  {
+    m_insertNum = insertNum;
+    m_hasInsertNum = true;
+    m_wire.reset();
+    return *this;
+  }
+
+  bool
+  hasInsertNum() const
+  {
+    return m_hasInsertNum;
+  }
+
+  uint64_t
+  getDeleteNum() const
+  {
+    return m_deleteNum;
+  }
+
+  RepoCommandResponse&
+  setDeleteNum(uint64_t deleteNum)
+  {
+    m_deleteNum = deleteNum;
+    m_hasDeleteNum = true;
+    m_wire.reset();
+    return *this;
+  }
+
+  bool
+  hasDeleteNum() const
+  {
+    return m_hasDeleteNum;
+  }
+
+  /// Prepend the TLV encoding to @p block; throws Error if StatusCode is absent
+  template<bool T>
+  size_t
+  wireEncode(EncodingImpl<T>& block) const;
+
+  /// Return the wire encoding, building and caching it when none is cached
+  const Block&
+  wireEncode() const;
+
+  /// Replace the content of this object with the fields found in @p wire
+  void
+  wireDecode(const Block& wire);
+
+private:
+  uint64_t m_statusCode;
+  uint64_t m_startBlockId;
+  uint64_t m_endBlockId;
+  uint64_t m_processId;
+  uint64_t m_insertNum;
+  uint64_t m_deleteNum;
+
+  bool m_hasStartBlockId;
+  bool m_hasEndBlockId;
+  bool m_hasProcessId;
+  bool m_hasInsertNum;
+  bool m_hasDeleteNum;
+  bool m_hasStatusCode;
+
+  // Cached encoding; reset by every setter, filled by wireEncode()/wireDecode()
+  mutable Block m_wire;
+};
+
+/**
+ * @brief Prepend the TLV encoding of this response to @p encoder
+ *
+ * Fields are prepended, so they are written last-to-first: DeleteNum,
+ * InsertNum, EndBlockId, StartBlockId, StatusCode, ProcessId, followed by the
+ * outer length and the RepoCommandResponse type.
+ *
+ * @return number of bytes prepended
+ * @throw Error if StatusCode has not been set (raised after the fields that
+ *        precede it in the list above have already been prepended)
+ */
+template<bool T>
+inline size_t
+RepoCommandResponse::wireEncode(EncodingImpl<T>& encoder) const
+{
+  size_t encodedLength = 0;
+  size_t valueLength = 0;
+
+  // DeleteNum
+  if (m_hasDeleteNum) {
+    valueLength = encoder.prependNonNegativeInteger(m_deleteNum);
+    encodedLength += valueLength + encoder.prependVarNumber(valueLength);
+    encodedLength += encoder.prependVarNumber(tlv::DeleteNum);
+  }
+
+  // InsertNum
+  if (m_hasInsertNum) {
+    valueLength = encoder.prependNonNegativeInteger(m_insertNum);
+    encodedLength += valueLength + encoder.prependVarNumber(valueLength);
+    encodedLength += encoder.prependVarNumber(tlv::InsertNum);
+  }
+
+  // EndBlockId
+  if (m_hasEndBlockId) {
+    valueLength = encoder.prependNonNegativeInteger(m_endBlockId);
+    encodedLength += valueLength + encoder.prependVarNumber(valueLength);
+    encodedLength += encoder.prependVarNumber(tlv::EndBlockId);
+  }
+
+  // StartBlockId
+  if (m_hasStartBlockId) {
+    valueLength = encoder.prependNonNegativeInteger(m_startBlockId);
+    encodedLength += valueLength + encoder.prependVarNumber(valueLength);
+    encodedLength += encoder.prependVarNumber(repo::tlv::StartBlockId);
+  }
+
+  // StatusCode is the only mandatory field
+  if (!m_hasStatusCode)
+    throw Error("required field StatusCode is missing");
+
+  valueLength = encoder.prependNonNegativeInteger(m_statusCode);
+  encodedLength += valueLength + encoder.prependVarNumber(valueLength);
+  encodedLength += encoder.prependVarNumber(tlv::StatusCode);
+
+  // ProcessId
+  if (m_hasProcessId) {
+    valueLength = encoder.prependNonNegativeInteger(m_processId);
+    encodedLength += valueLength + encoder.prependVarNumber(valueLength);
+    encodedLength += encoder.prependVarNumber(tlv::ProcessId);
+  }
+
+  // Outer TLV header: length of everything written so far, then the type
+  encodedLength += encoder.prependVarNumber(encodedLength);
+  encodedLength += encoder.prependVarNumber(tlv::RepoCommandResponse);
+  return encodedLength;
+}
+
+/**
+ * @brief Return the wire encoding of this response
+ *
+ * The cached block is returned when it already has a wire. Otherwise the
+ * response is encoded twice, once into an estimator to learn the size and once
+ * into a buffer of that size, and the result is cached.
+ */
+inline const Block&
+RepoCommandResponse::wireEncode() const
+{
+  if (!m_wire.hasWire()) {
+    EncodingEstimator sizeEstimator;
+    size_t requiredSize = wireEncode(sizeEstimator);
+
+    EncodingBuffer encoder(requiredSize, 0);
+    wireEncode(encoder);
+
+    m_wire = encoder.block();
+  }
+  return m_wire;
+}
+
+/**
+ * @brief Replace the content of this response with the fields found in @p wire
+ *
+ * All presence flags are cleared first; each field found among the parsed
+ * sub-elements is then marked present and its value read.
+ *
+ * @throw Error if @p wire is not of type RepoCommandResponse or contains no
+ *        StatusCode element
+ */
+inline void
+RepoCommandResponse::wireDecode(const Block& wire)
+{
+  m_hasStartBlockId = m_hasEndBlockId = m_hasProcessId = false;
+  m_hasStatusCode = m_hasInsertNum = m_hasDeleteNum = false;
+
+  m_wire = wire;
+  m_wire.parse();
+
+  if (m_wire.type() != tlv::RepoCommandResponse)
+    throw Error("RepoCommandResponse malformed");
+
+  // StartBlockId
+  Block::element_const_iterator field = m_wire.find(tlv::StartBlockId);
+  if (field != m_wire.elements_end()) {
+    m_hasStartBlockId = true;
+    m_startBlockId = readNonNegativeInteger(*field);
+  }
+
+  // EndBlockId
+  field = m_wire.find(tlv::EndBlockId);
+  if (field != m_wire.elements_end()) {
+    m_hasEndBlockId = true;
+    m_endBlockId = readNonNegativeInteger(*field);
+  }
+
+  // ProcessId
+  field = m_wire.find(tlv::ProcessId);
+  if (field != m_wire.elements_end()) {
+    m_hasProcessId = true;
+    m_processId = readNonNegativeInteger(*field);
+  }
+
+  // StatusCode (mandatory)
+  field = m_wire.find(tlv::StatusCode);
+  if (field == m_wire.elements_end())
+    throw Error("required field StatusCode is missing");
+  m_hasStatusCode = true;
+  m_statusCode = readNonNegativeInteger(*field);
+
+  // InsertNum
+  field = m_wire.find(tlv::InsertNum);
+  if (field != m_wire.elements_end()) {
+    m_hasInsertNum = true;
+    m_insertNum = readNonNegativeInteger(*field);
+  }
+
+  // DeleteNum
+  field = m_wire.find(tlv::DeleteNum);
+  if (field != m_wire.elements_end()) {
+    m_hasDeleteNum = true;
+    m_deleteNum = readNonNegativeInteger(*field);
+  }
+}
+
+/**
+ * @brief Write a human-readable summary of a RepoCommandResponse to a stream
+ *
+ * Only the fields reported as present by the has*() accessors are printed, in
+ * the order ProcessId, StatusCode, StartBlockId, EndBlockId, InsertNum,
+ * DeleteNum.
+ *
+ * @param os stream to write to
+ * @param repoCommandResponse object to describe
+ * @return @p os, to allow chaining
+ */
+inline std::ostream&
+operator<<(std::ostream& os, const RepoCommandResponse& repoCommandResponse)
+{
+  const RepoCommandResponse& response = repoCommandResponse;
+
+  os << "RepoCommandResponse(";
+
+  if (response.hasProcessId())
+    os << " ProcessId: " << response.getProcessId();
+
+  if (response.hasStatusCode())
+    os << " StatusCode: " << response.getStatusCode();
+
+  if (response.hasStartBlockId())
+    os << " StartBlockId: " << response.getStartBlockId();
+
+  if (response.hasEndBlockId())
+    os << " EndBlockId: " << response.getEndBlockId();
+
+  if (response.hasInsertNum())
+    os << " InsertNum: " << response.getInsertNum();
+
+  if (response.hasDeleteNum())
+    os << " DeleteNum: " << response.getDeleteNum();
+
+  return os << " )";
+}
+
+} // namespace repo
+
+#endif // REPO_REPO_COMMAND_RESPONSE_HPP
diff --git a/src/repo-tlv.hpp b/src/repo-tlv.hpp
new file mode 100644
index 0000000..7d917e1
--- /dev/null
+++ b/src/repo-tlv.hpp
@@ -0,0 +1,44 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_REPO_TLV_HPP
+#define REPO_REPO_TLV_HPP
+
+#include <ndn-cxx/encoding/tlv.hpp>
+
+namespace repo {
+namespace tlv {
+
+using namespace ndn::Tlv;
+
+enum { // TLV type numbers used by the repo command encoders/decoders
+ RepoCommandParameter = 201, // type of the outer RepoCommandParameter block
+ StartBlockId = 204, // read and written as a non-negative integer
+ EndBlockId = 205, // read and written as a non-negative integer
+ ProcessId = 206, // read and written as a non-negative integer
+ RepoCommandResponse = 207, // type of the outer RepoCommandResponse block
+ StatusCode = 208, // non-negative integer; mandatory in a RepoCommandResponse
+ InsertNum = 209, // read and written as a non-negative integer
+ DeleteNum = 210 // read and written as a non-negative integer
+};
+
+} // tlv
+} // repo
+
+#endif // REPO_REPO_TLV_HPP
diff --git a/src/repo.cpp b/src/repo.cpp
new file mode 100644
index 0000000..3502763
--- /dev/null
+++ b/src/repo.cpp
@@ -0,0 +1,170 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "repo.hpp"
+
+namespace repo {
+
+/**
+ * @brief Read the repo configuration from an INFO-format file
+ *
+ * The file must contain a "repo" section with the sub-sections "data" and
+ * "command" (each holding only "prefix" entries), "tcp_bulk_insert" (optional
+ * "host" and "port" entries), "storage" ("method" and "path") and "validator".
+ *
+ * @param configPath path of the configuration file
+ * @return the parsed configuration
+ * @throw Repo::Error if the file cannot be opened or parsed, a section holds
+ *        an unrecognized option, or the storage method is not "sqlite"
+ * @note Lookups of missing sections or keys are not translated into
+ *       Repo::Error; the exception raised by boost::property_tree propagates.
+ */
+RepoConfig
+parseConfig(const std::string& configPath)
+{
+  if (configPath.empty()) {
+    // Only reported here; opening the empty path below fails and throws.
+    std::cerr << "configuration file path is empty" << std::endl;
+  }
+
+  std::ifstream fin(configPath.c_str());
+  if (!fin.is_open())
+    throw Repo::Error("failed to open configuration file '"+ configPath +"'");
+
+  using namespace boost::property_tree;
+  ptree propertyTree;
+  try {
+    read_info(fin, propertyTree);
+  }
+  catch (const ptree_error&) {
+    throw Repo::Error("failed to read configuration file '"+ configPath +"'");
+  }
+
+  // Sections are only read, so bind them by reference instead of deep-copying
+  // each sub-tree; propertyTree outlives all of these references.
+  const ptree& repoConf = propertyTree.get_child("repo");
+
+  RepoConfig repoConfig;
+
+  const ptree& dataConf = repoConf.get_child("data");
+  for (ptree::const_iterator it = dataConf.begin();
+       it != dataConf.end();
+       ++it)
+    {
+      if (it->first == "prefix")
+        repoConfig.dataPrefixes.push_back(Name(it->second.get_value<std::string>()));
+      else
+        throw Repo::Error("Unrecognized '" + it->first + "' option in 'data' section in "
+                          "configuration file '"+ configPath +"'");
+    }
+
+  const ptree& commandConf = repoConf.get_child("command");
+  for (ptree::const_iterator it = commandConf.begin();
+       it != commandConf.end();
+       ++it)
+    {
+      if (it->first == "prefix")
+        repoConfig.repoPrefixes.push_back(Name(it->second.get_value<std::string>()));
+      else
+        throw Repo::Error("Unrecognized '" + it->first + "' option in 'command' section in "
+                          "configuration file '"+ configPath +"'");
+    }
+
+  // tcp_bulk_insert {
+  //   host "localhost"  ; IP address or hostname to listen on
+  //   port 7376         ; Port number to listen on
+  // }
+  // The endpoint is enabled as soon as the section has at least one entry;
+  // entries that are not given keep the defaults below.
+  const ptree& tcpBulkInsert = repoConf.get_child("tcp_bulk_insert");
+  bool isTcpBulkEnabled = false;
+  std::string host = "localhost";
+  std::string port = "7376";
+  for (ptree::const_iterator it = tcpBulkInsert.begin();
+       it != tcpBulkInsert.end();
+       ++it)
+    {
+      isTcpBulkEnabled = true;
+
+      if (it->first == "host") {
+        host = it->second.get_value<std::string>();
+      }
+      else if (it->first == "port") {
+        port = it->second.get_value<std::string>();
+      }
+      else
+        throw Repo::Error("Unrecognized '" + it->first + "' option in 'tcp_bulk_insert' section in "
+                          "configuration file '"+ configPath +"'");
+    }
+  if (isTcpBulkEnabled) {
+    repoConfig.tcpBulkInsertEndpoints.push_back(std::make_pair(host, port));
+  }
+
+  if (repoConf.get<std::string>("storage.method") != "sqlite")
+    throw Repo::Error("Only 'sqlite' storage method is supported");
+
+  repoConfig.dbPath = repoConf.get<std::string>("storage.path");
+
+  // Copied on purpose: the returned config must own its validator section.
+  repoConfig.validatorNode = repoConf.get_child("validator");
+  return repoConfig;
+}
+
+/// Deleter that releases nothing; lets a shared_ptr wrap an object it does not own
+static inline void
+NullDeleter(boost::asio::io_service* variable)
+{
+  (void)variable; // intentionally left alone
+}
+
+/**
+ * @brief Create the repo services on top of an existing io_service
+ *
+ * @param ioService io_service used by the scheduler, the face and the TCP bulk
+ *        insert handle; it is wrapped with a no-op deleter and never deleted here
+ * @param config configuration, copied into the instance
+ */
+Repo::Repo(boost::asio::io_service& ioService, const RepoConfig& config)
+  : m_config(config)
+  , m_scheduler(ioService)
+  , m_face(shared_ptr<boost::asio::io_service>(&ioService, &NullDeleter))
+  , m_storageHandle(openStorage(config))
+  , m_readHandle(m_face, *m_storageHandle, m_keyChain, m_scheduler)
+  , m_writeHandle(m_face, *m_storageHandle, m_keyChain, m_scheduler, m_validator)
+  , m_deleteHandle(m_face, *m_storageHandle, m_keyChain, m_scheduler, m_validator)
+  , m_tcpBulkInsertHandle(ioService, *m_storageHandle)
+{
+  // The trust model is not implemented yet: the validator gets a single rule
+  // for "^<>" bound to the key chain's default certificate.
+  // @todo add a function to parse RepoConfig.validatorNode and define the trust model
+  const Name defaultCertName = m_keyChain.getDefaultCertificateName();
+  m_validator.addInterestRule("^<>", *m_keyChain.getCertificate(defaultCertName));
+}
+
+/// Create the storage backend for @p config: a SqliteHandle built from config.dbPath
+shared_ptr<StorageHandle>
+Repo::openStorage(const RepoConfig& config)
+{
+  return ndn::make_shared<SqliteHandle>(config.dbPath);
+}
+
+/**
+ * @brief Start listening on every prefix and endpoint of the configuration
+ *
+ * Data prefixes go to the read handle, command prefixes to both the write and
+ * the delete handle, and TCP bulk insert endpoints to the bulk insert handle.
+ */
+void
+Repo::enableListening()
+{
+  // Data prefixes
+  vector<ndn::Name>& dataPrefixes = m_config.dataPrefixes;
+  for (size_t i = 0; i < dataPrefixes.size(); ++i) {
+    m_readHandle.listen(dataPrefixes[i]);
+  }
+
+  // Control (command) prefixes
+  vector<ndn::Name>& commandPrefixes = m_config.repoPrefixes;
+  for (size_t i = 0; i < commandPrefixes.size(); ++i) {
+    m_writeHandle.listen(commandPrefixes[i]);
+    m_deleteHandle.listen(commandPrefixes[i]);
+  }
+
+  // TCP bulk insert (host, port) endpoints
+  vector<pair<string, string> >& endpoints = m_config.tcpBulkInsertEndpoints;
+  for (size_t i = 0; i < endpoints.size(); ++i) {
+    m_tcpBulkInsertHandle.listen(endpoints[i].first, endpoints[i].second);
+  }
+}
+
+} // namespace repo
diff --git a/src/repo.hpp b/src/repo.hpp
new file mode 100644
index 0000000..a92eb7d
--- /dev/null
+++ b/src/repo.hpp
@@ -0,0 +1,97 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_REPO_HPP
+#define REPO_REPO_HPP
+
+#include "storage/storage-handle.hpp"
+#include "storage/sqlite-handle.hpp"
+
+#include "handles/read-handle.hpp"
+#include "handles/write-handle.hpp"
+#include "handles/delete-handle.hpp"
+#include "handles/tcp-bulk-insert-handle.hpp"
+
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/util/command-interest-validator.hpp>
+
+#include <boost/property_tree/ptree.hpp>
+#include <boost/property_tree/info_parser.hpp>
+
+namespace repo {
+
+using std::string;
+using std::vector;
+using std::pair;
+
+struct RepoConfig // settings filled in by parseConfig() and consumed by Repo
+{
+ //StorageMethod storageMethod; This will be implemented if another storage method is added.
+ std::string dbPath; // value of storage.path; handed to SqliteHandle by Repo::openStorage()
+ vector<ndn::Name> dataPrefixes; // "prefix" entries of the data section; the read handle listens on each
+ vector<ndn::Name> repoPrefixes; // "prefix" entries of the command section; write and delete handles listen on each
+ vector<pair<string, string> > tcpBulkInsertEndpoints; // (host, port) pairs for the TCP bulk insert handle
+
+ //@todo validator should be configured in config file
+ boost::property_tree::ptree validatorNode; // copy of the validator section; not interpreted yet (see @todo)
+};
+
+RepoConfig
+parseConfig(const std::string& confPath);
+
+class Repo : noncopyable // bundles the face, storage and protocol handles of one repo instance
+{
+
+public:
+ class Error : public std::runtime_error // error type also thrown by parseConfig()
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : std::runtime_error(what)
+ {
+ }
+ };
+
+public:
+ Repo(boost::asio::io_service& ioService, const RepoConfig& config); // ioService is used, not owned
+
+ void
+ enableListening(); // starts listening on all configured prefixes and TCP endpoints
+
+private:
+ static shared_ptr<StorageHandle>
+ openStorage(const RepoConfig& config); // creates the SqliteHandle for config.dbPath
+
+private: // NOTE: members are initialized in this order; the handles are constructed from the members above them
+ RepoConfig m_config;
+ ndn::Scheduler m_scheduler;
+ ndn::Face m_face;
+ shared_ptr<StorageHandle> m_storageHandle;
+ KeyChain m_keyChain; // default-constructed; passed to the read, write and delete handles
+ CommandInterestValidator m_validator; // default-constructed; passed to the write and delete handles
+ ReadHandle m_readHandle;
+ WriteHandle m_writeHandle;
+ DeleteHandle m_deleteHandle;
+ TcpBulkInsertHandle m_tcpBulkInsertHandle;
+};
+
+} // namespace repo
+
+#endif // REPO_REPO_HPP
diff --git a/src/storage/sqlite-handle.cpp b/src/storage/sqlite-handle.cpp
new file mode 100644
index 0000000..02598a9
--- /dev/null
+++ b/src/storage/sqlite-handle.cpp
@@ -0,0 +1,841 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "config.hpp"
+#include "sqlite-handle.hpp"
+#include <boost/filesystem.hpp>
+
+namespace repo {
+
+/**
+ * @brief Open (creating if necessary) the SQLite database of the repo
+ *
+ * @param dbPath directory that holds "ndn_repo.db"; when empty, "ndn_repo.db"
+ *        relative to the current working directory is used. A directory that
+ *        does not exist yet is created (one level only).
+ * @throw Error if the directory cannot be created, or as thrown by
+ *        initializeRepo()
+ */
+SqliteHandle::SqliteHandle(const string& dbPath)
+  : StorageHandle(STORAGE_METHOD_SQLITE)
+{
+  if (dbPath.empty()) {
+    m_dbPath = string("ndn_repo.db");
+    // Report the file that is actually used; dbPath is empty in this branch,
+    // so printing it would show "[]".
+    std::cerr << "Create db file in local location [" << m_dbPath << "]. " << std::endl
+              << "You can assign the path using -d option" << std::endl;
+  }
+  else {
+    boost::filesystem::path fsPath(dbPath);
+    boost::filesystem::file_status fsPathStatus = boost::filesystem::status(fsPath);
+    if (!boost::filesystem::is_directory(fsPathStatus)) {
+      if (!boost::filesystem::create_directory(fsPath)) {
+        throw Error("Folder '" + dbPath + "' does not exists and cannot be created");
+      }
+    }
+
+    m_dbPath = dbPath + "/ndn_repo.db";
+  }
+  initializeRepo();
+}
+
+/**
+ * @brief Open the database file, create the schema and make sure the root row exists
+ *
+ * Steps: open m_dbPath, create the NDN_REPO table and its indexes (errors are
+ * ignored because they are expected when the schema already exists), insert a
+ * row for the empty (root) name when none is found, then set the synchronous
+ * and journal_mode pragmas.
+ *
+ * @throw Error if the database cannot be opened, or the root row cannot be
+ *        queried or inserted
+ */
+void
+SqliteHandle::initializeRepo()
+{
+  // Finalizes the wrapped statement on every exit path, including exceptions.
+  // sqlite3_finalize() accepts a null statement, so an unprepared guard is fine.
+  struct StatementGuard
+  {
+    StatementGuard() : stmt(0) {}
+    ~StatementGuard() { sqlite3_finalize(stmt); }
+    sqlite3_stmt* stmt;
+  };
+
+  int rc = sqlite3_open_v2(m_dbPath.c_str(), &m_db,
+                           SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE,
+#ifdef DISABLE_SQLITE3_FS_LOCKING
+                           "unix-dotfile"
+#else
+                           0
+#endif
+                           );
+
+  if (rc != SQLITE_OK) {
+    std::cerr << "Database file open failure rc:" << rc << std::endl;
+    // A handle may be returned even when opening fails and must still be closed.
+    sqlite3_close(m_db);
+    m_db = 0;
+    throw Error("Database file open failure");
+  }
+
+  // Errors are ignored (they are expected when the database already exists).
+  // No error-message pointer is passed: the message would have to be released
+  // with sqlite3_free() and nothing here reads it.
+  sqlite3_exec(m_db, "CREATE TABLE NDN_REPO ("
+               "name BLOB PRIMARY KEY, "
+               "data BLOB, "
+               "parentName BLOB, "
+               "nChildren INTEGER);\n"
+               "CREATE INDEX NdnRepoParentName ON NDN_REPO (parentName);\n"
+               "CREATE INDEX NdnRepoData ON NDN_REPO (data);\n"
+               , 0, 0, 0);
+
+  Name rootName;
+  string sql = string("SELECT * FROM NDN_REPO WHERE name = ?;");
+
+  {
+    // Scoped so that both statements are finalized before the pragmas run.
+    StatementGuard query;
+
+    // As before, a failed prepare or bind of the SELECT is skipped silently.
+    rc = sqlite3_prepare_v2(m_db, sql.c_str(), -1, &query.stmt, 0);
+    if (rc == SQLITE_OK &&
+        sqlite3_bind_blob(query.stmt, 1, rootName.wireEncode().wire(),
+                          rootName.wireEncode().size(), 0) == SQLITE_OK) {
+      rc = sqlite3_step(query.stmt);
+      if (rc == SQLITE_ROW) {
+        std::cerr << "root has been created" << std::endl;
+      }
+      else if (rc == SQLITE_DONE) {
+        // No root row yet: insert one with no data, no parent and no children.
+        StatementGuard insert;
+        sql = string("INSERT INTO NDN_REPO (name, data, parentName, nChildren) "
+                     " VALUES (?, ?, ?, ?);");
+        rc = sqlite3_prepare_v2(m_db, sql.c_str(), -1, &insert.stmt, 0);
+        if (rc != SQLITE_OK) {
+          std::cerr << "p2Stmt prepared rc:" << rc << std::endl;
+          throw Error("p2Stmt prepared");
+        }
+        if (!(sqlite3_bind_blob(insert.stmt, 1, rootName.wireEncode().wire(),
+                                rootName.wireEncode().size(), 0) == SQLITE_OK &&
+              sqlite3_bind_null(insert.stmt, 2) == SQLITE_OK &&
+              sqlite3_bind_null(insert.stmt, 3) == SQLITE_OK &&
+              sqlite3_bind_int(insert.stmt, 4, 0) == SQLITE_OK)) {
+          std::cerr << "bind blob failure rc:" << rc << std::endl;
+          throw Error("bind blob failure");
+        }
+        rc = sqlite3_step(insert.stmt);
+        if (rc != SQLITE_ROW && rc != SQLITE_DONE) {
+          std::cerr << "Root name insert failure rc:" << rc << std::endl;
+          throw Error("Root name insert failure");
+        }
+      }
+      else {
+        std::cerr << "Database query failure rc:" << rc << std::endl;
+        throw Error("Database query failure");
+      }
+    }
+  }
+
+  sqlite3_exec(m_db, "PRAGMA synchronous = OFF", 0, 0, 0);
+  sqlite3_exec(m_db, "PRAGMA journal_mode = WAL", 0, 0, 0);
+}
+
+SqliteHandle::~SqliteHandle()
+{
+ sqlite3_close(m_db); // closes the connection opened in initializeRepo(); the return code is ignored
+}
+
+
+//Temporarily assigned the datatype of every component. needs to be further discussed
+
+/**
+ * @brief Store a Data packet under its name
+ *
+ * If a row for the name already exists its data column is overwritten.
+ * Otherwise a row is created for every missing ancestor prefix, the child
+ * count of the nearest existing ancestor is incremented, and finally the row
+ * for the data itself is inserted.
+ *
+ * @param data packet to store
+ * @return true if the packet was stored; false if the name is empty or a
+ *         statement that writes the data row could not be bound or executed
+ * @throw Error if a statement cannot be prepared, a parent prefix cannot be
+ *        inserted, or the child count cannot be updated
+ */
+bool
+SqliteHandle::insertData(const Data& data)
+{
+  // Finalizes the wrapped statement on every exit path, including exceptions.
+  // sqlite3_finalize() accepts a null statement, so an unprepared guard is fine.
+  struct StatementGuard
+  {
+    StatementGuard() : stmt(0) {}
+    ~StatementGuard() { sqlite3_finalize(stmt); }
+    sqlite3_stmt* stmt;
+  };
+
+  Name name = data.getName();
+
+  if (name.empty()) {
+    std::cerr << "name is empty" << std::endl;
+    return false;
+  }
+
+  int rc = 0;
+
+  // Step 1. Try to overwrite the data of an existing row.
+  {
+    StatementGuard overwrite;
+    string updateSql2 = string("UPDATE NDN_REPO SET data = ? WHERE name = ?;");
+    if (sqlite3_prepare_v2(m_db, updateSql2.c_str(), -1, &overwrite.stmt, 0) != SQLITE_OK) {
+      std::cerr << "update sql2 not prepared" << std::endl;
+      throw Error("update sql2 not prepared");
+    }
+    if (sqlite3_bind_blob(overwrite.stmt, 1,
+                          data.wireEncode().wire(),
+                          data.wireEncode().size(), 0) == SQLITE_OK &&
+        sqlite3_bind_blob(overwrite.stmt, 2,
+                          name.wireEncode().wire(),
+                          name.wireEncode().size(), 0) == SQLITE_OK) {
+      rc = sqlite3_step(overwrite.stmt);
+      if (rc != SQLITE_DONE) {
+        return false;
+      }
+      // Only consult the change counter when the UPDATE above really ran;
+      // otherwise it would still describe some earlier statement.
+      if (sqlite3_changes(m_db) > 0) {
+        return true;
+      }
+    }
+  }
+
+  // Step 2. The name is new: prepare the statements for the insert path.
+  StatementGuard insert;
+  StatementGuard update;
+  string insertSql = string("INSERT INTO NDN_REPO (name, data, parentName, nChildren) "
+                            "VALUES (?, ?, ?, ?)");
+  string updateSql = string("UPDATE NDN_REPO SET nChildren = nChildren + 1 WHERE name = ?");
+
+  if (sqlite3_prepare_v2(m_db, insertSql.c_str(), -1, &insert.stmt, 0) != SQLITE_OK) {
+    // Previously only logged, after which the function went on to report success.
+    std::cerr << "insert sql not prepared" << std::endl;
+    throw Error("insert sql not prepared");
+  }
+  if (sqlite3_prepare_v2(m_db, updateSql.c_str(), -1, &update.stmt, 0) != SQLITE_OK) {
+    std::cerr << "update sql not prepared" << std::endl;
+    throw Error("update sql not prepared");
+  }
+
+  // Step 3. Walk up the name and create a row (no data, one child) for every
+  // prefix that does not exist yet; stop at the first prefix that does.
+  Name parentName = name;
+  Name grandName;
+  do {
+    parentName = parentName.getPrefix(-1);
+    if (hasName(parentName)) {
+      break;
+    }
+    grandName = parentName.getPrefix(-1);
+    if (sqlite3_bind_blob(insert.stmt, 1,
+                          parentName.wireEncode().wire(),
+                          parentName.wireEncode().size(), 0) == SQLITE_OK &&
+        sqlite3_bind_null(insert.stmt, 2) == SQLITE_OK &&
+        sqlite3_bind_blob(insert.stmt, 3,
+                          grandName.wireEncode().wire(),
+                          grandName.wireEncode().size(), 0) == SQLITE_OK &&
+        sqlite3_bind_int(insert.stmt, 4, 1) == SQLITE_OK) {
+      rc = sqlite3_step(insert.stmt);
+      if (rc == SQLITE_CONSTRAINT) {
+        std::cerr << "Insert parent prefix failed" << std::endl;
+        throw Error("Insert parent prefix failed");
+      }
+      sqlite3_reset(insert.stmt);
+    }
+  } while (!parentName.empty());
+
+  // Step 4. The prefix the loop stopped at gains one child.
+  if (sqlite3_bind_blob(update.stmt, 1, parentName.wireEncode().wire(),
+                        parentName.wireEncode().size(), 0) == SQLITE_OK) {
+    rc = sqlite3_step(update.stmt);
+    if (rc != SQLITE_ROW && rc != SQLITE_DONE) {
+      std::cerr << "update error rc:" << rc << std::endl;
+      throw Error("update error");
+    }
+    sqlite3_reset(update.stmt);
+  }
+
+  // Step 5. Insert the row for the data itself (a leaf: zero children).
+  parentName = name.getPrefix(-1);
+  sqlite3_reset(insert.stmt);
+  if (!(sqlite3_bind_blob(insert.stmt, 1,
+                          name.wireEncode().wire(),
+                          name.wireEncode().size(), 0) == SQLITE_OK &&
+        sqlite3_bind_blob(insert.stmt, 2,
+                          data.wireEncode().wire(),
+                          data.wireEncode().size(), 0) == SQLITE_OK &&
+        sqlite3_bind_blob(insert.stmt, 3,
+                          parentName.wireEncode().wire(),
+                          parentName.wireEncode().size(), 0) == SQLITE_OK &&
+        sqlite3_bind_int(insert.stmt, 4, 0) == SQLITE_OK)) {
+    // Nothing was written, so do not report success.
+    return false;
+  }
+
+  rc = sqlite3_step(insert.stmt);
+  if (rc == SQLITE_CONSTRAINT) {
+    std::cerr << "The name of the data has existed!" << std::endl;
+    return false;
+  }
+  return rc == SQLITE_DONE;
+}
+
+/**
+ * @brief Remove the data stored under @p name
+ *
+ * A row that still has children only has its data column cleared. A leaf row
+ * is deleted; the ancestors are then visited bottom-up: an ancestor with
+ * exactly one child, no data and a non-empty name is deleted as well, and the
+ * first ancestor that does not match has its child count decremented, which
+ * ends the walk.
+ *
+ * @param name exact name of the row to delete
+ * @return true on success; false if @p name, or an ancestor visited during the
+ *         walk, is not found
+ * @throw Error if a statement cannot be prepared, bound or executed
+ */
+bool
+SqliteHandle::deleteData(const Name& name)
+{
+  // Finalizes the wrapped statement on every exit path, including exceptions.
+  // sqlite3_finalize() accepts a null statement, so an unprepared guard is fine.
+  struct StatementGuard
+  {
+    StatementGuard() : stmt(0) {}
+    ~StatementGuard() { sqlite3_finalize(stmt); }
+    sqlite3_stmt* stmt;
+  };
+
+  StatementGuard query;
+  StatementGuard erase;
+  StatementGuard decrement;
+
+  string querySql = string("SELECT * from NDN_REPO where name = ?;");
+  string deleteSql = string("DELETE from NDN_REPO where name = ?;");
+
+  string updateSql = string("UPDATE NDN_REPO SET nChildren = nChildren - 1 WHERE name = ?;");
+  string updateSql2 = string("UPDATE NDN_REPO SET data = NULL WHERE name = ?;");
+
+  int rc = sqlite3_prepare_v2(m_db, querySql.c_str(), -1, &query.stmt, 0);
+  Name tmpName = name;
+  int nChildren = -1;
+  if (sqlite3_prepare_v2(m_db, deleteSql.c_str(), -1, &erase.stmt, 0) != SQLITE_OK) {
+    std::cerr << "delete statement prepared failed" << std::endl;
+    throw Error("delete statement prepared failed");
+  }
+  if (sqlite3_prepare_v2(m_db, updateSql.c_str(), -1, &decrement.stmt, 0) != SQLITE_OK) {
+    std::cerr << "delete update prepared failed" << std::endl;
+    throw Error("delete update prepared failed");
+  }
+  if (rc != SQLITE_OK) {
+    std::cerr << "query prepared failure rc:" << rc << std::endl;
+    throw Error("query prepared failure");
+  }
+
+  // Step 1. Look up the row to learn how many children it has (column 3).
+  // As before, a failed bind leaves nChildren at -1 and falls through to the
+  // leaf branch below.
+  if (sqlite3_bind_blob(query.stmt, 1,
+                        tmpName.wireEncode().wire(),
+                        tmpName.wireEncode().size(), 0) == SQLITE_OK) {
+    rc = sqlite3_step(query.stmt);
+    if (rc == SQLITE_ROW) {
+      nChildren = sqlite3_column_int(query.stmt, 3);
+    }
+    else {
+      std::cerr << "Database query no such name or failure rc:" << rc << std::endl;
+      return false;
+    }
+  }
+  // Reset instead of preparing the same SQL a second time; the statement is
+  // reused for the ancestor walk below.
+  sqlite3_reset(query.stmt);
+
+  if (nChildren > 0) {
+    // Internal node: keep the row, just drop its data.
+    StatementGuard clearData;
+    if (sqlite3_prepare_v2(m_db, updateSql2.c_str(), -1, &clearData.stmt, 0) != SQLITE_OK) {
+      std::cerr << "delete update prepared failed" << std::endl;
+      throw Error("delete update prepared failed");
+    }
+    if (sqlite3_bind_blob(clearData.stmt, 1,
+                          tmpName.wireEncode().wire(),
+                          tmpName.wireEncode().size(), 0) != SQLITE_OK) {
+      std::cerr << "delete bind error" << std::endl;
+      throw Error("delete bind error");
+    }
+    rc = sqlite3_step(clearData.stmt);
+    std::cerr << "deleteData update" << std::endl;
+    return true;
+  }
+
+  // Step 2. Leaf node: delete the row itself.
+  if (sqlite3_bind_blob(erase.stmt, 1,
+                        tmpName.wireEncode().wire(),
+                        tmpName.wireEncode().size(), 0) != SQLITE_OK) {
+    std::cerr << "delete bind error" << std::endl;
+    throw Error("delete bind error");
+  }
+  rc = sqlite3_step(erase.stmt);
+  if (rc != SQLITE_DONE && rc != SQLITE_ROW) {
+    std::cerr << "leaf node delete error rc:" << rc << std::endl;
+    throw Error("leaf node delete error");
+  }
+  sqlite3_reset(erase.stmt);
+
+  // Step 3. Walk up the ancestors and prune the ones that became useless.
+  int dataSize = 0;
+  do {
+    tmpName = tmpName.getPrefix(-1);
+    if (sqlite3_bind_blob(query.stmt, 1,
+                          tmpName.wireEncode().wire(),
+                          tmpName.wireEncode().size(), 0) != SQLITE_OK) {
+      std::cerr << "query bind error" << std::endl;
+      throw Error("query bind error");
+    }
+    rc = sqlite3_step(query.stmt);
+    if (rc != SQLITE_ROW) {
+      std::cerr << "Database query no such name or failure rc:" << rc << std::endl;
+      return false;
+    }
+    nChildren = sqlite3_column_int(query.stmt, 3);
+    dataSize = sqlite3_column_bytes(query.stmt, 1);
+
+    if (nChildren == 1 && !tmpName.empty() && dataSize == 0) {
+      // This ancestor only existed for the removed child: delete it too.
+      if (sqlite3_bind_blob(erase.stmt, 1,
+                            tmpName.wireEncode().wire(),
+                            tmpName.wireEncode().size(), 0) != SQLITE_OK) {
+        std::cerr << "delete bind error" << std::endl;
+        throw Error("delete bind error");
+      }
+      rc = sqlite3_step(erase.stmt);
+      if (rc != SQLITE_DONE && rc != SQLITE_ROW) {
+        std::cerr << "internal node delete error rc:" << rc << std::endl;
+        throw Error("internal node delete error");
+      }
+      sqlite3_reset(erase.stmt);
+    }
+    else {
+      // This ancestor stays: it loses one child and the walk ends here.
+      if (sqlite3_bind_blob(decrement.stmt, 1,
+                            tmpName.wireEncode().wire(),
+                            tmpName.wireEncode().size(), 0) != SQLITE_OK) {
+        std::cerr << "update bind error" << std::endl;
+        throw Error("update bind error");
+      }
+      rc = sqlite3_step(decrement.stmt);
+      if (rc != SQLITE_DONE && rc != SQLITE_ROW) {
+        std::cerr << "internal node nChildren update error rc:" << rc << std::endl;
+        throw Error("internal node nChildren update error");
+      }
+      sqlite3_reset(decrement.stmt);
+      break;
+    }
+    sqlite3_reset(query.stmt);
+  } while (!tmpName.empty());
+
+  return true;
+}
+
+// Resolve an Interest to a stored Data packet; selector-less Interests take
+// the plain prefix path, others are filtered and then narrowed by ChildSelector.
+bool
+SqliteHandle::readData(const Interest& interest, Data& data)
+{
+  if (!interest.hasSelectors())
+    return readDataPlain(interest.getName(), data);
+
+  vector<Name> names;
+  // a failed candidate search must not fall through to a lookup of an empty name
+  if (!readNameSelector(interest, names))
+    return false;
+  Name resultName;
+  if (!filterNameChild(interest.getName(), interest.getChildSelector(), names, resultName))
+    return false;
+  return readData(resultName, data);
+}
+
+// Selector-less read: collect every stored name at or below the given name
+// and return the packet with the smallest (leftmost) name.
+bool
+SqliteHandle::readDataPlain(const Name& name, Data& data)
+{
+  vector<Name> candidates;
+  Name leftmost;
+  readDataName(name, candidates);
+
+  // child selector 0 picks the minimum; leftmost stays empty without candidates
+  filterNameChild(name, 0, candidates, leftmost);
+  if (leftmost.empty())
+    return false;
+
+  // the chosen name is re-read by exact match to obtain the packet itself
+  return readData(leftmost, data);
+}
+
+// Collect the names of all stored Data packets located at or below a name.
+// @param name        exact name or prefix to search; an empty name is rejected
+// @param[out] names  matching names are appended to this vector
+// @return false when @p name is empty, true otherwise
+// @throw Error on prepare/step failure or when binding the children query fails
+bool
+SqliteHandle::readDataName(const Name& name, vector<Name>& names) const
+{
+  if (name.empty()) {
+    std::cerr << "The name is empty" << std::endl;
+    return false;
+  }
+  Name current = name;
+  // names of rows whose children still have to be visited
+  queue<Name> pending;
+
+  // Step 1. The requested name itself may be stored with data.
+  const string exactSql("SELECT * FROM NDN_REPO WHERE name = ? AND data IS NOT NULL;");
+  sqlite3_stmt* exactStmt = 0;
+  int rc = sqlite3_prepare_v2(m_db, exactSql.c_str(), -1, &exactStmt, 0);
+  if (rc != SQLITE_OK)
+    throw Error("prepare error");
+
+  // A failed bind is not reported here; the walk in step 2 still runs.
+  const int exactBind = sqlite3_bind_blob(exactStmt, 1,
+                                          current.wireEncode().wire(),
+                                          current.wireEncode().size(), 0);
+  if (exactBind == SQLITE_OK) {
+    rc = sqlite3_step(exactStmt);
+    if (rc == SQLITE_ROW) {
+      const int childCount = sqlite3_column_int(exactStmt, 3);
+      Name found;
+      found.wireDecode(Block(sqlite3_column_blob(exactStmt, 0),
+                             sqlite3_column_bytes(exactStmt, 0)));
+      names.push_back(found);
+      if (childCount == 0) {
+        // nothing is stored below this name, so the search is complete
+        sqlite3_finalize(exactStmt);
+        return true;
+      }
+    }
+    else if (rc != SQLITE_DONE) {
+      std::cerr << "read error rc:" << rc << std::endl;
+      sqlite3_finalize(exactStmt);
+      throw Error("read error");
+    }
+  }
+  sqlite3_finalize(exactStmt);
+
+  // Step 2. Breadth-first walk: query the children of each pending name.
+  const string childSql("SELECT * FROM NDN_REPO WHERE parentName = ?;");
+  sqlite3_stmt* childStmt = 0;
+  rc = sqlite3_prepare_v2(m_db, childSql.c_str(), -1, &childStmt, 0);
+  if (rc != SQLITE_OK)
+    throw Error("prepare error");
+
+  pending.push(current);
+  while (!pending.empty()) {
+    current = pending.front();
+    pending.pop();
+    const int childBind = sqlite3_bind_blob(childStmt, 1,
+                                            current.wireEncode().wire(),
+                                            current.wireEncode().size(), 0);
+    if (childBind != SQLITE_OK) {
+      std::cerr << "bind error" << std::endl;
+      sqlite3_finalize(childStmt);
+      throw Error("bind error");
+    }
+
+    // consume every child row of the current name
+    while ((rc = sqlite3_step(childStmt)) == SQLITE_ROW) {
+      Name child;
+      child.wireDecode(Block(sqlite3_column_blob(childStmt, 0),
+                             sqlite3_column_bytes(childStmt, 0)));
+      const int childCount = sqlite3_column_int(childStmt, 3);
+      if (childCount > 0) {
+        // it has children of its own: visit it later
+        pending.push(child);
+      }
+      if (sqlite3_column_type(childStmt, 1) != SQLITE_NULL) {
+        // the row carries data, so its name is a result
+        names.push_back(child);
+      }
+    }
+    if (rc != SQLITE_DONE) {
+      std::cerr << "read error rc:" << rc << std::endl;
+      sqlite3_finalize(childStmt);
+      throw Error("read error");
+    }
+    sqlite3_reset(childStmt);
+  }
+  sqlite3_finalize(childStmt);
+  return true;
+}
+
+// Collect the names of stored Data packets, at or below the Interest name,
+// that the Interest matches according to Interest::matchesData.
+// @param interest    Interest whose name is the search root; an empty name is rejected
+// @param[out] names  names of matching packets are appended to this vector
+// @return false when the Interest name is empty, true otherwise
+// @throw Error on prepare/step failure or when binding the children query fails
+bool
+SqliteHandle::readNameSelector(const Interest& interest, vector<Name>& names) const
+{
+  if (interest.getName().empty()) {
+    std::cerr << "The name of interest is empty" << std::endl;
+    return false;
+  }
+  Name current = interest.getName();
+  // names of rows whose children still have to be visited
+  queue<Name> pending;
+
+  // Step 1. The requested name itself may be stored with data.
+  sqlite3_stmt* exactStmt = 0;
+  const string exactSql("SELECT * FROM NDN_REPO WHERE name = ? AND data IS NOT NULL;");
+  int rc = sqlite3_prepare_v2(m_db, exactSql.c_str(), -1, &exactStmt, 0);
+  if (rc != SQLITE_OK)
+    throw Error("prepare error");
+
+  // A failed bind is not reported here; the walk in step 2 still runs.
+  const int exactBind = sqlite3_bind_blob(exactStmt, 1,
+                                          current.wireEncode().wire(),
+                                          current.wireEncode().size(), 0);
+  if (exactBind == SQLITE_OK) {
+    rc = sqlite3_step(exactStmt);
+    if (rc == SQLITE_ROW) {
+      Data stored;
+      stored.wireDecode(Block(sqlite3_column_blob(exactStmt, 1),
+                              sqlite3_column_bytes(exactStmt, 1)));
+      if (interest.matchesData(stored))
+        names.push_back(stored.getName());
+      const int childCount = sqlite3_column_int(exactStmt, 3);
+      if (childCount == 0) {
+        // nothing is stored below this name, so the search is complete
+        sqlite3_finalize(exactStmt);
+        return true;
+      }
+    }
+    else if (rc != SQLITE_DONE) {
+      std::cerr << "read error rc:" << rc << std::endl;
+      sqlite3_finalize(exactStmt);
+      throw Error("read error");
+    }
+  }
+  sqlite3_finalize(exactStmt);
+
+  // Step 2. Breadth-first walk: query the children of each pending name.
+  pending.push(current);
+  sqlite3_stmt* childStmt = 0;
+  const string childSql("SELECT * FROM NDN_REPO WHERE parentName = ?;");
+  rc = sqlite3_prepare_v2(m_db, childSql.c_str(), -1, &childStmt, 0);
+  if (rc != SQLITE_OK)
+    throw Error("prepare error");
+
+  while (!pending.empty()) {
+    current = pending.front();
+    pending.pop();
+    const int childBind = sqlite3_bind_blob(childStmt, 1,
+                                            current.wireEncode().wire(),
+                                            current.wireEncode().size(), 0);
+    if (childBind != SQLITE_OK) {
+      std::cerr << "bind error" << std::endl;
+      sqlite3_finalize(childStmt);
+      throw Error("bind error");
+    }
+
+    // consume every child row of the current name
+    while ((rc = sqlite3_step(childStmt)) == SQLITE_ROW) {
+      const bool hasData = sqlite3_column_type(childStmt, 1) != SQLITE_NULL;
+      if (hasData) {
+        // only rows that carry a packet can match the Interest
+        Data stored;
+        stored.wireDecode(Block(sqlite3_column_blob(childStmt, 1),
+                                sqlite3_column_bytes(childStmt, 1)));
+        if (interest.matchesData(stored))
+          names.push_back(stored.getName());
+      }
+
+      Name child;
+      child.wireDecode(Block(sqlite3_column_blob(childStmt, 0),
+                             sqlite3_column_bytes(childStmt, 0)));
+      const int childCount = sqlite3_column_int(childStmt, 3);
+      if (childCount > 0) {
+        // it has children of its own: visit it later
+        pending.push(child);
+      }
+    }
+    if (rc != SQLITE_DONE) {
+      std::cerr << "read error rc:" << rc << std::endl;
+      sqlite3_finalize(childStmt);
+      throw Error("read error");
+    }
+    sqlite3_reset(childStmt);
+  }
+  sqlite3_finalize(childStmt);
+  return true;
+}
+
+// Apply a ChildSelector to a list of candidate names.
+//
+// @param name             not used by the selection itself
+// @param childSelector    0 picks the smallest candidate, 1 the largest;
+//                         every other value is rejected
+// @param names            candidate names to choose from
+// @param[out] resultName  the chosen name; left untouched on failure
+// @return true when a name was chosen, false for an empty candidate list
+//         or an unsupported selector value
+bool
+SqliteHandle::filterNameChild(const Name& name, int childSelector,
+                              const vector<Name>& names, Name& resultName)
+{
+  const bool wantsSmallest = (childSelector == 0);
+  const bool wantsLargest = (childSelector == 1);
+  if (names.empty() || !(wantsSmallest || wantsLargest))
+    return false;
+
+  // ordering is whatever operator< on Name defines
+  if (wantsSmallest)
+    resultName = *std::min_element(names.begin(), names.end());
+  else
+    resultName = *std::max_element(names.begin(), names.end());
+  return true;
+}
+
+// List the stored names selected by a name plus optional selectors.
+// @param[out] names  without selectors: @p name, appended if hasName finds it; with
+//                    selectors: the matches, reduced to one name if ChildSelector >= 0
+// @return false only when a ChildSelector is set and no name could be chosen
+bool
+SqliteHandle::readNameAny(const Name& name, const Selectors& selectors, vector<Name>& names)
+{
+  if (selectors.empty()) {
+    if (hasName(name))
+      names.push_back(name);
+    return true;
+  }
+  Interest interest(name);
+  interest.setSelectors(selectors);
+  readNameSelector(interest, names);
+
+  const int childSelector = selectors.getChildSelector();
+  if (childSelector < 0)
+    return true;
+  Name resultName;
+  if (!filterNameChild(name, childSelector, names, resultName))
+    return false;
+  names.clear();
+  names.push_back(resultName);
+  return true;
+}
+
+// Read the Data packet stored under exactly @p name into @p data.
+// @return true only if a packet was read; false if no row matches or prepare/bind failed
+// @throw Error if stepping the query fails
+bool
+SqliteHandle::readData(const Name& name, Data& data)
+{
+  sqlite3_stmt* queryStmt = 0;
+  string sql = string("SELECT * FROM NDN_REPO WHERE name = ? AND data IS NOT NULL;");
+  int rc = sqlite3_prepare_v2(m_db, sql.c_str(), -1, &queryStmt, 0);
+  if (rc == SQLITE_OK)
+    rc = sqlite3_bind_blob(queryStmt, 1, name.wireEncode().wire(), name.wireEncode().size(), 0);
+  if (rc != SQLITE_OK) {
+    // nothing was read, so this must not be reported as success
+    sqlite3_finalize(queryStmt);
+    return false;
+  }
+  rc = sqlite3_step(queryStmt);
+  if (rc == SQLITE_ROW) {
+    data.wireDecode(Block(sqlite3_column_blob(queryStmt, 1),
+                          sqlite3_column_bytes(queryStmt, 1)));
+    sqlite3_finalize(queryStmt);
+    return true;
+  }
+  // release the statement on the "not found" path as well as on failure
+  sqlite3_finalize(queryStmt);
+  if (rc == SQLITE_DONE)
+    return false;
+  std::cerr << "Database query failure rc:" << rc << std::endl;
+  throw Error("Database query failure");
+}
+
+
+// Exact-name lookup: is there any row (with or without data) named @p name?
+// @param name  exact name to look for
+// @return true if such a row exists; false if there is none or the query
+//         could not be prepared, bound or stepped
+bool
+SqliteHandle::hasName(const Name& name)
+{
+  sqlite3_stmt* queryStmt = 0;
+  string sql = string("select * from NDN_REPO where name = ?;");
+  int rc = sqlite3_prepare_v2(m_db, sql.c_str(), -1, &queryStmt, 0);
+  if (rc == SQLITE_OK)
+    rc = sqlite3_bind_blob(queryStmt, 1, name.wireEncode().wire(), name.wireEncode().size(), 0);
+  if (rc != SQLITE_OK) {
+    // an unanswered query must not be reported as "name exists"
+    std::cerr << "Database query failure rc:" << rc << std::endl;
+    sqlite3_finalize(queryStmt);
+    return false;
+  }
+
+  // one step is enough: only the existence of a first row matters
+  rc = sqlite3_step(queryStmt);
+  sqlite3_finalize(queryStmt);
+  if (rc == SQLITE_ROW)
+    return true;
+  if (rc != SQLITE_DONE) {
+    // neither a row nor a clean end of results: the step failed
+    std::cerr << "Database query failure rc:" << rc << std::endl;
+  }
+  return false;
+}
+
+// Exact-parent lookup: is there any row whose parentName equals @p parentName?
+// @param parentName  name to compare against the parentName column
+// @return true if such a row exists; false if there is none or the query
+//         could not be prepared, bound or stepped
+bool
+SqliteHandle::hasParentName(const Name& parentName) const
+{
+  sqlite3_stmt* queryStmt = 0;
+  string sql = string("SELECT * FROM NDN_REPO WHERE parentName = ?;");
+  int rc = sqlite3_prepare_v2(m_db, sql.c_str(), -1, &queryStmt, 0);
+  if (rc == SQLITE_OK)
+    rc = sqlite3_bind_blob(queryStmt, 1, parentName.wireEncode().wire(),
+                           parentName.wireEncode().size(), 0);
+  if (rc != SQLITE_OK) {
+    // an unanswered query must not be reported as "a child exists"
+    std::cerr << "Database query failure rc:" << rc << std::endl;
+    sqlite3_finalize(queryStmt);
+    return false;
+  }
+
+  rc = sqlite3_step(queryStmt);
+  sqlite3_finalize(queryStmt);
+  if (rc == SQLITE_ROW)
+    return true;
+  if (rc != SQLITE_DONE) {
+    // neither a row nor a clean end of results: the step failed
+    std::cerr << "Database query failure rc:" << rc << std::endl;
+  }
+  return false;
+}
+
+// Count the stored Data packets, i.e. the rows whose data column is not NULL.
+// @return the value of count(*) reported by the database
+// @throw Error if the count query cannot be prepared or does not yield a row
+size_t
+SqliteHandle::size()
+{
+  sqlite3_stmt* queryStmt = 0;
+  string sql("SELECT count(*) FROM NDN_REPO WHERE data IS NOT NULL");
+  int rc = sqlite3_prepare_v2(m_db, sql.c_str(), -1, &queryStmt, 0);
+  if (rc == SQLITE_OK)
+    rc = sqlite3_step(queryStmt);
+
+  // anything other than a result row (including a failed prepare) is an error
+  if (rc != SQLITE_ROW) {
+    std::cerr << "Database query failure rc:" << rc << std::endl;
+    sqlite3_finalize(queryStmt);
+    throw Error("Database query failure");
+  }
+
+  size_t nDatas = static_cast<size_t>(sqlite3_column_int64(queryStmt, 0));
+  // release the statement on the success path too; it would otherwise leak
+  sqlite3_finalize(queryStmt);
+  return nDatas;
+}
+
+} //namespace repo
diff --git a/src/storage/sqlite-handle.hpp b/src/storage/sqlite-handle.hpp
new file mode 100644
index 0000000..a0fbb1c
--- /dev/null
+++ b/src/storage/sqlite-handle.hpp
@@ -0,0 +1,144 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_STORAGE_SQLITE_HANDLE_HPP
+#define REPO_STORAGE_SQLITE_HANDLE_HPP
+
+#include "storage-handle.hpp"
+
+#include <string>
+#include <iostream>
+#include <sqlite3.h>
+#include <stdlib.h>
+#include <vector>
+#include <queue>
+#include <algorithm>
+
+namespace repo {
+
+using std::queue;
+
+class SqliteHandle : public StorageHandle
+{
+public:
+ class Error : public StorageHandle::Error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : StorageHandle::Error(what)
+ {
+ }
+ };
+
+ SqliteHandle();
+
+ explicit
+ SqliteHandle(const string& dbPath);
+
+ virtual
+ ~SqliteHandle();
+
+ // Implementation of the StorageHandle interface; each of these methods is
+ // declared pure virtual in StorageHandle and documented there.
+
+ virtual bool
+ insertData(const Data& data);
+
+ virtual bool
+ deleteData(const Name& name);
+
+ virtual bool
+ readData(const Interest& interest, Data& data);
+
+ virtual bool
+ hasName(const Name& name);
+
+ virtual bool
+ readNameAny(const Name& name, const Selectors& selectors, vector<Name>& names);
+
+ virtual size_t
+ size();
+
+private:
+ void
+ initializeRepo();
+
+ /**
+ * @brief read the Data stored under exactly @p name (rows without data are ignored)
+ * @param[out] data Data decoded from the matching row.
+ * @return true if a row was found and decoded into @p data, false if no row matches
+ */
+ bool
+ readData(const Name& name, Data& data);
+
+ /**
+ * @brief check whether at least one row in the database has parentName equal to @p parentName
+ * @return true if such a row is found, false if the query finds none or fails while stepping
+ */
+ bool
+ hasParentName(const Name& parentName) const;
+
+ /**
+ * @brief selector-less read: reply with the leftmost Data stored at or below @p name
+ * @param[out] data Data that was selected.
+ * @return true if a Data was selected and read, false if none is stored at or below @p name
+ */
+ bool
+ readDataPlain(const Name& name, Data& data);
+
+ /**
+ * @brief collect the names of all Data stored at @p name or anywhere below it
+ * @param name name or prefix to search for; an empty name is rejected
+ * @param[out] names vector to which the names that were found are appended.
+ * @return false if @p name is empty, true otherwise
+ */
+ bool
+ readDataName(const Name& name, vector<Name>& names) const;
+
+ /**
+ * @brief read data with this prefix or name and selectors including MinSuffixComponents,
+ * MaxSuffixComponents, PublisherPublicKeyLocator, and Exclude.
+ * This method does not consider ChildSelector and MustBeFresh.
+ *
+ * @param interest Interest whose name is the name or prefix and which carries the selectors
+ * @param[out] names vector to which the names of the matching Data are appended.
+ * @return false if the name of @p interest is empty, true otherwise
+ */
+ bool
+ readNameSelector(const Interest& interest, vector<Name>& names) const;
+
+ /**
+ * @brief ChildSelector filter: selects the smallest (0) or largest (1) of the candidate names
+ * @param names list of candidate names for ChildSelector filter
+ * @param[out] resultName the selected name; not modified when false is returned
+ * @return false if @p names is empty or @p childSelector is neither 0 nor 1, true otherwise
+ */
+ bool
+ filterNameChild(const Name& name, int childSelector,
+ const vector<Name>& names, Name& resultName);
+
+private:
+ sqlite3* m_db;
+ string m_dbPath;
+};
+
+} // namespace repo
+
+#endif // REPO_STORAGE_SQLITE_HANDLE_HPP
diff --git a/src/storage/storage-handle.hpp b/src/storage/storage-handle.hpp
new file mode 100644
index 0000000..b75de2c
--- /dev/null
+++ b/src/storage/storage-handle.hpp
@@ -0,0 +1,126 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_STORAGE_STORAGE_HANDLE_HPP
+#define REPO_STORAGE_STORAGE_HANDLE_HPP
+
+#include "common.hpp"
+#include "storage-method.hpp"
+
+namespace repo {
+
+/**
+ * @brief abstract interface of a storage backend: insert, delete, read and count Data packets
+ */
+
+class StorageHandle : noncopyable
+{
+public:
+ class Error : public std::runtime_error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : std::runtime_error(what)
+ {
+ }
+ };
+
+ /**
+ * @brief Create a handle for the specified storage method
+ */
+ explicit
+ StorageHandle(StorageMethod storageMethod);
+
+ virtual
+ ~StorageHandle();
+
+ /**
+ * @return the storage method (see storage-method.hpp) given at construction
+ */
+ StorageMethod
+ getStorageMethod() const;
+
+ /**
+ * @brief insert data
+ * @return true on success, false otherwise
+ */
+ virtual bool
+ insertData(const Data& data) = 0;
+
+ /**
+ * @brief delete the data that exactly matches the name
+ * @return true on success, false otherwise
+ * @note It's considered successful if Data doesn't exist.
+ */
+ virtual bool
+ deleteData(const Name& name) = 0;
+
+ /**
+ * @brief find data according to the interest. This interest may contain selectors.
+ * @param[out] data Data matching Interest.
+ * @return true if Data is found, false otherwise
+ */
+ virtual bool
+ readData(const Interest& interest, Data& data) = 0;
+
+ /**
+ * @return true if the storage media has data packets with this name, false otherwise
+ */
+ virtual bool
+ hasName(const Name& name) = 0;
+
+ /**
+ * @brief select the names of the data that conform to the selectors
+ * @param[out] names Data names matching @p name and @p selectors.
+ * @return true if at least one Data is found, false otherwise
+ */
+ virtual bool
+ readNameAny(const Name& name, const Selectors& selectors, vector<Name>& names) = 0;
+
+ /**
+ * @brief Get the number of Data packets stored
+ */
+ virtual size_t
+ size() = 0;
+
+private:
+ StorageMethod m_storageMethod;
+};
+
+// Record the storage method; getStorageMethod() reports it.
+inline
+StorageHandle::StorageHandle(StorageMethod storageMethod)
+  : m_storageMethod(storageMethod)
+{}
+
+// Empty: this base class has no resource of its own to release.
+inline
+StorageHandle::~StorageHandle()
+{}
+
+// Accessor: report the storage method that was
+// handed to the constructor.
+inline StorageMethod
+StorageHandle::getStorageMethod() const
+{ return m_storageMethod; }
+
+} // namespace repo
+
+#endif // REPO_STORAGE_STORAGE_HANDLE_HPP
diff --git a/src/storage/storage-method.hpp b/src/storage/storage-method.hpp
new file mode 100644
index 0000000..219be7e
--- /dev/null
+++ b/src/storage/storage-method.hpp
@@ -0,0 +1,31 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_STORAGE_STORAGE_METHOD_HPP
+#define REPO_STORAGE_STORAGE_METHOD_HPP
+
+namespace repo {
+
+// Identifies the storage backend behind a StorageHandle.
+// SQLite is the only method defined; its value is 1.
+enum StorageMethod { STORAGE_METHOD_SQLITE = 1 };
+
+} // namespace repo
+
+#endif // REPO_STORAGE_STORAGE_METHOD_HPP