read: auto-register prefixes for inserted data
Change-Id: Iebddca056a4c74f179f7af7203881adfe1cba777
refs: #4247
diff --git a/src/common.hpp b/src/common.hpp
index 6b7c7b9..0ccc485 100644
--- a/src/common.hpp
+++ b/src/common.hpp
@@ -20,6 +20,8 @@
#ifndef REPO_COMMON_HPP
#define REPO_COMMON_HPP
+#include "config.hpp"
+
#include <ndn-cxx/face.hpp>
#include <ndn-cxx/interest.hpp>
#include <ndn-cxx/name.hpp>
@@ -43,6 +45,12 @@
#include <string>
#include <vector>
+#ifdef HAVE_TESTS
+#define PUBLIC_WITH_TESTS_ELSE_PRIVATE public
+#else
+#define PUBLIC_WITH_TESTS_ELSE_PRIVATE private
+#endif
+
namespace repo {
using ndn::Face;
diff --git a/src/handles/base-handle.hpp b/src/handles/base-handle.hpp
index a14360a..5d12076 100644
--- a/src/handles/base-handle.hpp
+++ b/src/handles/base-handle.hpp
@@ -44,8 +44,8 @@
public:
BaseHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
Scheduler& scheduler)
- : m_face(face)
- , m_storageHandle(storageHandle)
+ : m_storageHandle(storageHandle)
+ , m_face(face)
, m_keyChain(keyChain)
, m_scheduler(scheduler)
// , m_storeindex(storeindex)
@@ -101,10 +101,11 @@
void
extractParameter(const Interest& interest, const Name& prefix, RepoCommandParameter& parameter);
-private:
-
- Face& m_face;
+protected:
RepoStorage& m_storageHandle;
+
+private:
+ Face& m_face;
KeyChain& m_keyChain;
Scheduler& m_scheduler;
// RepoStorage& m_storeindex;
diff --git a/src/handles/read-handle.cpp b/src/handles/read-handle.cpp
index 83133e2..1f13bf3 100644
--- a/src/handles/read-handle.cpp
+++ b/src/handles/read-handle.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014, Regents of the University of California.
+ * Copyright (c) 2014-2017, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -18,15 +18,39 @@
*/
#include "read-handle.hpp"
+#include "repo.hpp"
namespace repo {
+ReadHandle::ReadHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
+ Scheduler& scheduler, size_t prefixSubsetLength)
+ : BaseHandle(face, storageHandle, keyChain, scheduler)
+ , m_prefixSubsetLength(prefixSubsetLength)
+{
+}
+
+void
+ReadHandle::connectAutoListen()
+{
+ // Connect a RepoStorage's signals to the read handle
+ if (m_prefixSubsetLength != RepoConfig::DISABLED_SUBSET_LENGTH) {
+ afterDataDeletionConnection = m_storageHandle.afterDataInsertion.connect(
+ [this] (const Name& prefix) {
+ onDataInserted(prefix);
+ });
+ afterDataInsertionConnection = m_storageHandle.afterDataDeletion.connect(
+ [this] (const Name& prefix) {
+ onDataDeleted(prefix);
+ });
+ }
+}
+
void
ReadHandle::onInterest(const Name& prefix, const Interest& interest)
{
shared_ptr<ndn::Data> data = getStorageHandle().readData(interest);
- if (data != NULL) {
+ if (data != nullptr) {
getFace().put(*data);
}
}
@@ -47,4 +71,53 @@
bind(&ReadHandle::onRegisterFailed, this, _1, _2));
}
-} //namespace repo
+void
+ReadHandle::onDataDeleted(const Name& name)
+{
+ // We add one here to account for the implicit digest at the end,
+ // which is what we get from the underlying storage when deleting.
+ Name prefix = name.getPrefix(-(m_prefixSubsetLength + 1));
+ auto check = m_insertedDataPrefixes.find(prefix);
+ if (check != m_insertedDataPrefixes.end()) {
+ if (--(check->second.useCount) <= 0) {
+ getFace().unsetInterestFilter(check->second.prefixId);
+ m_insertedDataPrefixes.erase(prefix);
+ }
+ }
+}
+
+void
+ReadHandle::onDataInserted(const Name& name)
+{
+ // Note: We want to save the prefix that we register exactly, not the
+ // name that provoked the registration
+ Name prefixToRegister = name.getPrefix(-m_prefixSubsetLength);
+ ndn::InterestFilter filter(prefixToRegister);
+ auto check = m_insertedDataPrefixes.find(prefixToRegister);
+ if (check == m_insertedDataPrefixes.end()) {
+ // Because of stack lifetime problems, we assume here that the
+ // prefix registration will be successful, and we add the registered
+ // prefix to our list. This is because, if we fail, we shut
+ // everything down, anyway. If registration failures are ever
+ // considered to be recoverable, we would need to make this
+ // atomic.
+ const ndn::RegisteredPrefixId* prefixId = getFace().setInterestFilter(filter,
+ [this] (const ndn::InterestFilter& filter, const Interest& interest) {
+ // Implicit conversion to Name of filter
+ onInterest(filter, interest);
+ },
+ [this] (const Name& prefix) {
+ },
+ [this] (const Name& prefix, const std::string& reason) {
+ onRegisterFailed(prefix, reason);
+ });
+ RegisteredDataPrefix registeredPrefix{prefixId, 1};
+ // Newly registered prefix
+ m_insertedDataPrefixes.emplace(std::make_pair(prefixToRegister, registeredPrefix));
+ }
+ else {
+ check->second.useCount++;
+ }
+}
+
+} // namespace repo
diff --git a/src/handles/read-handle.hpp b/src/handles/read-handle.hpp
index 19d2034..a2bbd86 100644
--- a/src/handles/read-handle.hpp
+++ b/src/handles/read-handle.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014, Regents of the University of California.
+ * Copyright (c) 2014-2017, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -20,23 +20,50 @@
#ifndef REPO_HANDLES_READ_HANDLE_HPP
#define REPO_HANDLES_READ_HANDLE_HPP
+#include "common.hpp"
#include "base-handle.hpp"
-
namespace repo {
class ReadHandle : public BaseHandle
{
public:
- ReadHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler)
- : BaseHandle(face, storageHandle, keyChain, scheduler)
+ using DataPrefixRegistrationCallback = std::function<void(const ndn::Name&)>;
+ using DataPrefixUnregistrationCallback = std::function<void(const ndn::Name&)>;
+ struct RegisteredDataPrefix
{
+ const ndn::RegisteredPrefixId* prefixId;
+ int useCount;
+ };
+
+ ReadHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
+ Scheduler& scheduler, size_t prefixSubsetLength);
+
+ void
+ listen(const Name& prefix) override;
+
+ void
+ connectAutoListen();
+
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ const std::map<ndn::Name, RegisteredDataPrefix>&
+ getRegisteredPrefixes()
+ {
+ return m_insertedDataPrefixes;
}
- virtual void
- listen(const Name& prefix);
+ /**
+ * @param after Do something after actually removing a prefix
+ */
+ void
+ onDataDeleted(const Name& name);
+
+ /**
+ * @param after Do something after successfully registering the data prefix
+ */
+ void
+ onDataInserted(const Name& name);
private:
/**
@@ -47,6 +74,12 @@
void
onRegisterFailed(const Name& prefix, const std::string& reason);
+
+private:
+ size_t m_prefixSubsetLength;
+ std::map<ndn::Name, RegisteredDataPrefix> m_insertedDataPrefixes;
+ ndn::util::signal::ScopedConnection afterDataDeletionConnection;
+ ndn::util::signal::ScopedConnection afterDataInsertionConnection;
};
} // namespace repo
diff --git a/src/repo.cpp b/src/repo.cpp
index 28a43a0..20f2f07 100644
--- a/src/repo.cpp
+++ b/src/repo.cpp
@@ -30,7 +30,7 @@
}
std::ifstream fin(configPath.c_str());
- if (!fin.is_open())
+ if (!fin.is_open())
BOOST_THROW_EXCEPTION(Repo::Error("failed to open configuration file '"+ configPath +"'"));
using namespace boost::property_tree;
@@ -48,26 +48,22 @@
repoConfig.repoConfigPath = configPath;
ptree dataConf = repoConf.get_child("data");
- for (ptree::const_iterator it = dataConf.begin();
- it != dataConf.end();
- ++it)
- {
- if (it->first == "prefix")
- repoConfig.dataPrefixes.push_back(Name(it->second.get_value<std::string>()));
+ for (const auto section : dataConf) {
+ if (section.first == "prefix")
+ repoConfig.dataPrefixes.push_back(Name(section.second.get_value<std::string>()));
+ else if (section.first == "registration-subset")
+ repoConfig.registrationSubset = section.second.get_value<int>();
else
- BOOST_THROW_EXCEPTION(Repo::Error("Unrecognized '" + it->first + "' option in 'data' section in "
+ BOOST_THROW_EXCEPTION(Repo::Error("Unrecognized '" + section.first + "' option in 'data' section in "
"configuration file '"+ configPath +"'"));
}
ptree commandConf = repoConf.get_child("command");
- for (ptree::const_iterator it = commandConf.begin();
- it != commandConf.end();
- ++it)
- {
- if (it->first == "prefix")
- repoConfig.repoPrefixes.push_back(Name(it->second.get_value<std::string>()));
+ for (const auto section : commandConf) {
+ if (section.first == "prefix")
+ repoConfig.repoPrefixes.push_back(Name(section.second.get_value<std::string>()));
else
- BOOST_THROW_EXCEPTION(Repo::Error("Unrecognized '" + it->first + "' option in 'command' section in "
+ BOOST_THROW_EXCEPTION(Repo::Error("Unrecognized '" + section.first + "' option in 'command' section in "
"configuration file '"+ configPath +"'"));
}
@@ -75,24 +71,21 @@
bool isTcpBulkEnabled = false;
std::string host = "localhost";
std::string port = "7376";
- for (ptree::const_iterator it = tcpBulkInsert.begin();
- it != tcpBulkInsert.end();
- ++it)
- {
+ for (const auto section : dataConf) {
isTcpBulkEnabled = true;
// tcp_bulk_insert {
// host "localhost" ; IP address or hostname to listen on
// port 7635 ; Port number to listen on
// }
- if (it->first == "host") {
- host = it->second.get_value<std::string>();
+ if (section.first == "host") {
+ host = section.second.get_value<std::string>();
}
- else if (it->first == "port") {
- port = it->second.get_value<std::string>();
+ else if (section.first == "port") {
+ port = section.second.get_value<std::string>();
}
else
- BOOST_THROW_EXCEPTION(Repo::Error("Unrecognized '" + it->first + "' option in 'tcp_bulk_insert' section in "
+ BOOST_THROW_EXCEPTION(Repo::Error("Unrecognized '" + section.first + "' option in 'tcp_bulk_insert' section in "
"configuration file '"+ configPath +"'"));
}
if (isTcpBulkEnabled) {
@@ -119,7 +112,7 @@
, m_store(std::make_shared<SqliteStorage>(config.dbPath))
, m_storageHandle(config.nMaxPackets, *m_store)
, m_validator(m_face)
- , m_readHandle(m_face, m_storageHandle, m_keyChain, m_scheduler)
+ , m_readHandle(m_face, m_storageHandle, m_keyChain, m_scheduler, m_config.registrationSubset)
, m_writeHandle(m_face, m_storageHandle, m_keyChain, m_scheduler, m_validator)
, m_watchHandle(m_face, m_storageHandle, m_keyChain, m_scheduler, m_validator)
, m_deleteHandle(m_face, m_storageHandle, m_keyChain, m_scheduler, m_validator)
@@ -147,7 +140,6 @@
// ReadHandle performs prefix registration internally.
m_readHandle.listen(dataPrefix);
}
-
for (const ndn::Name& cmdPrefix : m_config.repoPrefixes) {
m_face.registerPrefix(cmdPrefix, nullptr,
[] (const Name& cmdPrefix, const std::string& reason) {
diff --git a/src/repo.hpp b/src/repo.hpp
index 382a21e..1f29506 100644
--- a/src/repo.hpp
+++ b/src/repo.hpp
@@ -40,10 +40,13 @@
struct RepoConfig
{
+ static const size_t DISABLED_SUBSET_LENGTH = -1;
+
std::string repoConfigPath;
//StorageMethod storageMethod; This will be implemtented if there is other method.
std::string dbPath;
std::vector<ndn::Name> dataPrefixes;
+ size_t registrationSubset = DISABLED_SUBSET_LENGTH;
std::vector<ndn::Name> repoPrefixes;
std::vector<std::pair<std::string, std::string> > tcpBulkInsertEndpoints;
uint64_t nMaxPackets;
diff --git a/src/storage/repo-storage.cpp b/src/storage/repo-storage.cpp
index 1828eb9..d3834ba 100644
--- a/src/storage/repo-storage.cpp
+++ b/src/storage/repo-storage.cpp
@@ -51,7 +51,10 @@
int64_t id = m_storage.insert(data);
if (id == -1)
return false;
- return m_index.insert(data, id);
+ bool didInsert = m_index.insert(data, id);
+ if (didInsert)
+ afterDataInsertion(data.getName());
+ return didInsert;
}
ssize_t
@@ -65,10 +68,13 @@
while (idName.first != 0) {
bool resultDb = m_storage.erase(idName.first);
bool resultIndex = m_index.erase(idName.second); //full name
- if (resultDb && resultIndex)
+ if (resultDb && resultIndex) {
+ afterDataDeletion(idName.second);
count++;
- else
+ }
+ else {
hasError = true;
+ }
idName = m_index.find(name);
}
if (hasError)
@@ -88,10 +94,13 @@
while (idName.first != 0) {
bool resultDb = m_storage.erase(idName.first);
bool resultIndex = m_index.erase(idName.second); //full name
- if (resultDb && resultIndex)
+ if (resultDb && resultIndex) {
+ afterDataDeletion(idName.second);
count++;
- else
+ }
+ else {
hasError = true;
+ }
idName = m_index.find(interestDelete);
}
if (hasError)
diff --git a/src/storage/repo-storage.hpp b/src/storage/repo-storage.hpp
index bc2ecbe..bf3ade8 100644
--- a/src/storage/repo-storage.hpp
+++ b/src/storage/repo-storage.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014, Regents of the University of California.
+ * Copyright (c) 2014-2017, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -17,8 +17,8 @@
* repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
*/
-#ifndef REPO_STORAGE_REPO_STORE_HPP
-#define REPO_STORAGE_REPO_STORE_HPP
+#ifndef REPO_REPO_STORAGE_HPP
+#define REPO_REPO_STORAGE_HPP
#include "../common.hpp"
#include "storage.hpp"
@@ -26,6 +26,7 @@
#include "../repo-command-parameter.hpp"
#include <ndn-cxx/exclude.hpp>
+#include <ndn-cxx/util/signal.hpp>
#include <queue>
@@ -89,6 +90,10 @@
std::shared_ptr<Data>
readData(const Interest& interest) const;
+public:
+ ndn::util::Signal<RepoStorage, ndn::Name> afterDataInsertion;
+ ndn::util::Signal<RepoStorage, ndn::Name> afterDataDeletion;
+
private:
Index m_index;
Storage& m_storage;
@@ -96,4 +101,4 @@
} // namespace repo
-#endif // REPO_REPO_STORE_HPP
+#endif // REPO_REPO_STORAGE_HPP