Remove dependency on Selectors and refactor codebase.
Change-Id: Ic3024b76ba0eea61f790c91c36090b4aa68702a3
Refs: #4522
diff --git a/src/common.hpp b/src/common.hpp
index bea3ddb..1a862d0 100644
--- a/src/common.hpp
+++ b/src/common.hpp
@@ -22,12 +22,11 @@
#include "config.hpp"
+#include <ndn-cxx/data.hpp>
#include <ndn-cxx/face.hpp>
#include <ndn-cxx/interest.hpp>
-#include <ndn-cxx/name.hpp>
-#include <ndn-cxx/data.hpp>
-#include <ndn-cxx/selectors.hpp>
#include <ndn-cxx/key-locator.hpp>
+#include <ndn-cxx/name.hpp>
#include <ndn-cxx/security/v2/key-chain.hpp>
#include <ndn-cxx/security/v2/validator.hpp>
#include <ndn-cxx/security/validator-config.hpp>
@@ -55,11 +54,10 @@
using ndn::Face;
using ndn::Block;
+using ndn::operator ""_block;
using ndn::Name;
namespace name = ndn::name;
using ndn::Interest;
-using ndn::Selectors;
-using ndn::Exclude;
using ndn::Data;
using ndn::KeyLocator;
using ndn::Scheduler;
@@ -68,12 +66,6 @@
using ndn::security::v2::ValidationError;
using ndn::security::ValidatorConfig;
-using std::shared_ptr;
-using std::make_shared;
-using std::bind;
-using std::placeholders::_1;
-using std::placeholders::_2;
-
using boost::noncopyable;
typedef uint64_t ProcessId;
diff --git a/src/extended-error-message.hpp b/src/extended-error-message.hpp
new file mode 100644
index 0000000..217bf81
--- /dev/null
+++ b/src/extended-error-message.hpp
@@ -0,0 +1,51 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_EXTENDED_ERROR_MESSAGE_HPP
+#define REPO_EXTENDED_ERROR_MESSAGE_HPP
+
+#include <boost/exception/get_error_info.hpp>
+#include <sstream>
+
+namespace repo {
+
+template<typename E>
+std::string
+getExtendedErrorMessage(const E& exception)
+{
+ std::ostringstream errorMessage;
+ errorMessage << exception.what();
+
+ const char* const* file = boost::get_error_info<boost::throw_file>(exception);
+ const int* line = boost::get_error_info<boost::throw_line>(exception);
+ const char* const* func = boost::get_error_info<boost::throw_function>(exception);
+ if (file && line) {
+ errorMessage << " [from " << *file << ":" << *line;
+ if (func) {
+ errorMessage << " in " << *func;
+ }
+ errorMessage << "]";
+ }
+
+ return errorMessage.str();
+}
+
+} // namespace repo
+
+#endif // REPO_EXTENDED_ERROR_MESSAGE_HPP
diff --git a/src/handles/command-base-handle.cpp b/src/handles/command-base-handle.cpp
index 1717839..8f2c6d0 100644
--- a/src/handles/command-base-handle.cpp
+++ b/src/handles/command-base-handle.cpp
@@ -32,7 +32,7 @@
static ndn::optional<std::string>
getSignerFromTag(const ndn::Interest& interest)
{
- shared_ptr<SignerTag> signerTag = interest.getTag<SignerTag>();
+ std::shared_ptr<SignerTag> signerTag = interest.getTag<SignerTag>();
if (signerTag == nullptr) {
return ndn::nullopt;
}
@@ -62,13 +62,10 @@
auto signer1 = getSignerFromTag(request);
std::string signer = signer1.value_or("*");
- //_LOG_DEBUG("accept " << request->getName() << " signer=" << signer);
accept(signer);
},
[reject] (const ndn::Interest& request,
const ndn::security::v2::ValidationError& error) {
- //_LOG_DEBUG("reject " << request->getName() << " signer=" <<
- // getSignerFromTag(*request).value_or("?") << ' ' << failureInfo);
reject(ndn::mgmt::RejectReply::STATUS403);
});
};
diff --git a/src/handles/delete-handle.cpp b/src/handles/delete-handle.cpp
index dc0f5f1..c892583 100644
--- a/src/handles/delete-handle.cpp
+++ b/src/handles/delete-handle.cpp
@@ -19,8 +19,12 @@
#include "delete-handle.hpp"
+#include <ndn-cxx/util/logger.hpp>
+
namespace repo {
+NDN_LOG_INIT(repo.DeleteHandle);
+
DeleteHandle::DeleteHandle(Face& face, RepoStorage& storageHandle,
ndn::mgmt::Dispatcher& dispatcher, Scheduler& scheduler,
Validator& validator)
@@ -40,12 +44,6 @@
{
const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
- if (repoParameter.hasSelectors()) {
- //choose data with selector and delete it
- processSelectorDeleteCommand(interest, repoParameter, done);
- return;
- }
-
if (!repoParameter.hasStartBlockId() && !repoParameter.hasEndBlockId()) {
processSingleDeleteCommand(interest, repoParameter, done);
return;
@@ -87,7 +85,7 @@
{
int64_t nDeletedData = storageHandle.deleteData(parameter.getName());
if (nDeletedData == -1) {
- std::cerr << "Deletion Failed!" <<std::endl;
+ NDN_LOG_DEBUG("Deletion Failed");
done(negativeReply(interest, 405, "Deletion Failed"));
}
else
@@ -95,20 +93,6 @@
}
void
-DeleteHandle::processSelectorDeleteCommand(const Interest& interest, const RepoCommandParameter& parameter,
- const ndn::mgmt::CommandContinuation& done) const
-{
- int64_t nDeletedData = storageHandle.deleteData(Interest(parameter.getName())
- .setSelectors(parameter.getSelectors()));
- if (nDeletedData == -1) {
- std::cerr << "Deletion Failed!" <<std::endl;
- done(negativeReply(interest, 405, "Deletion Failed"));
- }
- else
- done(positiveReply(interest, parameter, 200, nDeletedData));
-}
-
-void
DeleteHandle::processSegmentDeleteCommand(const Interest& interest, const RepoCommandParameter& parameter,
const ndn::mgmt::CommandContinuation& done) const
{
diff --git a/src/handles/delete-handle.hpp b/src/handles/delete-handle.hpp
index fbb25d3..ca32709 100644
--- a/src/handles/delete-handle.hpp
+++ b/src/handles/delete-handle.hpp
@@ -62,10 +62,6 @@
const ndn::mgmt::CommandContinuation& done) const;
void
- processSelectorDeleteCommand(const Interest& interest, const RepoCommandParameter& parameter,
- const ndn::mgmt::CommandContinuation& done) const;
-
- void
processSegmentDeleteCommand(const Interest& interest, const RepoCommandParameter& parameter,
const ndn::mgmt::CommandContinuation& done) const;
};
diff --git a/src/handles/read-handle.cpp b/src/handles/read-handle.cpp
index 44a9d18..dbdfef7 100644
--- a/src/handles/read-handle.cpp
+++ b/src/handles/read-handle.cpp
@@ -20,8 +20,12 @@
#include "read-handle.hpp"
#include "repo.hpp"
+#include <ndn-cxx/util/logger.hpp>
+
namespace repo {
+NDN_LOG_INIT(repo.ReadHandle);
+
ReadHandle::ReadHandle(Face& face, RepoStorage& storageHandle, size_t prefixSubsetLength)
: m_prefixSubsetLength(prefixSubsetLength)
, m_face(face)
@@ -35,11 +39,11 @@
{
// Connect a RepoStorage's signals to the read handle
if (m_prefixSubsetLength != RepoConfig::DISABLED_SUBSET_LENGTH) {
- afterDataDeletionConnection = m_storageHandle.afterDataInsertion.connect(
+ afterDataInsertionConnection = m_storageHandle.afterDataInsertion.connect(
[this] (const Name& prefix) {
onDataInserted(prefix);
});
- afterDataInsertionConnection = m_storageHandle.afterDataDeletion.connect(
+ afterDataDeletionConnection = m_storageHandle.afterDataDeletion.connect(
[this] (const Name& prefix) {
onDataDeleted(prefix);
});
@@ -49,16 +53,21 @@
void
ReadHandle::onInterest(const Name& prefix, const Interest& interest)
{
- shared_ptr<ndn::Data> data = m_storageHandle.readData(interest);
+ NDN_LOG_DEBUG("Received Interest " << interest.getName());
+ std::shared_ptr<ndn::Data> data = m_storageHandle.readData(interest);
if (data != nullptr) {
- m_face.put(*data);
+ NDN_LOG_DEBUG("Put Data: " << *data);
+ m_face.put(*data);
+ }
+ else {
+ NDN_LOG_DEBUG("No data for " << interest.getName());
}
}
void
ReadHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
{
- std::cerr << "ERROR: Failed to register prefix in local hub's daemon" << std::endl;
+ NDN_LOG_ERROR("ERROR: Failed to register prefix in local hub's daemon");
m_face.shutdown();
}
@@ -67,8 +76,8 @@
{
ndn::InterestFilter filter(prefix);
m_face.setInterestFilter(filter,
- bind(&ReadHandle::onInterest, this, _1, _2),
- bind(&ReadHandle::onRegisterFailed, this, _1, _2));
+ std::bind(&ReadHandle::onInterest, this, _1, _2),
+ std::bind(&ReadHandle::onRegisterFailed, this, _1, _2));
}
void
diff --git a/src/handles/read-handle.hpp b/src/handles/read-handle.hpp
index a243abd..bdd6b6f 100644
--- a/src/handles/read-handle.hpp
+++ b/src/handles/read-handle.hpp
@@ -54,7 +54,7 @@
}
/**
- * @param after Do something after actually removing a prefix
+ * @param name Full name of the deleted Data
*/
void
onDataDeleted(const Name& name);
diff --git a/src/handles/tcp-bulk-insert-handle.cpp b/src/handles/tcp-bulk-insert-handle.cpp
index 53a60d6..f27d173 100644
--- a/src/handles/tcp-bulk-insert-handle.cpp
+++ b/src/handles/tcp-bulk-insert-handle.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014-2017, Regents of the University of California.
+ * Copyright (c) 2014-2018, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -19,8 +19,12 @@
#include "tcp-bulk-insert-handle.hpp"
+#include <ndn-cxx/util/logger.hpp>
+
namespace repo {
+NDN_LOG_INIT(repo.TcpBulkInsertHandle);
+
const size_t MAX_NDN_PACKET_SIZE = 8800;
namespace detail {
@@ -29,7 +33,7 @@
{
public:
TcpBulkInsertClient(TcpBulkInsertHandle& writer,
- const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+ const std::shared_ptr<boost::asio::ip::tcp::socket>& socket)
: m_writer(writer)
, m_socket(socket)
, m_hasStarted(false)
@@ -38,13 +42,13 @@
}
static void
- startReceive(const shared_ptr<TcpBulkInsertClient>& client)
+ startReceive(const std::shared_ptr<TcpBulkInsertClient>& client)
{
BOOST_ASSERT(!client->m_hasStarted);
client->m_socket->async_receive(
boost::asio::buffer(client->m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
- bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
+ std::bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
client->m_hasStarted = true;
}
@@ -53,11 +57,11 @@
void
handleReceive(const boost::system::error_code& error,
std::size_t nBytesReceived,
- const shared_ptr<TcpBulkInsertClient>& client);
+ const std::shared_ptr<TcpBulkInsertClient>& client);
private:
TcpBulkInsertHandle& m_writer;
- shared_ptr<boost::asio::ip::tcp::socket> m_socket;
+ std::shared_ptr<boost::asio::ip::tcp::socket> m_socket;
bool m_hasStarted;
uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
std::size_t m_inputBufferSize;
@@ -87,22 +91,19 @@
BOOST_THROW_EXCEPTION(Error("Cannot listen on [" + host + ":" + port + "]"));
m_localEndpoint = *endpoint;
- std::cerr << "Start listening on " << m_localEndpoint << std::endl;
+ NDN_LOG_DEBUG("Start listening on " << m_localEndpoint);
m_acceptor.open(m_localEndpoint .protocol());
m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
- if (m_localEndpoint.address().is_v6())
- {
- m_acceptor.set_option(ip::v6_only(true));
- }
+ if (m_localEndpoint.address().is_v6()) {
+ m_acceptor.set_option(ip::v6_only(true));
+ }
m_acceptor.bind(m_localEndpoint);
m_acceptor.listen(255);
- shared_ptr<ip::tcp::socket> clientSocket =
- make_shared<ip::tcp::socket>(std::ref(m_acceptor.get_io_service()));
+ auto clientSocket = std::make_shared<ip::tcp::socket>(m_acceptor.get_io_service());
m_acceptor.async_accept(*clientSocket,
- bind(&TcpBulkInsertHandle::handleAccept, this, _1,
- clientSocket));
+ std::bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
}
void
@@ -114,45 +115,40 @@
void
TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
- const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+ const std::shared_ptr<boost::asio::ip::tcp::socket>& socket)
{
using namespace boost::asio;
if (error) {
- // if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
- // return;
return;
}
- std::cerr << "New connection from " << socket->remote_endpoint() << std::endl;
+ NDN_LOG_DEBUG("New connection from " << socket->remote_endpoint());
- shared_ptr<detail::TcpBulkInsertClient> client =
- make_shared<detail::TcpBulkInsertClient>(std::ref(*this), socket);
+ std::shared_ptr<detail::TcpBulkInsertClient> client =
+ std::make_shared<detail::TcpBulkInsertClient>(*this, socket);
detail::TcpBulkInsertClient::startReceive(client);
// prepare accepting the next connection
- shared_ptr<ip::tcp::socket> clientSocket =
- make_shared<ip::tcp::socket>(std::ref(m_acceptor.get_io_service()));
+ auto clientSocket = std::make_shared<ip::tcp::socket>(m_acceptor.get_io_service());
m_acceptor.async_accept(*clientSocket,
- bind(&TcpBulkInsertHandle::handleAccept, this, _1,
- clientSocket));
+ std::bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
}
void
detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
std::size_t nBytesReceived,
- const shared_ptr<detail::TcpBulkInsertClient>& client)
+ const std::shared_ptr<detail::TcpBulkInsertClient>& client)
{
- if (error)
- {
- if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
- return;
-
- boost::system::error_code error;
- m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
- m_socket->close(error);
+ if (error) {
+ if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
return;
- }
+
+ boost::system::error_code error;
+ m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
+ m_socket->close(error);
+ return;
+ }
m_inputBufferSize += nBytesReceived;
@@ -175,42 +171,37 @@
Data data(element);
bool isInserted = m_writer.getStorageHandle().insertData(data);
if (isInserted)
- std::cerr << "Successfully injected " << data.getName() << std::endl;
+ NDN_LOG_DEBUG("Successfully injected " << data.getName());
else
- std::cerr << "FAILED to inject " << data.getName() << std::endl;
+ NDN_LOG_DEBUG("FAILED to inject " << data.getName());
}
catch (const std::runtime_error& error) {
/// \todo Catch specific error after determining what wireDecode() can throw
- std::cerr << "Error decoding received Data packet" << std::endl;
+ NDN_LOG_ERROR("Error decoding received Data packet");
}
}
}
- if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
- {
- boost::system::error_code error;
- m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
- m_socket->close(error);
- return;
- }
+ if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
+ boost::system::error_code error;
+ m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
+ m_socket->close(error);
+ return;
+ }
- if (offset > 0)
- {
- if (offset != m_inputBufferSize)
- {
- std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
- m_inputBuffer);
- m_inputBufferSize -= offset;
- }
- else
- {
- m_inputBufferSize = 0;
- }
+ if (offset > 0) {
+ if (offset != m_inputBufferSize) {
+ std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
+ m_inputBufferSize -= offset;
}
+ else {
+ m_inputBufferSize = 0;
+ }
+ }
m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
- bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
+ std::bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
}
diff --git a/src/handles/watch-handle.cpp b/src/handles/watch-handle.cpp
deleted file mode 100644
index 105e802..0000000
--- a/src/handles/watch-handle.cpp
+++ /dev/null
@@ -1,298 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/*
- * Copyright (c) 2014-2018, Regents of the University of California.
- *
- * This file is part of NDN repo-ng (Next generation of NDN repository).
- * See AUTHORS.md for complete list of repo-ng authors and contributors.
- *
- * repo-ng is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "watch-handle.hpp"
-
-namespace repo {
-
-static const milliseconds PROCESS_DELETE_TIME(10000_ms);
-static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms);
-
-WatchHandle::WatchHandle(Face& face, RepoStorage& storageHandle,
- ndn::mgmt::Dispatcher& dispatcher, Scheduler& scheduler, Validator& validator)
- : CommandBaseHandle(face, storageHandle, scheduler, validator)
- , m_validator(validator)
- , m_interestNum(0)
- , m_maxInterestNum(0)
- , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
- , m_watchTimeout(0)
- , m_startTime(steady_clock::now())
- , m_size(0)
-{
- dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("start"),
- makeAuthorization(),
- std::bind(&WatchHandle::validateParameters<WatchStartCommand>, this, _1),
- std::bind(&WatchHandle::handleStartCommand, this, _1, _2, _3, _4));
-
- dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("check"),
- makeAuthorization(),
- std::bind(&WatchHandle::validateParameters<WatchCheckCommand>, this, _1),
- std::bind(&WatchHandle::handleCheckCommand, this, _1, _2, _3, _4));
-
- dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("stop"),
- makeAuthorization(),
- std::bind(&WatchHandle::validateParameters<WatchStopCommand>, this, _1),
- std::bind(&WatchHandle::handleStopCommand, this, _1, _2, _3, _4));
-}
-
-void
-WatchHandle::deleteProcess(const Name& name)
-{
- m_processes.erase(name);
-}
-
-void
-WatchHandle::handleStartCommand(const Name& prefix, const Interest& interest,
- const ndn::mgmt::ControlParameters& parameter,
- const ndn::mgmt::CommandContinuation& done)
-{
- const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
- processWatchCommand(interest, repoParameter, done);
-}
-
-void WatchHandle::watchStop(const Name& name)
-{
- m_processes[name].second = false;
- m_maxInterestNum = 0;
- m_interestNum = 0;
- m_startTime = steady_clock::now();
- m_watchTimeout = 0_ms;
- m_interestLifetime = DEFAULT_INTEREST_LIFETIME;
- m_size = 0;
-}
-
-
-void
-WatchHandle::onData(const Interest& interest, const ndn::Data& data, const Name& name)
-{
- m_validator.validate(data,
- bind(&WatchHandle::onDataValidated, this, interest, _1, name),
- bind(&WatchHandle::onDataValidationFailed, this, interest, _1, _2, name));
-}
-
-void
-WatchHandle::onDataValidated(const Interest& interest, const Data& data, const Name& name)
-{
- if (!m_processes[name].second) {
- return;
- }
- if (storageHandle.insertData(data)) {
- m_size++;
- if (!onRunning(name))
- return;
-
- Interest fetchInterest(interest.getName());
- fetchInterest.setSelectors(interest.getSelectors());
- fetchInterest.setInterestLifetime(m_interestLifetime);
- fetchInterest.setChildSelector(1);
-
- // update selectors
- // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
- if (data.getName().size() == interest.getName().size()) {
- fetchInterest.setMinSuffixComponents(2);
- }
- else {
- Exclude exclude;
- if (!interest.getExclude().empty()) {
- exclude = interest.getExclude();
- }
-
- exclude.excludeBefore(data.getName()[interest.getName().size()]);
- fetchInterest.setExclude(exclude);
- }
-
- ++m_interestNum;
- face.expressInterest(fetchInterest,
- bind(&WatchHandle::onData, this, _1, _2, name),
- bind(&WatchHandle::onTimeout, this, _1, name), // Nack
- bind(&WatchHandle::onTimeout, this, _1, name));
- }
- else {
- BOOST_THROW_EXCEPTION(Error("Insert into Repo Failed"));
- }
- m_processes[name].first.setInsertNum(m_size);
-}
-
-void
-WatchHandle::onDataValidationFailed(const Interest& interest, const Data& data,
- const ValidationError& error, const Name& name)
-{
- std::cerr << error << std::endl;
- if (!m_processes[name].second) {
- return;
- }
- if (!onRunning(name))
- return;
-
- Interest fetchInterest(interest.getName());
- fetchInterest.setSelectors(interest.getSelectors());
- fetchInterest.setInterestLifetime(m_interestLifetime);
- fetchInterest.setChildSelector(1);
-
- // update selectors
- // if data name is equal to interest name, use MinSuffixComponents selecor to exclude this data
- if (data.getName().size() == interest.getName().size()) {
- fetchInterest.setMinSuffixComponents(2);
- }
- else {
- Exclude exclude;
- if (!interest.getExclude().empty()) {
- exclude = interest.getExclude();
- }
- // Only exclude this data since other data whose names are smaller may be validated and satisfied
- exclude.excludeBefore(data.getName()[interest.getName().size()]);
- fetchInterest.setExclude(exclude);
- }
-
- ++m_interestNum;
- face.expressInterest(fetchInterest,
- bind(&WatchHandle::onData, this, _1, _2, name),
- bind(&WatchHandle::onTimeout, this, _1, name), // Nack
- bind(&WatchHandle::onTimeout, this, _1, name));
-}
-
-void
-WatchHandle::onTimeout(const ndn::Interest& interest, const Name& name)
-{
- std::cerr << "Timeout" << std::endl;
- if (!m_processes[name].second) {
- return;
- }
- if (!onRunning(name))
- return;
- // selectors do not need to be updated
- Interest fetchInterest(interest.getName());
- fetchInterest.setSelectors(interest.getSelectors());
- fetchInterest.setInterestLifetime(m_interestLifetime);
- fetchInterest.setChildSelector(1);
-
- ++m_interestNum;
- face.expressInterest(fetchInterest,
- bind(&WatchHandle::onData, this, _1, _2, name),
- bind(&WatchHandle::onTimeout, this, _1, name), // Nack
- bind(&WatchHandle::onTimeout, this, _1, name));
-
-}
-
-void
-WatchHandle::handleStopCommand(const Name& prefix, const Interest& interest,
- const ndn::mgmt::ControlParameters& parameter,
- const ndn::mgmt::CommandContinuation& done)
-{
- const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
-
- watchStop(repoParameter.getName());
- std::string text = "Watched Prefix Insertion for prefix (" + prefix.toUri() + ") is stop.";
- return done(RepoCommandResponse(101, text));
-}
-
-void
-WatchHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
- const ndn::mgmt::ControlParameters& parameter,
- const ndn::mgmt::CommandContinuation& done)
-{
- const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
-
- //check whether this process exists
- Name name = repoParameter.getName();
- if (m_processes.count(name) == 0) {
- std::cerr << "no such process name: " << name << std::endl;
- RepoCommandResponse response(404, "No such process is in progress");
- response.setBody(response.wireEncode());
- return done(response);
- }
-
- RepoCommandResponse& response = m_processes[name].first;
-
- if (!m_processes[name].second) {
- response.setCode(101);
- }
-
- return done(response);
-}
-
-void
-WatchHandle::deferredDeleteProcess(const Name& name)
-{
- scheduler.scheduleEvent(PROCESS_DELETE_TIME,
- bind(&WatchHandle::deleteProcess, this, name));
-}
-
-void
-WatchHandle::processWatchCommand(const Interest& interest,
- const RepoCommandParameter& parameter,
- const ndn::mgmt::CommandContinuation& done)
-{
- // if there is no watchTimeout specified, m_watchTimeout will be set as 0 and this handle will run forever
- if (parameter.hasWatchTimeout()) {
- m_watchTimeout = parameter.getWatchTimeout();
- }
- else {
- m_watchTimeout = 0_ms;
- }
-
- // if there is no maxInterestNum specified, m_maxInterestNum will be 0, which means infinity
- if (parameter.hasMaxInterestNum()) {
- m_maxInterestNum = parameter.getMaxInterestNum();
- }
- else {
- m_maxInterestNum = 0;
- }
-
- if (parameter.hasInterestLifetime()) {
- m_interestLifetime = parameter.getInterestLifetime();
- }
-
- RepoCommandResponse response(100, "Watching the prefix started.");
- response.setBody(response.wireEncode());
- done(response);
-
- m_processes[parameter.getName()] =
- std::make_pair(RepoCommandResponse(300, "This watched prefix Insertion is in progress"),
- true);
-
- Interest fetchInterest(parameter.getName());
- if (parameter.hasSelectors()) {
- fetchInterest.setSelectors(parameter.getSelectors());
- }
- fetchInterest.setChildSelector(1);
- fetchInterest.setInterestLifetime(m_interestLifetime);
- m_startTime = steady_clock::now();
- m_interestNum++;
- face.expressInterest(fetchInterest,
- bind(&WatchHandle::onData, this, _1, _2, parameter.getName()),
- bind(&WatchHandle::onTimeout, this, _1, parameter.getName()), // Nack
- bind(&WatchHandle::onTimeout, this, _1, parameter.getName()));
-}
-
-bool
-WatchHandle::onRunning(const Name& name)
-{
- bool isTimeout = (m_watchTimeout != milliseconds::zero() &&
- steady_clock::now() - m_startTime > m_watchTimeout);
- bool isMaxInterest = m_interestNum >= m_maxInterestNum && m_maxInterestNum != 0;
- if (isTimeout || isMaxInterest) {
- deferredDeleteProcess(name);
- watchStop(name);
- return false;
- }
- return true;
-}
-
-} // namespace repo
diff --git a/src/handles/watch-handle.hpp b/src/handles/watch-handle.hpp
deleted file mode 100644
index 221b67d..0000000
--- a/src/handles/watch-handle.hpp
+++ /dev/null
@@ -1,155 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/*
- * Copyright (c) 2014-2018, Regents of the University of California.
- *
- * This file is part of NDN repo-ng (Next generation of NDN repository).
- * See AUTHORS.md for complete list of repo-ng authors and contributors.
- *
- * repo-ng is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef REPO_HANDLES_WATCH_HANDLE_HPP
-#define REPO_HANDLES_WATCH_HANDLE_HPP
-
-#include "command-base-handle.hpp"
-
-#include <ndn-cxx/mgmt/dispatcher.hpp>
-
-#include <queue>
-
-namespace repo {
-
-using std::map;
-using std::pair;
-using std::queue;
-using namespace ndn::time;
-/**
- * @brief WatchHandle provides a different way for repo to insert data.
- *
- * Repo keeps sending interest to request the data with same prefix,
- * but with different exclude selectors(updated every time). Repo will stop
- * watching the prefix until a command interest tell it to stop, the total
- * amount of sent interests reaches a specific number or time out.
- */
-class WatchHandle : public CommandBaseHandle
-{
-
-public:
- class Error : public CommandBaseHandle::Error
- {
- public:
- explicit
- Error(const std::string& what)
- : CommandBaseHandle::Error(what)
- {
- }
- };
-
-
-public:
- WatchHandle(Face& face, RepoStorage& storageHandle,
- ndn::mgmt::Dispatcher& dispatcher, Scheduler& scheduler,
- Validator& validator);
-
-private: // watch-insert command
- /**
- * @brief handle watch commands
- */
-
- void
- handleStartCommand(const Name& prefix, const Interest& interest,
- const ndn::mgmt::ControlParameters& parameters,
- const ndn::mgmt::CommandContinuation& done);
- void
- onValidationFailed(const std::shared_ptr<const Interest>& interest, const std::string& reason);
-
-private: // data fetching
- /**
- * @brief fetch data and send next interest
- */
- void
- onData(const Interest& interest, const Data& data, const Name& name);
-
- /**
- * @brief handle when fetching one data timeout
- */
- void
- onTimeout(const Interest& interest, const Name& name);
-
- void
- onDataValidated(const Interest& interest, const Data& data, const Name& name);
-
- /**
- * @brief failure of validation
- */
- void
- onDataValidationFailed(const Interest& interest, const Data& data,
- const ValidationError& error, const Name& name);
-
-
- void
- processWatchCommand(const Interest& interest, const RepoCommandParameter& parameter,
- const ndn::mgmt::CommandContinuation& done);
-
- void
- watchStop(const Name& name);
-
-private: // watch state check command
- /**
- * @brief handle watch check command
- */
-
- void
- handleCheckCommand(const Name& prefix, const Interest& interest,
- const ndn::mgmt::ControlParameters& parameters,
- const ndn::mgmt::CommandContinuation& done);
-
- void
- onCheckValidationFailed(const Interest& interest, const ValidationError& error);
-
-private: // watch stop command
- /**
- * @brief handle watch stop command
- */
-
- void
- handleStopCommand(const Name& prefix, const Interest& interest,
- const ndn::mgmt::ControlParameters& parameters,
- const ndn::mgmt::CommandContinuation& done);
-
- void
- onStopValidationFailed(const Interest& interest, const ValidationError& error);
-
-private:
- void
- deferredDeleteProcess(const Name& name);
-
- void
- deleteProcess(const Name& name);
-
- bool
- onRunning(const Name& name);
-
-private:
- Validator& m_validator;
- map<Name, std::pair<RepoCommandResponse, bool> > m_processes;
- int64_t m_interestNum;
- int64_t m_maxInterestNum;
- milliseconds m_interestLifetime;
- milliseconds m_watchTimeout;
- ndn::time::steady_clock::TimePoint m_startTime;
- int64_t m_size;
-};
-
-} // namespace repo
-
-#endif // REPO_HANDLES_WATCH_HANDLE_HPP
diff --git a/src/handles/write-handle.cpp b/src/handles/write-handle.cpp
index 6cc9583..675f286 100644
--- a/src/handles/write-handle.cpp
+++ b/src/handles/write-handle.cpp
@@ -19,12 +19,16 @@
#include "write-handle.hpp"
+#include <ndn-cxx/util/logger.hpp>
#include <ndn-cxx/util/random.hpp>
namespace repo {
-static const int RETRY_TIMEOUT = 3;
+NDN_LOG_INIT(repo.WriteHandle);
+
static const int DEFAULT_CREDIT = 12;
+static const bool DEFAULT_CANBE_PREFIX = false;
+static const milliseconds MAX_TIMEOUT(60_s);
static const milliseconds NOEND_TIMEOUT(10000_ms);
static const milliseconds PROCESS_DELETE_TIME(10000_ms);
static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms);
@@ -33,8 +37,9 @@
Scheduler& scheduler, Validator& validator)
: CommandBaseHandle(face, storageHandle, scheduler, validator)
, m_validator(validator)
- , m_retryTime(RETRY_TIMEOUT)
, m_credit(DEFAULT_CREDIT)
+ , m_canBePrefix(DEFAULT_CANBE_PREFIX)
+ , m_maxTimeout(MAX_TIMEOUT)
, m_noEndTimeout(NOEND_TIMEOUT)
, m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
{
@@ -64,10 +69,6 @@
dynamic_cast<RepoCommandParameter*>(const_cast<ndn::mgmt::ControlParameters*>(&parameter));
if (repoParameter->hasStartBlockId() || repoParameter->hasEndBlockId()) {
- if (repoParameter->hasSelectors()) {
- done(negativeReply("BlockId present. BlockId is not supported in this protocol", 402));
- return;
- }
processSegmentedInsertCommand(interest, *repoParameter, done);
}
else {
@@ -81,8 +82,8 @@
WriteHandle::onData(const Interest& interest, const Data& data, ProcessId processId)
{
m_validator.validate(data,
- bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
- bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
+ std::bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
+ [](const Data& data, const ValidationError& error){NDN_LOG_ERROR("Error: " << error);});
}
void
@@ -104,296 +105,13 @@
}
void
-WriteHandle::onDataValidationFailed(const Data& data, const ValidationError& error)
-{
- std::cerr << error << std::endl;
-}
-
-void
-WriteHandle::onSegmentData(const Interest& interest, const Data& data, ProcessId processId)
-{
- m_validator.validate(data,
- bind(&WriteHandle::onSegmentDataValidated, this, interest, _1, processId),
- bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
-}
-
-void
-WriteHandle::onSegmentDataValidated(const Interest& interest, const Data& data, ProcessId processId)
-{
- auto it = m_processes.find(processId);
- if (it == m_processes.end()) {
- return;
- }
- RepoCommandResponse& response = it->second.response;
-
- //refresh endBlockId
- auto finalBlock = data.getFinalBlock();
- if (finalBlock && finalBlock->isSegment()) {
- auto finalSeg = finalBlock->toSegment();
- if (response.hasEndBlockId()) {
- if (finalSeg < response.getEndBlockId()) {
- response.setEndBlockId(finalSeg);
- }
- }
- else {
- response.setEndBlockId(finalSeg);
- }
- }
-
- //insert data
- if (storageHandle.insertData(data)) {
- response.setInsertNum(response.getInsertNum() + 1);
- }
-
- onSegmentDataControl(processId, interest);
-}
-
-void
WriteHandle::onTimeout(const Interest& interest, ProcessId processId)
{
- std::cerr << "Timeout" << std::endl;
+  // Registered as both the Nack and the timeout callback of the single-Data
+  // fetch: log the event and drop the bookkeeping entry of the process.
+  NDN_LOG_DEBUG("Timeout");
m_processes.erase(processId);
}
void
-WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
-{
- std::cerr << "SegTimeout" << std::endl;
-
- onSegmentTimeoutControl(processId, interest);
-}
-
-void
-WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
-{
- ProcessInfo& process = m_processes[processId];
- process.credit = 0;
-
- map<SegmentNo, int>& processRetry = process.retryCounts;
-
- Name name = parameter.getName();
- SegmentNo startBlockId = parameter.getStartBlockId();
-
- uint64_t initialCredit = m_credit;
-
- if (parameter.hasEndBlockId()) {
- initialCredit =
- std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
- }
- else {
- // set noEndTimeout timer
- process.noEndTime = ndn::time::steady_clock::now() +
- m_noEndTimeout;
- }
- process.credit = initialCredit;
- SegmentNo segment = startBlockId;
-
- for (; segment < startBlockId + initialCredit; ++segment) {
- Name fetchName = name;
- fetchName.appendSegment(segment);
- Interest interest(fetchName);
- interest.setInterestLifetime(m_interestLifetime);
- face.expressInterest(interest,
- bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
- process.credit--;
- processRetry[segment] = 0;
- }
-
- queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
-
- process.nextSegment = segment;
- nextSegmentQueue.push(segment);
-}
-
-void
-WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
-{
- if (m_processes.count(processId) == 0) {
- return;
- }
- ProcessInfo& process = m_processes[processId];
- RepoCommandResponse& response = process.response;
- int& processCredit = process.credit;
- //onSegmentDataControl is called when a data returns.
- //When data returns, processCredit++
- processCredit++;
- SegmentNo& nextSegment = process.nextSegment;
- queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
- map<SegmentNo, int>& retryCounts = process.retryCounts;
-
- //read whether notime timeout
- if (!response.hasEndBlockId()) {
-
- ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
- ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
-
- if (now > noEndTime) {
- std::cerr << "noEndtimeout: " << processId << std::endl;
- //m_processes.erase(processId);
- //StatusCode should be refreshed as 405
- response.setCode(405);
- //schedule a delete event
- deferredDeleteProcess(processId);
- return;
- }
- }
-
- //read whether this process has total ends, if ends, remove control info from the maps
- if (response.hasEndBlockId()) {
- uint64_t nSegments =
- response.getEndBlockId() - response.getStartBlockId() + 1;
- if (response.getInsertNum() >= nSegments) {
- //m_processes.erase(processId);
- //All the data has been inserted, StatusCode is refreshed as 200
- response.setCode(200);
- deferredDeleteProcess(processId);
- return;
- }
- }
-
- //check whether there is any credit
- if (processCredit == 0)
- return;
-
-
- //check whether sent queue empty
- if (nextSegmentQueue.empty()) {
- //do not do anything
- return;
- }
-
- //pop the queue
- SegmentNo sendingSegment = nextSegmentQueue.front();
- nextSegmentQueue.pop();
-
- //check whether sendingSegment exceeds
- if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
- //do not do anything
- return;
- }
-
- //read whether this is retransmitted data;
- SegmentNo fetchedSegment =
- interest.getName().get(interest.getName().size() - 1).toSegment();
-
- BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
-
- //find this fetched data, remove it from this map
- //rit->second.erase(oit);
- retryCounts.erase(fetchedSegment);
- //express the interest of the top of the queue
- Name fetchName(interest.getName().getPrefix(-1));
- fetchName.appendSegment(sendingSegment);
- Interest fetchInterest(fetchName);
- fetchInterest.setInterestLifetime(m_interestLifetime);
- face.expressInterest(fetchInterest,
- bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
- //When an interest is expressed, processCredit--
- processCredit--;
- if (retryCounts.count(sendingSegment) == 0) {
- //not found
- retryCounts[sendingSegment] = 0;
- }
- else {
- //found
- retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
- }
- //increase the next seg and put it into the queue
- if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
- nextSegment++;
- nextSegmentQueue.push(nextSegment);
- }
-}
-
-void
-WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
-{
- if (m_processes.count(processId) == 0) {
- return;
- }
- ProcessInfo& process = m_processes[processId];
- // RepoCommandResponse& response = process.response;
- // SegmentNo& nextSegment = process.nextSegment;
- // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
- map<SegmentNo, int>& retryCounts = process.retryCounts;
-
- SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
-
- std::cerr << "timeoutSegment: " << timeoutSegment << std::endl;
-
- BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
-
- //read the retry time. If retry out of time, fail the process. if not, plus
- int& retryTime = retryCounts[timeoutSegment];
- if (retryTime >= m_retryTime) {
- //fail this process
- std::cerr << "Retry timeout: " << processId << std::endl;
- m_processes.erase(processId);
- return;
- }
- else {
- //Reput it in the queue, retryTime++
- retryTime++;
- Interest retryInterest(interest.getName());
- retryInterest.setInterestLifetime(m_interestLifetime);
- face.expressInterest(retryInterest,
- bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
- }
-
-}
-
-void
-WriteHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
- const ndn::mgmt::ControlParameters& parameter,
- const ndn::mgmt::CommandContinuation& done)
-{
- const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
-
- //check whether this process exists
- ProcessId processId = repoParameter.getProcessId();
- if (m_processes.count(processId) == 0) {
- std::cerr << "no such processId: " << processId << std::endl;
- done(negativeReply("No such this process is in progress", 404));
- return;
- }
-
- ProcessInfo& process = m_processes[processId];
-
- RepoCommandResponse& response = process.response;
-
- //Check whether it is single data fetching
- if (!response.hasStartBlockId() &&
- !response.hasEndBlockId()) {
- //reply(interest, response);
- done(response);
- return;
- }
-
- //read if noEndtimeout
- if (!response.hasEndBlockId()) {
- extendNoEndTime(process);
- done(response);
- return;
- }
- else {
- done(response);
- }
-}
-
-void
-WriteHandle::deferredDeleteProcess(ProcessId processId)
-{
- scheduler.scheduleEvent(PROCESS_DELETE_TIME,
- bind(&WriteHandle::deleteProcess, this, processId));
-}
-
-void
WriteHandle::processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
const ndn::mgmt::CommandContinuation& done)
{
@@ -409,16 +127,108 @@
done(response);
response.setCode(300);
-
Interest fetchInterest(parameter.getName());
+ fetchInterest.setCanBePrefix(m_canBePrefix);
fetchInterest.setInterestLifetime(m_interestLifetime);
- if (parameter.hasSelectors()) {
- fetchInterest.setSelectors(parameter.getSelectors());
- }
face.expressInterest(fetchInterest,
- bind(&WriteHandle::onData, this, _1, _2, processId),
- bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
- bind(&WriteHandle::onTimeout, this, _1, processId));
+ std::bind(&WriteHandle::onData, this, _1, _2, processId),
+ std::bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
+ std::bind(&WriteHandle::onTimeout, this, _1, processId));
+}
+
+void
+WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
+{
+  // Start fetching a segmented object for the given insert process.
+  // The fetch Interests are sent by ndn::util::SegmentFetcher.
+  ProcessInfo& process = m_processes[processId];
+  Name name = parameter.getName();
+  SegmentNo startBlockId = parameter.getStartBlockId();
+
+  // The initial window never exceeds the number of requested segments.
+  uint64_t initialCredit = m_credit;
+
+  if (parameter.hasEndBlockId()) {
+    initialCredit =
+      std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
+  }
+  else {
+    // set noEndTimeout timer
+    process.noEndTime = ndn::time::steady_clock::now() +
+                        m_noEndTimeout;
+  }
+
+  Name fetchName = name;
+  SegmentNo segment = startBlockId;
+  fetchName.appendSegment(segment);
+  Interest interest(fetchName);
+
+  ndn::util::SegmentFetcher::Options options;
+  options.initCwnd = initialCredit;
+  options.interestLifetime = m_interestLifetime;
+  options.maxTimeout = m_maxTimeout;
+  auto fetcher = ndn::util::SegmentFetcher::start(face, interest, m_validator, options);
+
+  // The handlers below run after segInit() has returned, so they must not
+  // capture the local `fetcher` handle or the `processId` parameter by
+  // reference. `processId` is copied. The fetcher is captured as a non-owning
+  // pointer: the signals are members of the fetcher, so a handler can only be
+  // invoked while the fetcher object exists, and not copying the owning handle
+  // into the fetcher's own signal avoids a self-reference.
+  ndn::util::SegmentFetcher* fetcherPtr = fetcher.get();
+  fetcher->onError.connect([] (uint32_t errorCode, const std::string& errorMsg)
+                           {NDN_LOG_ERROR("Error: " << errorMsg);});
+  fetcher->afterSegmentValidated.connect([this, fetcherPtr, processId] (const Data& data)
+                                         {onSegmentData(*fetcherPtr, data, processId);});
+  fetcher->afterSegmentTimedOut.connect([this, fetcherPtr, processId] ()
+                                        {onSegmentTimeout(*fetcherPtr, processId);});
+}
+
+void
+WriteHandle::onSegmentData(ndn::util::SegmentFetcher& fetcher, const Data& data, ProcessId processId)
+{
+  // Handle one validated segment of a segmented insert.
+  // If the process is no longer tracked, fetching further segments is pointless.
+  auto entry = m_processes.find(processId);
+  if (entry == m_processes.end()) {
+    fetcher.stop();
+    return;
+  }
+
+  ProcessInfo& process = entry->second;
+  RepoCommandResponse& response = process.response;
+
+  // Bump the insert counter only when insertData() reports success.
+  if (storageHandle.insertData(data)) {
+    response.setInsertNum(response.getInsertNum() + 1);
+  }
+
+  if (response.hasEndBlockId()) {
+    // Bounded insert: done once as many segments were inserted as requested.
+    uint64_t nSegments = response.getEndBlockId() - response.getStartBlockId() + 1;
+    if (response.getInsertNum() >= nSegments) {
+      // All the data has been inserted, StatusCode is refreshed as 200
+      response.setCode(200);
+      deferredDeleteProcess(processId);
+      fetcher.stop();
+    }
+    return;
+  }
+
+  // Open-ended insert: give up once the no-end deadline has passed.
+  if (ndn::time::steady_clock::now() > process.noEndTime) {
+    NDN_LOG_DEBUG("noEndtimeout: " << processId);
+    // StatusCode should be refreshed as 405
+    response.setCode(405);
+    // schedule a delete event
+    deferredDeleteProcess(processId);
+    fetcher.stop();
+  }
+}
+
+void
+WriteHandle::onSegmentTimeout(ndn::util::SegmentFetcher& fetcher, ProcessId processId)
+{
+  // A segment Interest timed out. Log it, and stop the fetcher if the
+  // process it belongs to is no longer tracked.
+  NDN_LOG_DEBUG("SegTimeout");
+  if (m_processes.find(processId) == m_processes.end()) {
+    fetcher.stop();
+  }
+}
void
@@ -470,6 +280,49 @@
}
void
+WriteHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
+                                const ndn::mgmt::ControlParameters& parameter,
+                                const ndn::mgmt::CommandContinuation& done)
+{
+  // Answer an insert-check command with the current response of the process
+  // named by the command's ProcessId, or with a 404 reply if it is unknown.
+  const auto& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
+  const ProcessId processId = repoParameter.getProcessId();
+
+  auto entry = m_processes.find(processId);
+  if (entry == m_processes.end()) {
+    NDN_LOG_DEBUG("no such processId: " << processId);
+    done(negativeReply("No such this process is in progress", 404));
+    return;
+  }
+
+  ProcessInfo& process = entry->second;
+  RepoCommandResponse& response = process.response;
+
+  // Only a segmented insert that has a StartBlockId but no EndBlockId has its
+  // no-end deadline handled here. Every other case replies with the response
+  // as it stands.
+  if (response.hasStartBlockId() && !response.hasEndBlockId()) {
+    extendNoEndTime(process);
+  }
+  done(response);
+}
+
+void
+WriteHandle::deferredDeleteProcess(ProcessId processId)
+{
+  // Schedule deleteProcess(processId) to run PROCESS_DELETE_TIME from now.
+  scheduler.scheduleEvent(PROCESS_DELETE_TIME,
+                          [this, processId] { deleteProcess(processId); });
+}
+
+void
WriteHandle::extendNoEndTime(ProcessInfo& process)
{
ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
diff --git a/src/handles/write-handle.hpp b/src/handles/write-handle.hpp
index 686d68c..115801e 100644
--- a/src/handles/write-handle.hpp
+++ b/src/handles/write-handle.hpp
@@ -23,15 +23,12 @@
#include "command-base-handle.hpp"
#include <ndn-cxx/mgmt/dispatcher.hpp>
+#include <ndn-cxx/util/segment-fetcher.hpp>
#include <queue>
namespace repo {
-using std::map;
-using std::pair;
-using std::queue;
-
/**
* @brief WriteHandle provides basic credit based congestion control.
*
@@ -80,12 +77,11 @@
*/
struct ProcessInfo
{
- //ProcessId id;
RepoCommandResponse response;
- queue<SegmentNo> nextSegmentQueue; ///< queue of waiting segment
+ std::queue<SegmentNo> nextSegmentQueue; ///< queue of waiting segment
/// to be sent when having credits
SegmentNo nextSegment; ///< last segment put into the nextSegmentQueue
- map<SegmentNo, int> retryCounts; ///< to store retrying times of timeout segment
+ std::map<SegmentNo, int> retryCounts; ///< to store retrying times of timeout segment
int credit; ///< congestion control credits of process
/**
@@ -136,16 +132,13 @@
* @brief fetch segmented data
*/
void
- onSegmentData(const Interest& interest, const Data& data, ProcessId processId);
-
- void
- onSegmentDataValidated(const Interest& interest, const Data& data, ProcessId processId);
+ onSegmentData(ndn::util::SegmentFetcher& fetcher, const Data& data, ProcessId processId);
/**
- * @brief Timeout when fetching segmented data. Data can be fetched RETRY_TIMEOUT times.
+   * @brief handle a timeout that occurs while fetching segmented data
*/
void
- onSegmentTimeout(const Interest& interest, ProcessId processId);
+ onSegmentTimeout(ndn::util::SegmentFetcher& fetcher, ProcessId processId);
/**
* @brief initiate fetching segmented data
@@ -153,30 +146,12 @@
void
segInit(ProcessId processId, const RepoCommandParameter& parameter);
- /**
- * @brief control for sending interests in function onSegmentData()
- */
- void
- onSegmentDataControl(ProcessId processId, const Interest& interest);
-
- /**
- * @brief control for sending interest in function onSegmentTimeout
- */
- void
- onSegmentTimeoutControl(ProcessId processId, const Interest& interest);
-
void
processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
const ndn::mgmt::CommandContinuation& done);
private:
/**
- * @brief failure of validation for both one or segmented data
- */
- void
- onDataValidationFailed(const Data& data, const ValidationError& error);
-
- /**
* @brief extends noEndTime of process if not noEndTimeout, set StatusCode 405
*
* called by onCheckValidated() if there is no EndBlockId. If not noEndTimeout,
@@ -213,9 +188,12 @@
private:
Validator& m_validator;
- map<ProcessId, ProcessInfo> m_processes;
- int m_retryTime;
+
+ std::map<ProcessId, ProcessInfo> m_processes;
+
int m_credit;
+ bool m_canBePrefix;
+ ndn::time::milliseconds m_maxTimeout;
ndn::time::milliseconds m_noEndTimeout;
ndn::time::milliseconds m_interestLifetime;
};
diff --git a/src/main.cpp b/src/main.cpp
index 9713943..800b8c7 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014, Regents of the University of California.
+ * Copyright (c) 2018, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -20,6 +20,12 @@
#include "config.hpp"
#include "repo.hpp"
+#include "extended-error-message.hpp"
+
+#include <ndn-cxx/util/logger.hpp>
+
+NDN_LOG_INIT(repo.main);
+
static const std::string ndnRepoUsageMessage =
/* argv[0] */ " - Next generation of NDN repository\n"
"-h: show help message\n"
@@ -35,19 +41,14 @@
if (error)
return;
- if (signalNo == SIGINT ||
- signalNo == SIGTERM)
- {
- ioService.stop();
- std::cout << "Caught signal '" << strsignal(signalNo) << "', exiting..." << std::endl;
- }
- else
- {
- /// \todo May be try to reload config file
- signalSet.async_wait(std::bind(&terminate, std::ref(ioService),
- std::placeholders::_1, std::placeholders::_2,
- std::ref(signalSet)));
- }
+ if (signalNo == SIGINT || signalNo == SIGTERM) {
+ ioService.stop();
+ std::cout << "Caught signal '" << strsignal(signalNo) << "', exiting..." << std::endl;
+ }
+ else {
+ /// \todo try to reload config file
+ signalSet.async_wait(std::bind(&terminate, std::ref(ioService), _1, _2, std::ref(signalSet)));
+ }
}
int
@@ -91,7 +92,7 @@
ioService.run();
}
catch (const std::exception& e) {
- std::cerr << "ERROR: " << e.what() << std::endl;
+ NDN_LOG_FATAL(repo::getExtendedErrorMessage(e));
return 2;
}
diff --git a/src/repo-command-parameter.cpp b/src/repo-command-parameter.cpp
index e7f7e15..2f4de5c 100644
--- a/src/repo-command-parameter.cpp
+++ b/src/repo-command-parameter.cpp
@@ -21,9 +21,8 @@
#include <ndn-cxx/encoding/encoding-buffer.hpp>
#include <ndn-cxx/encoding/block-helpers.hpp>
-#include <ndn-cxx/name.hpp>
-#include <ndn-cxx/selectors.hpp>
#include <ndn-cxx/mgmt/control-parameters.hpp>
+#include <ndn-cxx/name.hpp>
namespace repo {
@@ -37,14 +36,6 @@
}
RepoCommandParameter&
-RepoCommandParameter::setSelectors(const Selectors& selectors)
-{
- m_selectors = selectors;
- m_wire.reset();
- return *this;
-}
-
-RepoCommandParameter&
RepoCommandParameter::setStartBlockId(uint64_t startBlockId)
{
m_startBlockId = startBlockId;
@@ -72,24 +63,6 @@
}
RepoCommandParameter&
-RepoCommandParameter::setMaxInterestNum(uint64_t maxInterestNum)
-{
- m_maxInterestNum = maxInterestNum;
- m_hasFields[REPO_PARAMETER_MAX_INTEREST_NUM] = true;
- m_wire.reset();
- return *this;
-}
-
-RepoCommandParameter&
-RepoCommandParameter::setWatchTimeout(milliseconds watchTimeout)
-{
- m_watchTimeout = watchTimeout;
- m_hasFields[REPO_PARAMETER_WATCH_TIME_OUT] = true;
- m_wire.reset();
- return *this;
-}
-
-RepoCommandParameter&
RepoCommandParameter::setInterestLifetime(milliseconds interestLifetime)
{
m_interestLifetime = interestLifetime;
@@ -126,20 +99,6 @@
totalLength += encoder.prependVarNumber(tlv::StartBlockId);
}
- if (m_hasFields[REPO_PARAMETER_MAX_INTEREST_NUM]) {
- variableLength = encoder.prependNonNegativeInteger(m_maxInterestNum);
- totalLength += variableLength;
- totalLength += encoder.prependVarNumber(variableLength);
- totalLength += encoder.prependVarNumber(tlv::MaxInterestNum);
- }
-
- if (m_hasFields[REPO_PARAMETER_WATCH_TIME_OUT]) {
- variableLength = encoder.prependNonNegativeInteger(m_watchTimeout.count());
- totalLength += variableLength;
- totalLength += encoder.prependVarNumber(variableLength);
- totalLength += encoder.prependVarNumber(tlv::WatchTimeout);
- }
-
if (m_hasFields[REPO_PARAMETER_INTEREST_LIFETIME]) {
variableLength = encoder.prependNonNegativeInteger(m_interestLifetime.count());
totalLength += variableLength;
@@ -147,10 +106,6 @@
totalLength += encoder.prependVarNumber(tlv::InterestLifetime);
}
- if (!getSelectors().empty()) {
- totalLength += getSelectors().wireEncode(encoder);
- }
-
if (m_hasFields[REPO_PARAMETER_NAME]) {
totalLength += getName().wireEncode(encoder);
}
@@ -196,15 +151,6 @@
m_name.wireDecode(m_wire.get(tlv::Name));
}
- // Selectors
- val = m_wire.find(tlv::Selectors);
- if (val != m_wire.elements_end())
- {
- m_selectors.wireDecode(*val);
- }
- else
- m_selectors = Selectors();
-
// StartBlockId
val = m_wire.find(tlv::StartBlockId);
if (val != m_wire.elements_end())
@@ -229,22 +175,6 @@
m_processId = readNonNegativeInteger(*val);
}
- // MaxInterestNum
- val = m_wire.find(tlv::MaxInterestNum);
- if (val != m_wire.elements_end())
- {
- m_hasFields[REPO_PARAMETER_MAX_INTEREST_NUM] = true;
- m_maxInterestNum = readNonNegativeInteger(*val);
- }
-
- // WatchTimeout
- val = m_wire.find(tlv::WatchTimeout);
- if (val != m_wire.elements_end())
- {
- m_hasFields[REPO_PARAMETER_WATCH_TIME_OUT] = true;
- m_watchTimeout = milliseconds(readNonNegativeInteger(*val));
- }
-
// InterestLifeTime
val = m_wire.find(tlv::InterestLifetime);
if (val != m_wire.elements_end())
@@ -275,14 +205,6 @@
if (repoCommandParameter.hasProcessId()) {
os << " ProcessId: " << repoCommandParameter.getProcessId();
}
- // MaxInterestNum
- if (repoCommandParameter.hasMaxInterestNum()) {
- os << " MaxInterestNum: " << repoCommandParameter.getMaxInterestNum();
- }
- // WatchTimeout
- if (repoCommandParameter.hasProcessId()) {
- os << " WatchTimeout: " << repoCommandParameter.getWatchTimeout();
- }
// InterestLifetime
if (repoCommandParameter.hasProcessId()) {
os << " InterestLifetime: " << repoCommandParameter.getInterestLifetime();
diff --git a/src/repo-command-parameter.hpp b/src/repo-command-parameter.hpp
index 1ad212f..6ce45f0 100644
--- a/src/repo-command-parameter.hpp
+++ b/src/repo-command-parameter.hpp
@@ -24,16 +24,14 @@
#include <ndn-cxx/encoding/encoding-buffer.hpp>
#include <ndn-cxx/encoding/block-helpers.hpp>
-#include <ndn-cxx/name.hpp>
-#include <ndn-cxx/selectors.hpp>
#include <ndn-cxx/mgmt/control-parameters.hpp>
+#include <ndn-cxx/name.hpp>
namespace repo {
using ndn::Name;
using ndn::Block;
using ndn::EncodingImpl;
-using ndn::Selectors;
using ndn::EncodingEstimator;
using ndn::EncodingBuffer;
using namespace ndn::time;
@@ -43,8 +41,6 @@
REPO_PARAMETER_START_BLOCK_ID,
REPO_PARAMETER_END_BLOCK_ID,
REPO_PARAMETER_PROCESS_ID,
- REPO_PARAMETER_MAX_INTEREST_NUM,
- REPO_PARAMETER_WATCH_TIME_OUT,
REPO_PARAMETER_INTEREST_LIFETIME,
REPO_PARAMETER_UBOUND
};
@@ -54,8 +50,6 @@
"StartBlockId",
"EndBlockId",
"ProcessId",
- "MaxInterestNum",
- "WatchTimeout",
"InterestLifetime"
};
@@ -104,20 +98,6 @@
return m_hasFields[REPO_PARAMETER_NAME];
}
- const Selectors&
- getSelectors() const
- {
- return m_selectors;
- }
-
- RepoCommandParameter&
- setSelectors(const Selectors& selectors);
-
- bool
- hasSelectors() const
- {
- return !m_selectors.empty();
- }
uint64_t
getStartBlockId() const
@@ -167,38 +147,6 @@
return m_hasFields[REPO_PARAMETER_PROCESS_ID];
}
- uint64_t
- getMaxInterestNum() const
- {
- assert(hasMaxInterestNum());
- return m_maxInterestNum;
- }
-
- RepoCommandParameter&
- setMaxInterestNum(uint64_t maxInterestNum);
-
- bool
- hasMaxInterestNum() const
- {
- return m_hasFields[REPO_PARAMETER_MAX_INTEREST_NUM];
- }
-
- milliseconds
- getWatchTimeout() const
- {
- assert(hasWatchTimeout());
- return m_watchTimeout;
- }
-
- RepoCommandParameter&
- setWatchTimeout(milliseconds watchTimeout);
-
- bool
- hasWatchTimeout() const
- {
- return m_hasFields[REPO_PARAMETER_WATCH_TIME_OUT];
- }
-
milliseconds
getInterestLifetime() const
{
@@ -233,12 +181,9 @@
private:
std::vector<bool> m_hasFields;
Name m_name;
- Selectors m_selectors;
uint64_t m_startBlockId;
uint64_t m_endBlockId;
uint64_t m_processId;
- uint64_t m_maxInterestNum;
- milliseconds m_watchTimeout;
milliseconds m_interestLifetime;
mutable Block m_wire;
diff --git a/src/repo-command-response.hpp b/src/repo-command-response.hpp
index a2432a7..4f30ca5 100644
--- a/src/repo-command-response.hpp
+++ b/src/repo-command-response.hpp
@@ -150,7 +150,6 @@
wireDecode(const Block& wire);
private:
- //uint64_t m_statusCode;
uint64_t m_startBlockId;
uint64_t m_endBlockId;
uint64_t m_processId;
diff --git a/src/repo-command.cpp b/src/repo-command.cpp
index b8200d5..0c1cc21 100644
--- a/src/repo-command.cpp
+++ b/src/repo-command.cpp
@@ -67,27 +67,6 @@
.required(REPO_PARAMETER_PROCESS_ID);
}
-WatchStartCommand::WatchStartCommand()
-{
- m_requestValidator
- .required(REPO_PARAMETER_NAME)
- .optional(REPO_PARAMETER_INTEREST_LIFETIME)
- .optional(REPO_PARAMETER_MAX_INTEREST_NUM)
- .optional(REPO_PARAMETER_WATCH_TIME_OUT);
-}
-
-WatchCheckCommand::WatchCheckCommand()
-{
- m_requestValidator
- .required(REPO_PARAMETER_NAME);
-}
-
-WatchStopCommand::WatchStopCommand()
-{
- m_requestValidator
- .required(REPO_PARAMETER_NAME);
-}
-
DeleteCommand::DeleteCommand()
{
m_requestValidator
@@ -96,16 +75,6 @@
.required(REPO_PARAMETER_END_BLOCK_ID)
.required(REPO_PARAMETER_PROCESS_ID);
}
-void
-InsertCommand::check(const RepoCommandParameter& parameters) const
-{
- if (parameters.hasStartBlockId() || parameters.hasEndBlockId()) {
- if (parameters.hasSelectors()) {
- BOOST_THROW_EXCEPTION(ArgumentError("BlockId present. BlockId is not supported in this protocol"));
- return;
- }
- }
-}
void
DeleteCommand::check(const RepoCommandParameter& parameters) const
diff --git a/src/repo-command.hpp b/src/repo-command.hpp
index 9b1dbe6..28c2999 100644
--- a/src/repo-command.hpp
+++ b/src/repo-command.hpp
@@ -96,10 +96,6 @@
{
public:
InsertCommand();
-
-private:
- void
- check(const RepoCommandParameter& parameters) const override;
};
class InsertCheckCommand : public RepoCommand
@@ -108,24 +104,6 @@
InsertCheckCommand();
};
-class WatchStartCommand : public RepoCommand
-{
-public:
- WatchStartCommand();
-};
-
-class WatchCheckCommand : public RepoCommand
-{
-public:
- WatchCheckCommand();
-};
-
-class WatchStopCommand : public RepoCommand
-{
-public:
- WatchStopCommand();
-};
-
class DeleteCommand : public RepoCommand
{
public:
diff --git a/src/repo-tlv.hpp b/src/repo-tlv.hpp
index 45a6a0f..b8b9f15 100644
--- a/src/repo-tlv.hpp
+++ b/src/repo-tlv.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014, Regents of the University of California.
+ * Copyright (c) 2018, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -36,11 +36,9 @@
StatusCode = 208,
InsertNum = 209,
DeleteNum = 210,
- MaxInterestNum = 211,
- WatchTimeout = 212
};
-} // tlv
-} // repo
+} // namespace tlv
+} // namespace repo
#endif // REPO_REPO_TLV_HPP
diff --git a/src/repo.cpp b/src/repo.cpp
index 955db96..9deef35 100644
--- a/src/repo.cpp
+++ b/src/repo.cpp
@@ -20,18 +20,22 @@
#include "repo.hpp"
#include "storage/sqlite-storage.hpp"
+#include <ndn-cxx/util/logger.hpp>
+
namespace repo {
+NDN_LOG_INIT(repo);
+
RepoConfig
parseConfig(const std::string& configPath)
{
if (configPath.empty()) {
- std::cerr << "configuration file path is empty" << std::endl;
+ NDN_LOG_DEBUG("configuration file path is empty");
}
std::ifstream fin(configPath.c_str());
if (!fin.is_open())
- BOOST_THROW_EXCEPTION(Repo::Error("failed to open configuration file '"+ configPath +"'"));
+ BOOST_THROW_EXCEPTION(Repo::Error("failed to open configuration file '" + configPath + "'"));
using namespace boost::property_tree;
ptree propertyTree;
@@ -39,7 +43,7 @@
read_info(fin, propertyTree);
}
catch (const ptree_error& e) {
- BOOST_THROW_EXCEPTION(Repo::Error("failed to read configuration file '"+ configPath +"'"));
+ BOOST_THROW_EXCEPTION(Repo::Error("failed to read configuration file '" + configPath + "'"));
}
ptree repoConf = propertyTree.get_child("repo");
@@ -74,11 +78,6 @@
if (tcpBulkInsert) {
for (const auto& section : *tcpBulkInsert) {
isTcpBulkEnabled = true;
-
- // tcp_bulk_insert {
- // host "localhost" ; IP address or hostname to listen on
- // port 7635 ; Port number to listen on
- // }
if (section.first == "host") {
host = section.second.get_value<std::string>();
}
@@ -113,11 +112,10 @@
, m_face(ioService)
, m_dispatcher(m_face, m_keyChain)
, m_store(std::make_shared<SqliteStorage>(config.dbPath))
- , m_storageHandle(config.nMaxPackets, *m_store)
+ , m_storageHandle(*m_store)
, m_validator(m_face)
, m_readHandle(m_face, m_storageHandle, m_config.registrationSubset)
, m_writeHandle(m_face, m_storageHandle, m_dispatcher, m_scheduler, m_validator)
- , m_watchHandle(m_face, m_storageHandle, m_dispatcher, m_scheduler, m_validator)
, m_deleteHandle(m_face, m_storageHandle, m_dispatcher, m_scheduler, m_validator)
, m_tcpBulkInsertHandle(ioService, m_storageHandle)
{
@@ -129,10 +127,9 @@
{
// Rebuild storage if storage checkpoin exists
ndn::time::steady_clock::TimePoint start = ndn::time::steady_clock::now();
- m_storageHandle.initialize();
ndn::time::steady_clock::TimePoint end = ndn::time::steady_clock::now();
ndn::time::milliseconds cost = ndn::time::duration_cast<ndn::time::milliseconds>(end - start);
- std::cerr << "initialize storage cost: " << cost << "ms" << std::endl;
+ NDN_LOG_DEBUG("initialize storage cost: " << cost << "ms");
}
void
@@ -145,7 +142,7 @@
for (const ndn::Name& cmdPrefix : m_config.repoPrefixes) {
m_face.registerPrefix(cmdPrefix, nullptr,
[] (const Name& cmdPrefix, const std::string& reason) {
- std::cerr << "Command prefix " << cmdPrefix << " registration error: " << reason << std::endl;
+ NDN_LOG_DEBUG("Command prefix " << cmdPrefix << " registration error: " << reason);
BOOST_THROW_EXCEPTION(Error("Command prefix registration failed"));
});
diff --git a/src/repo.hpp b/src/repo.hpp
index e417960..21f314d 100644
--- a/src/repo.hpp
+++ b/src/repo.hpp
@@ -26,7 +26,6 @@
#include "handles/delete-handle.hpp"
#include "handles/read-handle.hpp"
#include "handles/tcp-bulk-insert-handle.hpp"
-#include "handles/watch-handle.hpp"
#include "handles/write-handle.hpp"
#include "common.hpp"
@@ -44,12 +43,11 @@
static const size_t DISABLED_SUBSET_LENGTH = -1;
std::string repoConfigPath;
- //StorageMethod storageMethod; This will be implemtented if there is other method.
std::string dbPath;
std::vector<ndn::Name> dataPrefixes;
size_t registrationSubset = DISABLED_SUBSET_LENGTH;
std::vector<ndn::Name> repoPrefixes;
- std::vector<std::pair<std::string, std::string> > tcpBulkInsertEndpoints;
+ std::vector<std::pair<std::string, std::string>> tcpBulkInsertEndpoints;
uint64_t nMaxPackets;
boost::property_tree::ptree validatorNode;
};
@@ -95,7 +93,6 @@
ReadHandle m_readHandle;
WriteHandle m_writeHandle;
- WatchHandle m_watchHandle;
DeleteHandle m_deleteHandle;
TcpBulkInsertHandle m_tcpBulkInsertHandle;
};
diff --git a/src/storage/index.cpp b/src/storage/index.cpp
deleted file mode 100644
index 3eb2cce..0000000
--- a/src/storage/index.cpp
+++ /dev/null
@@ -1,257 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/*
- * Copyright (c) 2014-2017, Regents of the University of California.
- *
- * This file is part of NDN repo-ng (Next generation of NDN repository).
- * See AUTHORS.md for complete list of repo-ng authors and contributors.
- *
- * repo-ng is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "index.hpp"
-
-#include <ndn-cxx/util/sha256.hpp>
-#include <ndn-cxx/security/signature-sha256-with-rsa.hpp>
-
-namespace repo {
-
-/** @brief determines if entry can satisfy interest
- * @param hash SHA256 hash of PublisherPublicKeyLocator if exists in interest, otherwise ignored
- */
-static bool
-matchesSimpleSelectors(const Interest& interest, ndn::ConstBufferPtr& hash,
- const Index::Entry& entry)
-{
- const Name& fullName = entry.getName();
-
- if (!interest.getName().isPrefixOf(fullName))
- return false;
-
- size_t nSuffixComponents = fullName.size() - interest.getName().size();
- if (interest.getMinSuffixComponents() >= 0 &&
- nSuffixComponents < static_cast<size_t>(interest.getMinSuffixComponents()))
- return false;
- if (interest.getMaxSuffixComponents() >= 0 &&
- nSuffixComponents > static_cast<size_t>(interest.getMaxSuffixComponents()))
- return false;
-
- if (!interest.getExclude().empty() &&
- entry.getName().size() > interest.getName().size() &&
- interest.getExclude().isExcluded(entry.getName()[interest.getName().size()]))
- return false;
- if (!interest.getPublisherPublicKeyLocator().empty())
- {
- if (*entry.getKeyLocatorHash() != *hash)
- return false;
- }
- return true;
-}
-
-Index::Index(size_t nMaxPackets)
- : m_maxPackets(nMaxPackets)
- , m_size(0)
-{
-}
-
-
-bool
-Index::insert(const Data& data, int64_t id)
-{
- if (isFull())
- BOOST_THROW_EXCEPTION(Error("The Index is Full. Cannot Insert Any Data!"));
- Entry entry(data, id);
- bool isInserted = m_indexContainer.insert(entry).second;
- if (isInserted)
- ++m_size;
- return isInserted;
-}
-
-bool
-Index::insert(const Name& fullName, int64_t id,
- const ndn::ConstBufferPtr& keyLocatorHash)
-{
- if (isFull())
- BOOST_THROW_EXCEPTION(Error("The Index is Full. Cannot Insert Any Data!"));
- Entry entry(fullName, keyLocatorHash, id);
- bool isInserted = m_indexContainer.insert(entry).second;
- if (isInserted)
- ++m_size;
- return isInserted;
-}
-
-std::pair<int64_t,Name>
-Index::find(const Interest& interest) const
-{
- Name name = interest.getName();
- IndexContainer::const_iterator result = m_indexContainer.lower_bound(name);
- if (result != m_indexContainer.end())
- {
- return selectChild(interest, result);
- }
- else
- {
- return std::make_pair(0, Name());
- }
-}
-
-std::pair<int64_t,Name>
-Index::find(const Name& name) const
-{
- IndexContainer::const_iterator result = m_indexContainer.lower_bound(name);
- if (result != m_indexContainer.end())
- {
- return findFirstEntry(name, result);
- }
- else
- {
- return std::make_pair(0, Name());
- }
-}
-
-bool
-Index::hasData(const Data& data) const
-{
- Index::Entry entry(data, -1); // the id number is useless
- IndexContainer::const_iterator result = m_indexContainer.find(entry);
- return result != m_indexContainer.end();
-
-}
-
-std::pair<int64_t,Name>
-Index::findFirstEntry(const Name& prefix,
- IndexContainer::const_iterator startingPoint) const
-{
- BOOST_ASSERT(startingPoint != m_indexContainer.end());
- if (prefix.isPrefixOf(startingPoint->getName()))
- {
- return std::make_pair(startingPoint->getId(), startingPoint->getName());
- }
- else
- {
- return std::make_pair(0, Name());
- }
-}
-
-bool
-Index::erase(const Name& fullName)
-{
- Entry entry(fullName);
- IndexContainer::const_iterator findIterator = m_indexContainer.find(entry);
- if (findIterator != m_indexContainer.end())
- {
- m_indexContainer.erase(findIterator);
- m_size--;
- return true;
- }
- else
- return false;
-}
-
-const ndn::ConstBufferPtr
-Index::computeKeyLocatorHash(const KeyLocator& keyLocator)
-{
- const Block& block = keyLocator.wireEncode();
- ndn::ConstBufferPtr keyLocatorHash = ndn::util::Sha256::computeDigest(block.wire(), block.size());
- return keyLocatorHash;
-}
-
-std::pair<int64_t,Name>
-Index::selectChild(const Interest& interest,
- IndexContainer::const_iterator startingPoint) const
-{
- BOOST_ASSERT(startingPoint != m_indexContainer.end());
- bool isLeftmost = (interest.getChildSelector() <= 0);
- ndn::ConstBufferPtr hash;
- if (!interest.getPublisherPublicKeyLocator().empty())
- {
- KeyLocator keyLocator = interest.getPublisherPublicKeyLocator();
- const Block& block = keyLocator.wireEncode();
- hash = ndn::util::Sha256::computeDigest(block.wire(), block.size());
- }
-
- if (isLeftmost)
- {
- for (IndexContainer::const_iterator it = startingPoint;
- it != m_indexContainer.end(); ++it)
- {
- if (!interest.getName().isPrefixOf(it->getName()))
- return std::make_pair(0, Name());
- if (matchesSimpleSelectors(interest, hash, (*it)))
- return std::make_pair(it->getId(), it->getName());
- }
- }
- else
- {
- IndexContainer::const_iterator boundary = m_indexContainer.lower_bound(interest.getName());
- if (boundary == m_indexContainer.end() || !interest.getName().isPrefixOf(boundary->getName()))
- return std::make_pair(0, Name());
- Name successor = interest.getName().getSuccessor();
- IndexContainer::const_iterator last = interest.getName().size() == 0 ?
- m_indexContainer.end() : m_indexContainer.lower_bound(interest.getName().getSuccessor());
- while (true)
- {
- IndexContainer::const_iterator prev = last;
- --prev;
- if (prev == boundary)
- {
- bool isMatch = matchesSimpleSelectors(interest, hash, (*prev));
- if (isMatch)
- {
- return std::make_pair(prev->getId(), prev->getName());
- }
- else
- return std::make_pair(0, Name());
- }
- IndexContainer::const_iterator first =
- m_indexContainer.lower_bound(prev->getName().getPrefix(interest.getName().size() + 1));
- IndexContainer::const_iterator match =
- std::find_if(first, last, bind(&matchesSimpleSelectors, interest, hash, _1));
- if (match != last)
- {
- return std::make_pair(match->getId(), match->getName());
- }
- last = first;
- }
- }
- return std::make_pair(0, Name());
-}
-
-Index::Entry::Entry(const Data& data, int64_t id)
- : m_name(data.getFullName())
- , m_id(id)
-{
- const ndn::Signature& signature = data.getSignature();
- if (signature.hasKeyLocator())
- m_keyLocatorHash = computeKeyLocatorHash(signature.getKeyLocator());
-}
-
-Index::Entry::Entry(const Name& fullName, const KeyLocator& keyLocator, int64_t id)
- : m_name(fullName)
- , m_keyLocatorHash(computeKeyLocatorHash(keyLocator))
- , m_id(id)
-{
-}
-
-Index::Entry::Entry(const Name& fullName,
- const ndn::ConstBufferPtr& keyLocatorHash, int64_t id)
- : m_name(fullName)
- , m_keyLocatorHash(keyLocatorHash)
- , m_id(id)
-{
-}
-
-Index::Entry::Entry(const Name& name)
- : m_name(name)
-{
-}
-
-} // namespace repo
diff --git a/src/storage/index.hpp b/src/storage/index.hpp
deleted file mode 100644
index d60bd37..0000000
--- a/src/storage/index.hpp
+++ /dev/null
@@ -1,243 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/*
- * Copyright (c) 2014-2018, Regents of the University of California.
- *
- * This file is part of NDN repo-ng (Next generation of NDN repository).
- * See AUTHORS.md for complete list of repo-ng authors and contributors.
- *
- * repo-ng is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef REPO_STORAGE_INDEX_HPP
-#define REPO_STORAGE_INDEX_HPP
-
-#include "../common.hpp"
-
-#include <set>
-
-namespace repo {
-
-class Index : noncopyable
-{
-public:
- class Error : public std::runtime_error
- {
- public:
- explicit
- Error(const std::string& what)
- : std::runtime_error(what)
- {
- }
- };
-
- class Entry
- {
- public:
- class Error : public std::runtime_error
- {
- public:
- explicit
- Error(const std::string& what)
- : std::runtime_error(what)
- {
- }
- };
-
- public:
- /**
- * @brief used by set to construct node
- */
- Entry() = default;
-
- /**
- * @brief construct Entry by data and id number
- */
- Entry(const Data& data, int64_t id);
-
- /**
- * @brief construct Entry by fullName, keyLocator and id number
- */
- Entry(const Name& fullName, const KeyLocator& keyLocator, int64_t id);
-
- /**
- * @brief construct Entry by fullName, keyLocatorHash and id number
- * @param fullName full name with digest computed from data
- * @param keyLocatorHash keyLocator hashed by sha256
- * @param id record ID from database
- */
- Entry(const Name& fullName, const ndn::ConstBufferPtr& keyLocatorHash, int64_t id);
-
- /**
- * @brief implicit construct Entry by full name
- *
- * Allow implicit conversion from Name for set lookups by Name
- */
- Entry(const Name& name);
-
- /**
- * @brief get the name of entry
- */
- const Name&
- getName() const
- {
- return m_name;
- }
-
- /**
- * @brief get the keyLocator hash value of the entry
- */
- const ndn::ConstBufferPtr&
- getKeyLocatorHash() const
- {
- return m_keyLocatorHash;
- }
-
- /**
- * @brief get record ID from database
- */
- int64_t
- getId() const
- {
- return m_id;
- }
-
- bool
- operator>(const Entry& entry) const
- {
- return m_name > entry.getName();
- }
-
- bool
- operator<(const Entry& entry) const
- {
- return m_name < entry.getName();
- }
-
- bool
- operator==(const Entry& entry) const
- {
- return m_name == entry.getName();
- }
-
- bool
- operator!=(const Entry& entry) const
- {
- return m_name != entry.getName();
- }
-
- private:
- Name m_name;
- ndn::ConstBufferPtr m_keyLocatorHash;
- int64_t m_id;
- };
-
-private:
- typedef std::set<Entry> IndexContainer;
-
-public:
- explicit
- Index(size_t nMaxPackets);
-
- /**
- * @brief insert entries into index
- * @param data used to construct entries
- * @param id obtained from database
- */
- bool
- insert(const Data& data, int64_t id);
-
- /**
- * @brief insert entries into index
- * @param data used to construct entries
- * @param id obtained from database
- */
- bool
- insert(const Name& fullName, int64_t id,
- const ndn::ConstBufferPtr& keyLocatorHash);
-
- /**
- * @brief erase the entry in index by its fullname
- */
- bool
- erase(const Name& fullName);
-
- /** @brief find the Entry for best match of an Interest
- * @return ID and fullName of the Entry, or (0,ignored) if not found
- */
- std::pair<int64_t, Name>
- find(const Interest& interest) const;
-
- /** @brief find the first Entry under a Name prefix
- * @return ID and fullName of the Entry, or (0,ignored) if not found
- */
- std::pair<int64_t, Name>
- find(const Name& name) const;
-
- /**
- * @brief determine whether same Data is already in the index
- * @return true if identical Data exists, false otherwise
- */
- bool
- hasData(const Data& data) const;
-
- /**
- * @brief compute the hash value of keyLocator
- */
- static const ndn::ConstBufferPtr
- computeKeyLocatorHash(const KeyLocator& keyLocator);
-
- size_t
- size() const
- {
- return m_size;
- }
-
-private:
- /**
- * @brief select entries which satisfy the selectors in interest and return their name
- * @param interest used to select entries by comparing the name and checking selectors
- * @param idName save the id and name of found entries
- * @param startingPoint the entry whose name is equal or larger than the interest name
- */
- std::pair<int64_t, Name>
- selectChild(const Interest& interest,
- IndexContainer::const_iterator startingPoint) const;
-
- /**
- * @brief check whether the index is full
- */
- bool
- isFull() const
- {
- return m_size >= m_maxPackets;
- }
-
- /**
- * @brief find the first entry with the prefix
- * @param prefix used to request the entries
- * @param startingPoint the entry whose name is equal or larger than the interest name
- * @return int64_t the id number of found entry
- * @return Name the name of found entry
- */
- std::pair<int64_t, Name>
- findFirstEntry(const Name& prefix,
- IndexContainer::const_iterator startingPoint) const;
-
-private:
- IndexContainer m_indexContainer;
- size_t m_maxPackets;
- size_t m_size;
-};
-
-} // namespace repo
-
-#endif // REPO_STORAGE_INDEX_HPP
diff --git a/src/storage/repo-storage.cpp b/src/storage/repo-storage.cpp
index 148116b..334b4d3 100644
--- a/src/storage/repo-storage.cpp
+++ b/src/storage/repo-storage.cpp
@@ -28,62 +28,52 @@
NDN_LOG_INIT(repo.RepoStorage);
-RepoStorage::RepoStorage(const int64_t& nMaxPackets, Storage& store)
- : m_index(nMaxPackets)
- , m_storage(store)
+RepoStorage::RepoStorage(Storage& store)
+ : m_storage(store)
{
}
-void
-RepoStorage::initialize()
-{
- NDN_LOG_DEBUG("Initialize");
- m_storage.fullEnumerate(bind(&RepoStorage::insertItemToIndex, this, _1));
-}
-
-void
-RepoStorage::insertItemToIndex(const Storage::ItemMeta& item)
-{
- NDN_LOG_DEBUG("Insert data to index " << item.fullName);
- m_index.insert(item.fullName, item.id, item.keyLocatorHash);
- afterDataInsertion(item.fullName);
-}
-
bool
RepoStorage::insertData(const Data& data)
{
- bool isExist = m_index.hasData(data);
- if (isExist)
- BOOST_THROW_EXCEPTION(Error("The Entry Has Already In the Skiplist. Cannot be Inserted!"));
- int64_t id = m_storage.insert(data);
- if (id == -1)
- return false;
- bool didInsert = m_index.insert(data, id);
- if (didInsert)
- afterDataInsertion(data.getName());
- return didInsert;
+ bool isExist = m_storage.has(data.getFullName());
+
+ if (isExist) {
+ NDN_LOG_DEBUG("Data already in storage, regarded as successful data insertion");
+ return true;
+ }
+
+ int64_t id = m_storage.insert(data);
+ NDN_LOG_DEBUG("Insert ID: " << id << ", full name:" << data.getFullName());
+ if (id == NOTFOUND)
+ return false;
+
+ afterDataInsertion(data.getName());
+ return true;
}
ssize_t
RepoStorage::deleteData(const Name& name)
{
+ NDN_LOG_DEBUG("Delete: " << name);
bool hasError = false;
- std::pair<int64_t,ndn::Name> idName = m_index.find(name);
- if (idName.first == 0)
- return false;
+
int64_t count = 0;
- while (idName.first != 0) {
- bool resultDb = m_storage.erase(idName.first);
- bool resultIndex = m_index.erase(idName.second); //full name
- if (resultDb && resultIndex) {
- afterDataDeletion(idName.second);
+ std::shared_ptr<Data> foundData;
+ Name foundName;
+ while ((foundData = m_storage.find(name))) {
+ foundName = foundData->getFullName();
+ bool resultDb = m_storage.erase(foundName);
+ if (resultDb) {
+ afterDataDeletion(foundName);
count++;
}
else {
hasError = true;
}
- idName = m_index.find(name);
+ NDN_LOG_DEBUG("Delete: " << name << ", found " << foundName << ", count " << count << ", result " << resultDb);
}
+
if (hasError)
return -1;
else
@@ -93,40 +83,15 @@
ssize_t
RepoStorage::deleteData(const Interest& interest)
{
- Interest interestDelete = interest;
- interestDelete.setChildSelector(0); //to disable the child selector in delete handle
- int64_t count = 0;
- bool hasError = false;
- std::pair<int64_t,ndn::Name> idName = m_index.find(interestDelete);
- while (idName.first != 0) {
- bool resultDb = m_storage.erase(idName.first);
- bool resultIndex = m_index.erase(idName.second); //full name
- if (resultDb && resultIndex) {
- afterDataDeletion(idName.second);
- count++;
- }
- else {
- hasError = true;
- }
- idName = m_index.find(interestDelete);
- }
- if (hasError)
- return -1;
- else
- return count;
+ return deleteData(interest.getName());
}
-shared_ptr<Data>
+std::shared_ptr<Data>
RepoStorage::readData(const Interest& interest) const
{
- std::pair<int64_t,ndn::Name> idName = m_index.find(interest);
- if (idName.first != 0) {
- shared_ptr<Data> data = m_storage.read(idName.first);
- if (data) {
- return data;
- }
- }
- return shared_ptr<Data>();
+ NDN_LOG_DEBUG("Reading data for " << interest.getName());
+
+ return m_storage.read(interest.getName());
}
diff --git a/src/storage/repo-storage.hpp b/src/storage/repo-storage.hpp
index ec5faa7..39e576d 100644
--- a/src/storage/repo-storage.hpp
+++ b/src/storage/repo-storage.hpp
@@ -22,10 +22,8 @@
#include "../common.hpp"
#include "storage.hpp"
-#include "index.hpp"
#include "../repo-command-parameter.hpp"
-#include <ndn-cxx/exclude.hpp>
#include <ndn-cxx/util/signal.hpp>
#include <queue>
@@ -50,13 +48,7 @@
};
public:
- RepoStorage(const int64_t& nMaxPackets, Storage& store);
-
- /**
- * @brief rebuild index from database
- */
- void
- initialize();
+ RepoStorage(Storage& store);
/**
* @brief insert data into repo
@@ -66,7 +58,7 @@
/**
* @brief delete data from repo
- * @param name used to find entry needed to be erased in repo
+ * @param name name taken from the interest, used as a prefix to find the entries to be erased from the repo
* @return if deletion in either index or database fail, return -1,
* otherwise return the number of erased entries
*/
@@ -83,24 +75,20 @@
deleteData(const Interest& interest);
/**
- * @brief read data from repo
+ * @brief read data from repo
* @param interest used to request data
* @return std::shared_ptr<Data>
*/
std::shared_ptr<Data>
readData(const Interest& interest) const;
-private:
- void
- insertItemToIndex(const Storage::ItemMeta& item);
-
public:
ndn::util::Signal<RepoStorage, ndn::Name> afterDataInsertion;
ndn::util::Signal<RepoStorage, ndn::Name> afterDataDeletion;
private:
- Index m_index;
Storage& m_storage;
+ const int NOTFOUND = -1;
};
} // namespace repo
diff --git a/src/storage/sqlite-storage.cpp b/src/storage/sqlite-storage.cpp
index 42739c3..91016ea 100644
--- a/src/storage/sqlite-storage.cpp
+++ b/src/storage/sqlite-storage.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California.
+ * Copyright (c) 2014-2018, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -19,23 +19,25 @@
#include "sqlite-storage.hpp"
#include "config.hpp"
-#include "index.hpp"
#include <ndn-cxx/util/sha256.hpp>
+#include <ndn-cxx/util/sqlite3-statement.hpp>
+
#include <boost/filesystem.hpp>
#include <istream>
+#include <ndn-cxx/util/logger.hpp>
+
namespace repo {
-using std::string;
+NDN_LOG_INIT(repo.SqliteStorage);
-SqliteStorage::SqliteStorage(const string& dbPath)
- : m_size(0)
+SqliteStorage::SqliteStorage(const std::string& dbPath)
{
if (dbPath.empty()) {
- std::cerr << "Create db file in local location [" << dbPath << "]. " << std::endl
- << "You can assign the path using -d option" << std::endl;
- m_dbPath = string("ndn_repo.db");
+ m_dbPath = std::string("ndn_repo.db");
+ NDN_LOG_DEBUG("Create db file in local location [" << m_dbPath << "]. " );
+ NDN_LOG_DEBUG("You can assign the path using -d option" );
}
else {
boost::filesystem::path fsPath(dbPath);
@@ -55,32 +57,31 @@
void
SqliteStorage::initializeRepo()
{
- char* errMsg = 0;
-
+ char* errMsg = nullptr;
int rc = sqlite3_open_v2(m_dbPath.c_str(), &m_db,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE,
#ifdef DISABLE_SQLITE3_FS_LOCKING
"unix-dotfile"
#else
- 0
+ nullptr
#endif
- );
+ );
if (rc == SQLITE_OK) {
- sqlite3_exec(m_db, "CREATE TABLE NDN_REPO ("
- "id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, "
- "name BLOB, "
- "data BLOB, "
- "keylocatorHash BLOB);\n "
- , 0, 0, &errMsg);
+ // Create a new table named NDN_REPO_V2, distinguish from the old table name(NDN_REPO)
+ sqlite3_exec(m_db, "CREATE TABLE NDN_REPO_V2 (name BLOB, data BLOB);", nullptr, nullptr, &errMsg);
// Ignore errors (when database already exists, errors are expected)
+ sqlite3_exec(m_db, "CREATE UNIQUE INDEX index_name ON NDN_REPO_V2 (name);", nullptr, nullptr, &errMsg);
}
else {
- std::cerr << "Database file open failure rc:" << rc << std::endl;
+ NDN_LOG_DEBUG("Database file open failure rc:" << rc);
BOOST_THROW_EXCEPTION(Error("Database file open failure"));
}
- sqlite3_exec(m_db, "PRAGMA synchronous = OFF", 0, 0, &errMsg);
- sqlite3_exec(m_db, "PRAGMA journal_mode = WAL", 0, 0, &errMsg);
+
+ // SQLite continues without syncing as soon as it has handed data off to the operating system
+ sqlite3_exec(m_db, "PRAGMA synchronous = OFF;", nullptr, nullptr, &errMsg);
+ // Uses a write-ahead log instead of a rollback journal to implement transactions.
+ sqlite3_exec(m_db, "PRAGMA journal_mode = WAL;", nullptr, nullptr, &errMsg);
}
SqliteStorage::~SqliteStorage()
@@ -88,211 +89,158 @@
sqlite3_close(m_db);
}
-void
-SqliteStorage::fullEnumerate(const std::function<void(const Storage::ItemMeta)>& f)
-{
- sqlite3_stmt* m_stmt = 0;
- int rc = SQLITE_DONE;
- string sql = string("SELECT id, name, keylocatorHash FROM NDN_REPO;");
- rc = sqlite3_prepare_v2(m_db, sql.c_str(), -1, &m_stmt, 0);
- if (rc != SQLITE_OK)
- BOOST_THROW_EXCEPTION(Error("Initiation Read Entries from Database Prepare error"));
- int entryNumber = 0;
- while (true) {
- rc = sqlite3_step(m_stmt);
- if (rc == SQLITE_ROW) {
-
- ItemMeta item;
- item.fullName.wireDecode(Block(reinterpret_cast<const uint8_t*>(sqlite3_column_blob(m_stmt, 1)),
- sqlite3_column_bytes(m_stmt, 1)));
- item.id = sqlite3_column_int(m_stmt, 0);
- item.keyLocatorHash = make_shared<const ndn::Buffer>(sqlite3_column_blob(m_stmt, 3),
- sqlite3_column_bytes(m_stmt, 3));
-
- try {
- f(item);
- }
- catch (...) {
- sqlite3_finalize(m_stmt);
- throw;
- }
- entryNumber++;
- }
- else if (rc == SQLITE_DONE) {
- sqlite3_finalize(m_stmt);
- break;
- }
- else {
- std::cerr << "Initiation Read Entries rc:" << rc << std::endl;
- sqlite3_finalize(m_stmt);
- BOOST_THROW_EXCEPTION(Error("Initiation Read Entries error"));
- }
- }
- m_size = entryNumber;
-}
-
int64_t
SqliteStorage::insert(const Data& data)
{
- Name name = data.getName();
+ Name name = data.getFullName(); // store the full name
+ ndn::util::Sqlite3Statement stmt(m_db, "INSERT INTO NDN_REPO_V2 (name, data) VALUES (?, ?);");
- Index::Entry entry(data, 0); //the id is not used
- int64_t id = -1;
- if (name.empty()) {
- std::cerr << "name is empty" << std::endl;
- return -1;
- }
-
- int rc = 0;
-
- sqlite3_stmt* insertStmt = 0;
-
- string insertSql = string("INSERT INTO NDN_REPO (id, name, data, keylocatorHash) "
- "VALUES (?, ?, ?, ?)");
-
- if (sqlite3_prepare_v2(m_db, insertSql.c_str(), -1, &insertStmt, 0) != SQLITE_OK) {
- sqlite3_finalize(insertStmt);
- std::cerr << "insert sql not prepared" << std::endl;
- }
//Insert
- auto result = sqlite3_bind_null(insertStmt, 1);
+ // Bind NULL to the name parameter first to initialize result; the actual name is bound below.
+ auto result = sqlite3_bind_null(stmt, 1);
if (result == SQLITE_OK) {
- result = sqlite3_bind_blob(insertStmt, 2,
- entry.getName().wireEncode().wire(),
- entry.getName().wireEncode().size(), SQLITE_STATIC);
+ result = stmt.bind(1, name.wireEncode().value(),
+ name.wireEncode().value_size(), SQLITE_STATIC);
}
if (result == SQLITE_OK) {
- result = sqlite3_bind_blob(insertStmt, 3,
- data.wireEncode().wire(),
- data.wireEncode().size(), SQLITE_STATIC);
- }
- if (result == SQLITE_OK) {
- BOOST_ASSERT(entry.getKeyLocatorHash()->size() == ndn::util::Sha256::DIGEST_SIZE);
- result = sqlite3_bind_blob(insertStmt, 4,
- entry.getKeyLocatorHash()->data(),
- entry.getKeyLocatorHash()->size(), SQLITE_STATIC);
+ result = stmt.bind(2, data.wireEncode(), SQLITE_STATIC);
}
+ int id = 0;
if (result == SQLITE_OK) {
- rc = sqlite3_step(insertStmt);
+ int rc = 0;
+ rc = stmt.step();
if (rc == SQLITE_CONSTRAINT) {
- std::cerr << "Insert failed" << std::endl;
- sqlite3_finalize(insertStmt);
+ NDN_LOG_DEBUG("Insert failed");
BOOST_THROW_EXCEPTION(Error("Insert failed"));
- }
- sqlite3_reset(insertStmt);
- m_size++;
- id = sqlite3_last_insert_rowid(m_db);
+ }
+ sqlite3_reset(stmt);
+ id = sqlite3_last_insert_rowid(m_db);
}
else {
BOOST_THROW_EXCEPTION(Error("Some error with insert"));
}
-
- sqlite3_finalize(insertStmt);
return id;
}
-
bool
-SqliteStorage::erase(const int64_t id)
+SqliteStorage::erase(const Name& name)
{
- sqlite3_stmt* deleteStmt = 0;
+ ndn::util::Sqlite3Statement stmt(m_db, "DELETE FROM NDN_REPO_V2 WHERE name = ?;");
- string deleteSql = string("DELETE from NDN_REPO where id = ?;");
+ auto result = stmt.bind(1,
+ name.wireEncode().value(),
+ name.wireEncode().value_size(), SQLITE_STATIC);
- if (sqlite3_prepare_v2(m_db, deleteSql.c_str(), -1, &deleteStmt, 0) != SQLITE_OK) {
- sqlite3_finalize(deleteStmt);
- std::cerr << "delete statement prepared failed" << std::endl;
- BOOST_THROW_EXCEPTION(Error("delete statement prepared failed"));
- }
-
- if (sqlite3_bind_int64(deleteStmt, 1, id) == SQLITE_OK) {
- int rc = sqlite3_step(deleteStmt);
+ if (result == SQLITE_OK) {
+ int rc = stmt.step();
if (rc != SQLITE_DONE && rc != SQLITE_ROW) {
- std::cerr << " node delete error rc:" << rc << std::endl;
- sqlite3_finalize(deleteStmt);
- BOOST_THROW_EXCEPTION(Error(" node delete error"));
+ NDN_LOG_DEBUG("Node delete error rc:" << rc);
+ BOOST_THROW_EXCEPTION(Error("Node delete error"));
}
- if (sqlite3_changes(m_db) != 1)
+ if (sqlite3_changes(m_db) != 1) {
return false;
- m_size--;
+ }
}
else {
- std::cerr << "delete bind error" << std::endl;
- sqlite3_finalize(deleteStmt);
+ NDN_LOG_DEBUG("delete bind error");
BOOST_THROW_EXCEPTION(Error("delete bind error"));
}
- sqlite3_finalize(deleteStmt);
return true;
}
-
-shared_ptr<Data>
-SqliteStorage::read(const int64_t id)
+std::shared_ptr<Data>
+SqliteStorage::read(const Name& name)
{
- sqlite3_stmt* queryStmt = 0;
- string sql = string("SELECT * FROM NDN_REPO WHERE id = ? ;");
- int rc = sqlite3_prepare_v2(m_db, sql.c_str(), -1, &queryStmt, 0);
- if (rc == SQLITE_OK) {
- if (sqlite3_bind_int64(queryStmt, 1, id) == SQLITE_OK) {
- rc = sqlite3_step(queryStmt);
- if (rc == SQLITE_ROW) {
- auto data = make_shared<Data>();
- data->wireDecode(Block(reinterpret_cast<const uint8_t*>(sqlite3_column_blob(queryStmt, 2)),
- sqlite3_column_bytes(queryStmt, 2)));
- sqlite3_finalize(queryStmt);
- return data;
+ return find(name);
+}
+
+bool
+SqliteStorage::has(const Name& name)
+{
+ // find exact match
+ return find(name, true) != nullptr;
+}
+
+std::shared_ptr<Data>
+SqliteStorage::find(const Name& name, bool exactMatch)
+{
+ NDN_LOG_DEBUG("Trying to find: " << name);
+ Name nameSuccessor;
+ if (!exactMatch) {
+ nameSuccessor = name.getSuccessor();
+ }
+
+ std::string sql;
+ if (exactMatch)
+ sql = "SELECT * FROM NDN_REPO_V2 WHERE name = ?;";
+ else
+ sql = "SELECT * FROM NDN_REPO_V2 WHERE name >= ? and name < ?;";
+
+ ndn::util::Sqlite3Statement stmt(m_db, sql);
+
+ auto result = stmt.bind(1,
+ name.wireEncode().value(),
+ name.wireEncode().value_size(), SQLITE_STATIC);
+
+ // use getSuccessor() to bound the range of prefix-matching items
+ if (result == SQLITE_OK && !exactMatch) {
+ // bind the TLV-VALUE of the successor name as the exclusive upper bound when not matching exactly
+ result = stmt.bind(2,
+ nameSuccessor.wireEncode().value(),
+ nameSuccessor.wireEncode().value_size(), SQLITE_STATIC);
+ }
+
+ if (result == SQLITE_OK) {
+ int rc = stmt.step();
+ if (rc == SQLITE_ROW) {
+ Name foundName;
+
+ auto data = std::make_shared<Data>();
+ try {
+ data->wireDecode(stmt.getBlock(1));
}
- else if (rc == SQLITE_DONE) {
+ catch (const ndn::Block::Error& error) {
+ NDN_LOG_DEBUG(error.what());
return nullptr;
}
- else {
- std::cerr << "Database query failure rc:" << rc << std::endl;
- sqlite3_finalize(queryStmt);
- BOOST_THROW_EXCEPTION(Error("Database query failure"));
+ NDN_LOG_DEBUG("Data from db: " << *data);
+
+ foundName = data->getFullName();
+
+ if ((exactMatch && name == foundName) || (!exactMatch && name.isPrefixOf(foundName))) {
+ NDN_LOG_DEBUG("Found: " << foundName << " " << stmt.getInt(0));
+ return data;
}
}
- else {
- std::cerr << "select bind error" << std::endl;
- sqlite3_finalize(queryStmt);
- BOOST_THROW_EXCEPTION(Error("select bind error"));
+ else if (rc == SQLITE_DONE) {
+ return nullptr;
}
- sqlite3_finalize(queryStmt);
+ else {
+ NDN_LOG_DEBUG("Database query failure rc:" << rc);
+ BOOST_THROW_EXCEPTION(Error("Database query failure"));
+ }
}
else {
- sqlite3_finalize(queryStmt);
- std::cerr << "select statement prepared failed" << std::endl;
- BOOST_THROW_EXCEPTION(Error("select statement prepared failed"));
+ NDN_LOG_DEBUG("select bind error");
+ BOOST_THROW_EXCEPTION(Error("select bind error"));
}
return nullptr;
}
-int64_t
+uint64_t
SqliteStorage::size()
{
- sqlite3_stmt* queryStmt = 0;
- string sql("SELECT count(*) FROM NDN_REPO ");
- int rc = sqlite3_prepare_v2(m_db, sql.c_str(), -1, &queryStmt, 0);
- if (rc != SQLITE_OK)
- {
- std::cerr << "Database query failure rc:" << rc << std::endl;
- sqlite3_finalize(queryStmt);
- BOOST_THROW_EXCEPTION(Error("Database query failure"));
- }
+ ndn::util::Sqlite3Statement stmt(m_db, "SELECT count(*) FROM NDN_REPO_V2;");
- rc = sqlite3_step(queryStmt);
- if (rc != SQLITE_ROW)
- {
- std::cerr << "Database query failure rc:" << rc << std::endl;
- sqlite3_finalize(queryStmt);
- BOOST_THROW_EXCEPTION(Error("Database query failure"));
- }
-
- int64_t nDatas = sqlite3_column_int64(queryStmt, 0);
- if (m_size != nDatas) {
- std::cerr << "The size of database is not correct! " << std::endl;
+ int rc = stmt.step();
+ if (rc != SQLITE_ROW) {
+ NDN_LOG_DEBUG("Database query failure rc:" << rc);
+ BOOST_THROW_EXCEPTION(Error("Database query failure"));
}
- return nDatas;
+
+ uint64_t nData = stmt.getInt(0);
+ return nData;
}
} // namespace repo
diff --git a/src/storage/sqlite-storage.hpp b/src/storage/sqlite-storage.hpp
old mode 100755
new mode 100644
index 168cc41..4d7142b
--- a/src/storage/sqlite-storage.hpp
+++ b/src/storage/sqlite-storage.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014, Regents of the University of California.
+ * Copyright (c) 2018, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -21,19 +21,17 @@
#define REPO_STORAGE_SQLITE_STORAGE_HPP
#include "storage.hpp"
-#include "index.hpp"
-#include <string>
-#include <iostream>
-#include <sqlite3.h>
-#include <stdlib.h>
-#include <vector>
-#include <queue>
+
#include <algorithm>
+#include <iostream>
+#include <queue>
+#include <stdlib.h>
+#include <string>
+#include <sqlite3.h>
+#include <vector>
namespace repo {
-using std::queue;
-
class SqliteStorage : public Storage
{
public:
@@ -50,7 +48,6 @@
explicit
SqliteStorage(const std::string& dbPath);
- virtual
~SqliteStorage();
/**
@@ -58,35 +55,30 @@
* @param data the data should be inserted into databse
* @return int64_t the id number of each entry in the database
*/
- virtual int64_t
- insert(const Data& data);
+ int64_t
+ insert(const Data& data) override;
/**
- * @brief remove the entry in the database by using id
- * @param id id number of each entry in the database
+ * @brief remove the entry in the database by using name as index
+ * @param name name of the data
*/
- virtual bool
- erase(const int64_t id);
+ bool
+ erase(const Name& name) override;
- /**
- * @brief get the data from database
- * @para id id number of each entry in the database, used to find the data
- */
- virtual std::shared_ptr<Data>
- read(const int64_t id);
+ std::shared_ptr<Data>
+ read(const Name& name) override;
+
+ bool
+ has(const Name& name) override;
+
+ std::shared_ptr<Data>
+ find(const Name& name, bool exactMatch = false) override;
/**
* @brief return the size of database
*/
- virtual int64_t
- size();
-
- /**
- * @brief enumerate each entry in database and call the function
- * insertItemToIndex to reubuild index from database
- */
- void
- fullEnumerate(const std::function<void(const Storage::ItemMeta)>& f);
+ uint64_t
+ size() override;
private:
void
@@ -95,7 +87,6 @@
private:
sqlite3* m_db;
std::string m_dbPath;
- int64_t m_size;
};
diff --git a/src/storage/storage.hpp b/src/storage/storage.hpp
old mode 100755
new mode 100644
index 5286c8a..146368f
--- a/src/storage/storage.hpp
+++ b/src/storage/storage.hpp
@@ -43,14 +43,6 @@
}
};
- class ItemMeta
- {
- public:
- int64_t id;
- Name fullName;
- ndn::ConstBufferPtr keyLocatorHash;
- };
-
public:
virtual
~Storage() = default;
@@ -63,31 +55,38 @@
insert(const Data& data) = 0;
/**
- * @brief remove the entry in the database by using id
- * @param id id number of entry in the database
+ * @brief remove the entry in the database by full name
+ * @param name full name of the data
*/
virtual bool
- erase(const int64_t id) = 0;
+ erase(const Name& name) = 0;
/**
* @brief get the data from database
- * @param id id number of each entry in the database, used to find the data
+ * @param name full name of the data
*/
virtual std::shared_ptr<Data>
- read(const int64_t id) = 0;
+ read(const Name& name) = 0;
+
+ /**
+ * @brief check if database already has the data
+ * @param name full name of the data
+ */
+ virtual bool
+ has(const Name& name) = 0;
+
+ /**
+ * @brief find the data in database by full name and return it
+ * @param name full name of the data
+ */
+ virtual std::shared_ptr<Data>
+ find(const Name& name, bool exactMatch = false) = 0;
/**
* @brief return the size of database
*/
- virtual int64_t
+ virtual uint64_t
size() = 0;
-
- /**
- * @brief enumerate each entry in database and call the function
- * insertItemToIndex to reubuild index from database
- */
- virtual void
- fullEnumerate(const std::function<void(const Storage::ItemMeta)>& f) = 0;
};
} // namespace repo