Use ndn::mgmt::Dispatcher for repo commands
refs #4129
Change-Id: Idb7826fc76b6660ce76d69e7e88a9e922c55a2e1
diff --git a/src/handles/base-handle.cpp b/src/handles/base-handle.cpp
deleted file mode 100644
index 8555f39..0000000
--- a/src/handles/base-handle.cpp
+++ /dev/null
@@ -1,32 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/*
- * Copyright (c) 2014-2017, Regents of the University of California.
- *
- * This file is part of NDN repo-ng (Next generation of NDN repository).
- * See AUTHORS.md for complete list of repo-ng authors and contributors.
- *
- * repo-ng is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "base-handle.hpp"
-
-#include <ndn-cxx/util/random.hpp>
-
-namespace repo {
-
-uint64_t
-BaseHandle::generateProcessId()
-{
- return ndn::random::generateWord64();
-}
-
-} // namespace repo
diff --git a/src/handles/base-handle.hpp b/src/handles/base-handle.hpp
deleted file mode 100644
index 5d12076..0000000
--- a/src/handles/base-handle.hpp
+++ /dev/null
@@ -1,132 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California.
- *
- * This file is part of NDN repo-ng (Next generation of NDN repository).
- * See AUTHORS.md for complete list of repo-ng authors and contributors.
- *
- * repo-ng is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef REPO_HANDLES_BASE_HANDLE_HPP
-#define REPO_HANDLES_BASE_HANDLE_HPP
-
-#include "common.hpp"
-
-#include "storage/repo-storage.hpp"
-#include "repo-command-response.hpp"
-#include "repo-command-parameter.hpp"
-
-namespace repo {
-
-class BaseHandle : noncopyable
-{
-public:
- class Error : public std::runtime_error
- {
- public:
- explicit
- Error(const std::string& what)
- : std::runtime_error(what)
- {
- }
- };
-
-public:
- BaseHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler)
- : m_storageHandle(storageHandle)
- , m_face(face)
- , m_keyChain(keyChain)
- , m_scheduler(scheduler)
- // , m_storeindex(storeindex)
- {
- }
-
- virtual
- ~BaseHandle() = default;
-
- virtual void
- listen(const Name& prefix) = 0;
-
-protected:
-
- inline Face&
- getFace()
- {
- return m_face;
- }
-
- inline RepoStorage&
- getStorageHandle()
- {
- return m_storageHandle;
- }
-
- inline Scheduler&
- getScheduler()
- {
- return m_scheduler;
- }
-
- // inline RepoStorage&
- // getStoreIndex()
- // {
- // return m_storeindex;
- // }
-
- uint64_t
- generateProcessId();
-
- void
- reply(const Interest& commandInterest, const RepoCommandResponse& response);
-
-
- /**
- * @brief extract RepoCommandParameter from a command Interest.
- * @param interest command Interest
- * @param prefix Name prefix up to command-verb
- * @param[out] parameter parsed parameter
- * @throw RepoCommandParameter::Error parse error
- */
- void
- extractParameter(const Interest& interest, const Name& prefix, RepoCommandParameter& parameter);
-
-protected:
- RepoStorage& m_storageHandle;
-
-private:
- Face& m_face;
- KeyChain& m_keyChain;
- Scheduler& m_scheduler;
- // RepoStorage& m_storeindex;
-};
-
-inline void
-BaseHandle::reply(const Interest& commandInterest, const RepoCommandResponse& response)
-{
- std::shared_ptr<Data> rdata = std::make_shared<Data>(commandInterest.getName());
- rdata->setContent(response.wireEncode());
- m_keyChain.sign(*rdata);
- m_face.put(*rdata);
-}
-
-inline void
-BaseHandle::extractParameter(const Interest& interest, const Name& prefix,
- RepoCommandParameter& parameter)
-{
- parameter.wireDecode(interest.getName().get(prefix.size()).blockFromValue());
-}
-
-} // namespace repo
-
-#endif // REPO_HANDLES_BASE_HANDLE_HPP
diff --git a/src/handles/command-base-handle.cpp b/src/handles/command-base-handle.cpp
new file mode 100644
index 0000000..1717839
--- /dev/null
+++ b/src/handles/command-base-handle.cpp
@@ -0,0 +1,77 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "command-base-handle.hpp"
+
+#include <ndn-cxx/util/random.hpp>
+
+namespace repo {
+
+/** \brief an Interest tag to indicate command signer; the tag value is the signer's ndn::Name
+ *
+ *  NOTE(review): nothing in this file attaches a SignerTag to an Interest, so
+ *  getSignerFromTag() yields a value only if some other component sets the tag -- confirm.
+ */
+using SignerTag = ndn::SimpleTag<ndn::Name, 20>;
+
+/** \brief read the SignerTag of an Interest
+ *  \return URI of the Name stored in the tag, or ndn::nullopt if the Interest has no SignerTag
+ */
+static ndn::optional<std::string>
+getSignerFromTag(const ndn::Interest& interest)
+{
+  const auto tag = interest.getTag<SignerTag>();
+  if (tag != nullptr) {
+    return tag->get().toUri();
+  }
+
+  return ndn::nullopt;
+}
+
+CommandBaseHandle::CommandBaseHandle(Face& face, RepoStorage& storageHandle,
+ Scheduler& scheduler, Validator& validator)
+ : face(face) // every member initialized here is a reference to the caller's object; nothing is copied
+ , storageHandle(storageHandle)
+ , scheduler(scheduler)
+ , m_validator(validator) // used by makeAuthorization() to validate command Interests
+{
+}
+
+/** \brief create the Authorization callback used when registering a repo control command
+ *
+ *  The returned callback passes the command Interest to m_validator. On success it calls the
+ *  accept continuation with the signer URI taken from the Interest's SignerTag ("*" if the
+ *  Interest carries no such tag); on failure it calls the reject continuation with
+ *  RejectReply::STATUS403.
+ *
+ *  The callback captures \c this, so this handle must outlive every use of the callback.
+ */
+ndn::mgmt::Authorization
+CommandBaseHandle::makeAuthorization()
+{
+  return [this] (const ndn::Name&, const ndn::Interest& interest,
+                 const ndn::mgmt::ControlParameters*,
+                 const ndn::mgmt::AcceptContinuation& accept,
+                 const ndn::mgmt::RejectContinuation& reject) {
+    m_validator.validate(interest,
+      [accept] (const ndn::Interest& request) {
+        accept(getSignerFromTag(request).value_or("*"));
+      },
+      [reject] (const ndn::Interest&, const ndn::security::v2::ValidationError&) {
+        // TODO: report the ValidationError once a logging facility is available here
+        reject(ndn::mgmt::RejectReply::STATUS403);
+      });
+  };
+}
+
+} // namespace repo
diff --git a/src/handles/command-base-handle.hpp b/src/handles/command-base-handle.hpp
new file mode 100644
index 0000000..205b884
--- /dev/null
+++ b/src/handles/command-base-handle.hpp
@@ -0,0 +1,84 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California.
+ *
+ * This file is part of NDN repo-ng (Next generation of NDN repository).
+ * See AUTHORS.md for complete list of repo-ng authors and contributors.
+ *
+ * repo-ng is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef REPO_HANDLES_COMMAND_BASE_HANDLE_HPP
+#define REPO_HANDLES_COMMAND_BASE_HANDLE_HPP
+
+#include "common.hpp"
+
+#include "storage/repo-storage.hpp"
+#include "repo-command-response.hpp"
+#include "repo-command-parameter.hpp"
+#include "repo-command.hpp"
+
+#include <ndn-cxx/mgmt/dispatcher.hpp>
+
+namespace repo {
+
+/** \brief base class for repo command handles that register their commands with
+ *         ndn::mgmt::Dispatcher; provides command authorization and parameter validation
+ */
+class CommandBaseHandle
+{
+public:
+  class Error : public std::runtime_error
+  {
+  public:
+    explicit
+    Error(const std::string& what)
+      : std::runtime_error(what)
+    {
+    }
+  };
+
+public:
+  /** \brief store references to the given objects; none of them is copied
+   */
+  CommandBaseHandle(Face& face, RepoStorage& storageHandle,
+                    Scheduler& scheduler, Validator& validator);
+
+  virtual
+  ~CommandBaseHandle() = default;
+
+  /** \brief create the Authorization callback to pass to Dispatcher::addControlCommand
+   */
+  ndn::mgmt::Authorization
+  makeAuthorization();
+
+  /** \brief check command parameters against the rules of repo command \p T
+   *  \tparam T command type; must be default-constructible and provide
+   *            validateRequest(const RepoCommandParameter&)
+   *  \param parameters parameters handed over by the dispatcher
+   *  \return true if T::validateRequest accepts the parameters; false if it throws
+   *          RepoCommand::ArgumentError or if \p parameters is not a RepoCommandParameter
+   */
+  template<typename T>
+  bool
+  validateParameters(const ndn::mgmt::ControlParameters& parameters)
+  {
+    const auto* castParams = dynamic_cast<const RepoCommandParameter*>(&parameters);
+    BOOST_ASSERT(castParams != nullptr);
+    if (castParams == nullptr) {
+      // BOOST_ASSERT is disabled when NDEBUG is defined; never dereference a failed cast
+      return false;
+    }
+
+    T command;
+    try {
+      command.validateRequest(*castParams);
+    }
+    catch (const RepoCommand::ArgumentError&) {
+      return false;
+    }
+    return true;
+  }
+
+protected:
+  Face& face;
+  RepoStorage& storageHandle;
+  Scheduler& scheduler;
+
+private:
+  Validator& m_validator;
+};
+} // namespace repo
+
+#endif // REPO_HANDLES_COMMAND_BASE_HANDLE_HPP
\ No newline at end of file
diff --git a/src/handles/delete-handle.cpp b/src/handles/delete-handle.cpp
index 17882bb..dc0f5f1 100644
--- a/src/handles/delete-handle.cpp
+++ b/src/handles/delete-handle.cpp
@@ -21,151 +21,112 @@
namespace repo {
-DeleteHandle::DeleteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler,
+DeleteHandle::DeleteHandle(Face& face, RepoStorage& storageHandle,
+                           ndn::mgmt::Dispatcher& dispatcher, Scheduler& scheduler,
Validator& validator)
- : BaseHandle(face, storageHandle, keyChain, scheduler)
- , m_validator(validator)
+  : CommandBaseHandle(face, storageHandle, scheduler, validator)
{
+  // Register the "delete" verb: Interests go through makeAuthorization(), parameters are
+  // checked against DeleteCommand, and accepted commands are passed to handleDeleteCommand().
+  auto validate = [this] (const ndn::mgmt::ControlParameters& params) {
+    return validateParameters<DeleteCommand>(params);
+  };
+  auto handle = [this] (const Name& prefix, const Interest& interest,
+                        const ndn::mgmt::ControlParameters& params,
+                        const ndn::mgmt::CommandContinuation& done) {
+    handleDeleteCommand(prefix, interest, params, done);
+  };
+  dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("delete"),
+                                                     makeAuthorization(), validate, handle);
}
void
-DeleteHandle::onInterest(const Name& prefix, const Interest& interest)
+DeleteHandle::handleDeleteCommand(const Name& prefix, const Interest& interest, // handler registered for the "delete" command; picks one of the three process*DeleteCommand() paths
+ const ndn::mgmt::ControlParameters& parameter,
+ const ndn::mgmt::CommandContinuation& done) // forwarded to the chosen path, which calls it with the response
{
- m_validator.validate(interest, bind(&DeleteHandle::onValidated, this, _1, prefix),
- bind(&DeleteHandle::onValidationFailed, this, _1, _2));
-}
+ const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter); // reference cast: throws std::bad_cast if parameter is not a RepoCommandParameter
-void
-DeleteHandle::onValidated(const Interest& interest, const Name& prefix)
-{
- RepoCommandParameter parameter;
-
- try {
- extractParameter(interest, prefix, parameter);
- }
- catch (const RepoCommandParameter::Error&) {
- negativeReply(interest, 403);
- return;
- }
-
- if (parameter.hasSelectors()) {
-
- if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
- negativeReply(interest, 402);
- return;
- }
-
+ if (repoParameter.hasSelectors()) { // NOTE(review): the removed handler replied 402 when selectors came with Start/EndBlockId; that check is no longer in this function -- confirm DeleteCommand validation covers it
//choose data with selector and delete it
- processSelectorDeleteCommand(interest, parameter);
+ processSelectorDeleteCommand(interest, repoParameter, done);
return;
}
- if (!parameter.hasStartBlockId() && !parameter.hasEndBlockId()) {
- processSingleDeleteCommand(interest, parameter);
+ if (!repoParameter.hasStartBlockId() && !repoParameter.hasEndBlockId()) { // no block range given: delete by Name only
+ processSingleDeleteCommand(interest, repoParameter, done);
return;
}
- processSegmentDeleteCommand(interest, parameter);
+ processSegmentDeleteCommand(interest, repoParameter, done); // StartBlockId and/or EndBlockId present; NOTE(review): the callee reads EndBlockId unconditionally -- confirm validation requires it
}
-void
-DeleteHandle::onValidationFailed(const Interest& interest, const ValidationError& error)
-{
- std::cerr << error << std::endl;
- negativeReply(interest, 401);
-}
-//listen change the setinterestfilter
-void
-DeleteHandle::listen(const Name& prefix)
-{
- getFace().setInterestFilter(Name(prefix).append("delete"),
- bind(&DeleteHandle::onInterest, this, _1, _2));
-}
-
-void
+RepoCommandResponse
DeleteHandle::positiveReply(const Interest& interest, const RepoCommandParameter& parameter,
- uint64_t statusCode, uint64_t nDeletedDatas)
+                            uint64_t statusCode, uint64_t nDeletedData) const
{
- RepoCommandResponse response;
+  // Start from the success reply; it is turned into a 403 below if the command has no ProcessId.
+  RepoCommandResponse response(statusCode, "Deletion Successful");
+
if (parameter.hasProcessId()) {
response.setProcessId(parameter.getProcessId());
- response.setStatusCode(statusCode);
- response.setDeleteNum(nDeletedDatas);
+    response.setDeleteNum(nDeletedData);
}
else {
- response.setStatusCode(403);
+    response.setCode(403);
+    response.setText("Malformed Command");
}
- reply(interest, response);
+  // Either way the response carries its own wire encoding as the body.
+  response.setBody(response.wireEncode());
+  return response;
+}
+
+/**
+ * @brief build an error response for a delete command
+ * @param interest command Interest (not used by this function)
+ * @param statusCode status code of the response
+ * @param text status text of the response
+ * @return response whose body is set to its own wire encoding
+ */
+RepoCommandResponse
+DeleteHandle::negativeReply(const Interest& interest, uint64_t statusCode, std::string text) const
+{
+  RepoCommandResponse failure(statusCode, text);
+  failure.setBody(failure.wireEncode());
+  return failure;
}
void
-DeleteHandle::negativeReply(const Interest& interest, uint64_t statusCode)
+DeleteHandle::processSingleDeleteCommand(const Interest& interest, const RepoCommandParameter& parameter, // delete by the Name in the parameters and pass the reply to done
+ const ndn::mgmt::CommandContinuation& done) const
{
- RepoCommandResponse response;
- response.setStatusCode(statusCode);
- reply(interest, response);
-}
-
-void
-DeleteHandle::processSingleDeleteCommand(const Interest& interest,
- RepoCommandParameter& parameter)
-{
- int64_t nDeletedDatas = getStorageHandle().deleteData(parameter.getName());
- if (nDeletedDatas == -1) {
+ int64_t nDeletedData = storageHandle.deleteData(parameter.getName());
+ if (nDeletedData == -1) { // -1 is treated as deletion failure
std::cerr << "Deletion Failed!" <<std::endl;
- negativeReply(interest, 405); //405 means deletion fail
+ done(negativeReply(interest, 405, "Deletion Failed")); // 405 reply
}
else
- positiveReply(interest, parameter, 200, nDeletedDatas);
+ done(positiveReply(interest, parameter, 200, nDeletedData)); // 200 reply with the count; positiveReply() turns it into 403 if the command has no ProcessId
}
void
-DeleteHandle::processSelectorDeleteCommand(const Interest& interest,
- RepoCommandParameter& parameter)
+DeleteHandle::processSelectorDeleteCommand(const Interest& interest, const RepoCommandParameter& parameter, // delete via an Interest built from the parameters' Name and Selectors; pass the reply to done
+ const ndn::mgmt::CommandContinuation& done) const
{
- int64_t nDeletedDatas = getStorageHandle()
- .deleteData(Interest(parameter.getName())
- .setSelectors(parameter.getSelectors()));
- if (nDeletedDatas == -1) {
+ int64_t nDeletedData = storageHandle.deleteData(Interest(parameter.getName())
+ .setSelectors(parameter.getSelectors()));
+ if (nDeletedData == -1) { // -1 is treated as deletion failure
std::cerr << "Deletion Failed!" <<std::endl;
- negativeReply(interest, 405); //405 means deletion fail
+ done(negativeReply(interest, 405, "Deletion Failed")); // 405 reply
}
else
- positiveReply(interest, parameter, 200, nDeletedDatas);
+ done(positiveReply(interest, parameter, 200, nDeletedData)); // 200 reply with the count; positiveReply() turns it into 403 if the command has no ProcessId
}
void
-DeleteHandle::processSegmentDeleteCommand(const Interest& interest,
- RepoCommandParameter& parameter)
+DeleteHandle::processSegmentDeleteCommand(const Interest& interest, const RepoCommandParameter& parameter,
+                                          const ndn::mgmt::CommandContinuation& done) const
{
- if (!parameter.hasStartBlockId())
- parameter.setStartBlockId(0);
+  // Delete segments [StartBlockId, EndBlockId] under the parameters' Name; StartBlockId defaults to 0.
+  // EndBlockId is read unconditionally, so the command must have been validated to contain it.
+  SegmentNo startBlockId = parameter.hasStartBlockId() ? parameter.getStartBlockId() : 0;
+  SegmentNo endBlockId = parameter.getEndBlockId();
- if (parameter.hasEndBlockId()) {
- SegmentNo startBlockId = parameter.getStartBlockId();
- SegmentNo endBlockId = parameter.getEndBlockId();
-
- if (startBlockId > endBlockId) {
- negativeReply(interest, 403);
- return;
+  Name prefix = parameter.getName();
+  uint64_t nDeletedData = 0;
+  for (SegmentNo i = startBlockId; i <= endBlockId; i++) {
+    Name name = prefix;
+    name.appendSegment(i);
+    // deleteData() signals failure with -1, which is non-zero: count the segment only if
+    // something was actually deleted
+    if (storageHandle.deleteData(name) > 0) {
+      nDeletedData++;
}
+  }
+  // Reply 200 together with the number of segments that were deleted
+  done(positiveReply(interest, parameter, 200, nDeletedData));
- Name prefix = parameter.getName();
- uint64_t nDeletedDatas = 0;
- for (SegmentNo i = startBlockId; i <= endBlockId; i++) {
- Name name = prefix;
- name.appendSegment(i);
- if (getStorageHandle().deleteData(name)) {
- nDeletedDatas++;
- }
- }
- //All the data deleted, return 200
- positiveReply(interest, parameter, 200, nDeletedDatas);
- }
- else {
- BOOST_ASSERT(false); // segmented deletion without EndBlockId, not implemented
- }
}
} // namespace repo
diff --git a/src/handles/delete-handle.hpp b/src/handles/delete-handle.hpp
index 17cb48a..fbb25d3 100644
--- a/src/handles/delete-handle.hpp
+++ b/src/handles/delete-handle.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California.
+ * Copyright (c) 2014-2018, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -20,66 +20,54 @@
#ifndef REPO_HANDLES_DELETE_HANDLE_HPP
#define REPO_HANDLES_DELETE_HANDLE_HPP
-#include "base-handle.hpp"
+#include "command-base-handle.hpp"
+
+#include <ndn-cxx/mgmt/dispatcher.hpp>
namespace repo {
-class DeleteHandle : public BaseHandle
+/**
+ * @brief handles the repo "delete" command, which it registers with an ndn::mgmt::Dispatcher
+ */
+class DeleteHandle : public CommandBaseHandle
{
public:
- class Error : public BaseHandle::Error
+  class Error : public CommandBaseHandle::Error
{
public:
explicit
Error(const std::string& what)
- : BaseHandle::Error(what)
+      : CommandBaseHandle::Error(what)
{
}
};
public:
- DeleteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler, Validator& validator);
-
- virtual void
- listen(const Name& prefix);
+  /**
+   * @brief construct the handle and register the "delete" command with @p dispatcher
+   */
+  DeleteHandle(Face& face, RepoStorage& storageHandle,
+               ndn::mgmt::Dispatcher& dispatcher, Scheduler& scheduler, Validator& validator);
private:
+  /**
+   * @brief command handler: selects selector, single or segmented deletion from the parameters
+   */
void
- onInterest(const Name& prefix, const Interest& interest);
+  handleDeleteCommand(const Name& prefix, const Interest& interest,
+                      const ndn::mgmt::ControlParameters& parameters,
+                      const ndn::mgmt::CommandContinuation& done);
- void
- onValidated(const Interest& interest, const Name& prefix);
-
- void
- onValidationFailed(const Interest& interest, const ValidationError& error);
-
- /**
- * @todo delete check has not been realized due to the while loop of segmented data deletion.
- */
- void
- onCheckInterest(const Name& prefix, const Interest& interest);
-
- void
+  /**
+   * @brief build the reply for a completed deletion; the reply is 403 if @p parameter has no ProcessId
+   */
+  RepoCommandResponse
positiveReply(const Interest& interest, const RepoCommandParameter& parameter,
- uint64_t statusCode, uint64_t nDeletedDatas);
+                uint64_t statusCode, uint64_t nDeletedData) const;
+
+  /**
+   * @brief build an error reply with the given status code and text
+   */
+  RepoCommandResponse
+  negativeReply(const Interest& interest, uint64_t statusCode, std::string text) const;
void
- negativeReply(const Interest& interest, uint64_t statusCode);
+  processSingleDeleteCommand(const Interest& interest, const RepoCommandParameter& parameter,
+                             const ndn::mgmt::CommandContinuation& done) const;
void
- processSingleDeleteCommand(const Interest& interest, RepoCommandParameter& parameter);
+  processSelectorDeleteCommand(const Interest& interest, const RepoCommandParameter& parameter,
+                               const ndn::mgmt::CommandContinuation& done) const;
void
- processSelectorDeleteCommand(const Interest& interest, RepoCommandParameter& parameter);
-
- void
- processSegmentDeleteCommand(const Interest& interest, RepoCommandParameter& parameter);
-
-private:
- Validator& m_validator;
-
+  processSegmentDeleteCommand(const Interest& interest, const RepoCommandParameter& parameter,
+                              const ndn::mgmt::CommandContinuation& done) const;
};
} // namespace repo
diff --git a/src/handles/read-handle.cpp b/src/handles/read-handle.cpp
index b880bdd..44a9d18 100644
--- a/src/handles/read-handle.cpp
+++ b/src/handles/read-handle.cpp
@@ -22,10 +22,10 @@
namespace repo {
-ReadHandle::ReadHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler, size_t prefixSubsetLength)
- : BaseHandle(face, storageHandle, keyChain, scheduler)
- , m_prefixSubsetLength(prefixSubsetLength)
+ReadHandle::ReadHandle(Face& face, RepoStorage& storageHandle, size_t prefixSubsetLength)
+ : m_prefixSubsetLength(prefixSubsetLength) // NOTE(review): members initialize in declaration order; confirm this one is declared before m_face/m_storageHandle in the header
+ , m_face(face) // reference member (Face&): the Face is not copied
+ , m_storageHandle(storageHandle) // reference member (RepoStorage&)
{
connectAutoListen();
}
@@ -49,9 +49,9 @@
void
ReadHandle::onInterest(const Name& prefix, const Interest& interest)
{
- shared_ptr<ndn::Data> data = getStorageHandle().readData(interest);
+ shared_ptr<ndn::Data> data = m_storageHandle.readData(interest); // look the Interest up in storage; a null result means there is nothing to send
if (data != nullptr) {
- getFace().put(*data);
+ m_face.put(*data); // send the stored Data out through the face
}
}
@@ -59,14 +59,14 @@
ReadHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
{
std::cerr << "ERROR: Failed to register prefix in local hub's daemon" << std::endl;
- getFace().shutdown();
+ m_face.shutdown();
}
void
ReadHandle::listen(const Name& prefix)
{
ndn::InterestFilter filter(prefix);
- getFace().setInterestFilter(filter,
+ m_face.setInterestFilter(filter, // callbacks below: onInterest() for matching Interests, onRegisterFailed() if registration fails
bind(&ReadHandle::onInterest, this, _1, _2),
bind(&ReadHandle::onRegisterFailed, this, _1, _2));
}
@@ -80,7 +80,7 @@
auto check = m_insertedDataPrefixes.find(prefix);
if (check != m_insertedDataPrefixes.end()) {
if (--(check->second.useCount) <= 0) {
- getFace().unsetInterestFilter(check->second.prefixId);
+ m_face.unsetInterestFilter(check->second.prefixId);
m_insertedDataPrefixes.erase(prefix);
}
}
@@ -101,7 +101,7 @@
// everything down, anyway. If registration failures are ever
// considered to be recoverable, we would need to make this
// atomic.
- const ndn::RegisteredPrefixId* prefixId = getFace().setInterestFilter(filter,
+ const ndn::RegisteredPrefixId* prefixId = m_face.setInterestFilter(filter,
[this] (const ndn::InterestFilter& filter, const Interest& interest) {
// Implicit conversion to Name of filter
onInterest(filter, interest);
diff --git a/src/handles/read-handle.hpp b/src/handles/read-handle.hpp
index 8ec67a2..a243abd 100644
--- a/src/handles/read-handle.hpp
+++ b/src/handles/read-handle.hpp
@@ -21,11 +21,15 @@
#define REPO_HANDLES_READ_HANDLE_HPP
#include "common.hpp"
-#include "base-handle.hpp"
+
+#include "storage/repo-storage.hpp"
+#include "repo-command-response.hpp"
+#include "repo-command-parameter.hpp"
+#include "repo-command.hpp"
namespace repo {
-class ReadHandle : public BaseHandle
+class ReadHandle : public noncopyable
{
public:
@@ -37,11 +41,10 @@
int useCount;
};
- ReadHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler, size_t prefixSubsetLength);
+ ReadHandle(Face& face, RepoStorage& storageHandle, size_t prefixSubsetLength);
void
- listen(const Name& prefix) override;
+ listen(const Name& prefix);
PUBLIC_WITH_TESTS_ELSE_PRIVATE:
const std::map<ndn::Name, RegisteredDataPrefix>&
@@ -80,6 +83,8 @@
std::map<ndn::Name, RegisteredDataPrefix> m_insertedDataPrefixes;
ndn::util::signal::ScopedConnection afterDataDeletionConnection;
ndn::util::signal::ScopedConnection afterDataInsertionConnection;
+ Face& m_face;
+ RepoStorage& m_storageHandle;
};
} // namespace repo
diff --git a/src/handles/watch-handle.cpp b/src/handles/watch-handle.cpp
index 6fe93d4..105e802 100644
--- a/src/handles/watch-handle.cpp
+++ b/src/handles/watch-handle.cpp
@@ -21,12 +21,12 @@
namespace repo {
-static const milliseconds PROCESS_DELETE_TIME(10000);
-static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
+static const milliseconds PROCESS_DELETE_TIME(10000_ms); // delay deferredDeleteProcess() waits before deleteProcess() runs
+static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms); // value watchStop() resets m_interestLifetime to
-WatchHandle::WatchHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler, Validator& validator)
- : BaseHandle(face, storageHandle, keyChain, scheduler)
+WatchHandle::WatchHandle(Face& face, RepoStorage& storageHandle,
+ ndn::mgmt::Dispatcher& dispatcher, Scheduler& scheduler, Validator& validator)
+ : CommandBaseHandle(face, storageHandle, scheduler, validator)
, m_validator(validator)
, m_interestNum(0)
, m_maxInterestNum(0)
@@ -35,6 +35,20 @@
, m_startTime(steady_clock::now())
, m_size(0)
{
+ dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("start"),
+ makeAuthorization(),
+ std::bind(&WatchHandle::validateParameters<WatchStartCommand>, this, _1),
+ std::bind(&WatchHandle::handleStartCommand, this, _1, _2, _3, _4));
+
+ dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("check"),
+ makeAuthorization(),
+ std::bind(&WatchHandle::validateParameters<WatchCheckCommand>, this, _1),
+ std::bind(&WatchHandle::handleCheckCommand, this, _1, _2, _3, _4));
+
+ dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("stop"),
+ makeAuthorization(),
+ std::bind(&WatchHandle::validateParameters<WatchStopCommand>, this, _1),
+ std::bind(&WatchHandle::handleStopCommand, this, _1, _2, _3, _4));
}
void
@@ -43,28 +57,13 @@
m_processes.erase(name);
}
-// Interest.
void
-WatchHandle::onInterest(const Name& prefix, const Interest& interest)
+WatchHandle::handleStartCommand(const Name& prefix, const Interest& interest, // handler registered for "watch/start"
+ const ndn::mgmt::ControlParameters& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
- m_validator.validate(interest,
- bind(&WatchHandle::onValidated, this, _1, prefix),
- bind(&WatchHandle::onValidationFailed, this, _1, _2));
-}
-
-void
-WatchHandle::onValidated(const Interest& interest, const Name& prefix)
-{
- RepoCommandParameter parameter;
- try {
- extractParameter(interest, prefix, parameter);
- }
- catch (const RepoCommandParameter::Error&) {
- negativeReply(interest, 403);
- return;
- }
-
- processWatchCommand(interest, parameter);
+ const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter); // reference cast: throws std::bad_cast if parameter is not a RepoCommandParameter
+ processWatchCommand(interest, repoParameter, done); // sends the reply through done and starts fetching
}
void WatchHandle::watchStop(const Name& name)
@@ -73,24 +72,18 @@
m_maxInterestNum = 0;
m_interestNum = 0;
m_startTime = steady_clock::now();
- m_watchTimeout = milliseconds(0);
+ m_watchTimeout = 0_ms;
m_interestLifetime = DEFAULT_INTEREST_LIFETIME;
m_size = 0;
}
-void
-WatchHandle::onValidationFailed(const Interest& interest, const ValidationError& error)
-{
- std::cerr << error << std::endl;
- negativeReply(interest, 401);
-}
void
WatchHandle::onData(const Interest& interest, const ndn::Data& data, const Name& name)
{
- m_validator.validate(data,
- bind(&WatchHandle::onDataValidated, this, interest, _1, name),
- bind(&WatchHandle::onDataValidationFailed, this, interest, _1, _2, name));
+ m_validator.validate(data, // validate the fetched Data; the outcome is delivered to one of the two callbacks below
+ bind(&WatchHandle::onDataValidated, this, interest, _1, name),
+ bind(&WatchHandle::onDataValidationFailed, this, interest, _1, _2, name));
}
void
@@ -99,7 +92,7 @@
if (!m_processes[name].second) {
return;
}
- if (getStorageHandle().insertData(data)) {
+ if (storageHandle.insertData(data)) {
m_size++;
if (!onRunning(name))
return;
@@ -125,7 +118,7 @@
}
++m_interestNum;
- getFace().expressInterest(fetchInterest,
+ face.expressInterest(fetchInterest,
bind(&WatchHandle::onData, this, _1, _2, name),
bind(&WatchHandle::onTimeout, this, _1, name), // Nack
bind(&WatchHandle::onTimeout, this, _1, name));
@@ -168,7 +161,7 @@
}
++m_interestNum;
- getFace().expressInterest(fetchInterest,
+ face.expressInterest(fetchInterest,
bind(&WatchHandle::onData, this, _1, _2, name),
bind(&WatchHandle::onTimeout, this, _1, name), // Nack
bind(&WatchHandle::onTimeout, this, _1, name));
@@ -190,7 +183,7 @@
fetchInterest.setChildSelector(1);
++m_interestNum;
- getFace().expressInterest(fetchInterest,
+ face.expressInterest(fetchInterest,
bind(&WatchHandle::onData, this, _1, _2, name),
bind(&WatchHandle::onTimeout, this, _1, name), // Nack
bind(&WatchHandle::onTimeout, this, _1, name));
@@ -198,112 +191,60 @@
}
void
-WatchHandle::listen(const Name& prefix)
+WatchHandle::handleStopCommand(const Name& prefix, const Interest& interest,
+                               const ndn::mgmt::ControlParameters& parameter,
+                               const ndn::mgmt::CommandContinuation& done)
{
- getFace().setInterestFilter(Name(prefix).append("watch").append("start"),
- bind(&WatchHandle::onInterest, this, _1, _2));
- getFace().setInterestFilter(Name(prefix).append("watch").append("check"),
- bind(&WatchHandle::onCheckInterest, this, _1, _2));
- getFace().setInterestFilter(Name(prefix).append("watch").append("stop"),
- bind(&WatchHandle::onStopInterest, this, _1, _2));
+  // Handler registered for "watch/stop": stop watching the Name given in the parameters and
+  // reply with status 101.
+  const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
+  const Name& watchedName = repoParameter.getName();
+
+  watchStop(watchedName);
+  // Report the watched name from the parameters; `prefix` is the dispatcher-supplied prefix of
+  // the command itself, not the prefix being watched.
+  std::string text = "Watched Prefix Insertion for prefix (" + watchedName.toUri() + ") is stop.";
+  // NOTE(review): other replies in this file call setBody(wireEncode()) before done(); this one
+  // does not -- confirm whether a body is expected here.
+  done(RepoCommandResponse(101, text));
}
void
-WatchHandle::onStopInterest(const Name& prefix, const Interest& interest)
+WatchHandle::handleCheckCommand(const Name& prefix, const Interest& interest, // handler registered for "watch/check": replies with the response stored in m_processes for the Name in the parameters
+ const ndn::mgmt::ControlParameters& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
- m_validator.validate(interest,
- bind(&WatchHandle::onStopValidated, this, _1, prefix),
- bind(&WatchHandle::onStopValidationFailed, this, _1, _2));
-}
+ const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter); // reference cast: throws std::bad_cast if parameter is not a RepoCommandParameter
-void
-WatchHandle::onStopValidated(const Interest& interest, const Name& prefix)
-{
- RepoCommandParameter parameter;
- try {
- extractParameter(interest, prefix, parameter);
- }
- catch (const RepoCommandParameter::Error&) {
- negativeReply(interest, 403);
- return;
- }
-
- watchStop(parameter.getName());
- negativeReply(interest, 101);
-}
-
-void
-WatchHandle::onStopValidationFailed(const Interest& interest, const ValidationError& error)
-{
- std::cerr << error << std::endl;
- negativeReply(interest, 401);
-}
-
-void
-WatchHandle::onCheckInterest(const Name& prefix, const Interest& interest)
-{
- m_validator.validate(interest,
- bind(&WatchHandle::onCheckValidated, this, _1, prefix),
- bind(&WatchHandle::onCheckValidationFailed, this, _1, _2));
-}
-
-void
-WatchHandle::onCheckValidated(const Interest& interest, const Name& prefix)
-{
- RepoCommandParameter parameter;
- try {
- extractParameter(interest, prefix, parameter);
- }
- catch (const RepoCommandParameter::Error&) {
- negativeReply(interest, 403);
- return;
- }
-
- if (!parameter.hasName()) {
- negativeReply(interest, 403);
- return;
- }
//check whether this process exists
- Name name = parameter.getName();
+ Name name = repoParameter.getName(); // NOTE(review): the removed handler replied 403 when the Name was missing; confirm WatchCheckCommand validation now guarantees it
if (m_processes.count(name) == 0) {
std::cerr << "no such process name: " << name << std::endl;
- negativeReply(interest, 404);
- return;
+ RepoCommandResponse response(404, "No such process is in progress"); // no entry in m_processes for this name
+ response.setBody(response.wireEncode());
+ return done(response);
}
RepoCommandResponse& response = m_processes[name].first;
- if (!m_processes[name].second) {
- response.setStatusCode(101);
+
+ if (!m_processes[name].second) { // presumably .second is the "still running" flag (processWatchCommand stores true) -- confirm
+ response.setCode(101); // response is a reference, so this changes the entry stored in m_processes
}
- reply(interest, response);
-
-}
-
-void
-WatchHandle::onCheckValidationFailed(const Interest& interest, const ValidationError& error)
-{
- std::cerr << error << std::endl;
- negativeReply(interest, 401);
+ return done(response); // NOTE(review): unlike the 404 reply above, setBody() is not called on this response -- confirm this is intended
}
void
WatchHandle::deferredDeleteProcess(const Name& name)
{
- getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
+ scheduler.scheduleEvent(PROCESS_DELETE_TIME, // schedule deleteProcess(name) to run after PROCESS_DELETE_TIME
bind(&WatchHandle::deleteProcess, this, name));
}
void
WatchHandle::processWatchCommand(const Interest& interest,
- RepoCommandParameter& parameter)
+ const RepoCommandParameter& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
// if there is no watchTimeout specified, m_watchTimeout will be set as 0 and this handle will run forever
if (parameter.hasWatchTimeout()) {
m_watchTimeout = parameter.getWatchTimeout();
}
else {
- m_watchTimeout = milliseconds(0);
+ m_watchTimeout = 0_ms;
}
// if there is no maxInterestNum specified, m_maxInterestNum will be 0, which means infinity
@@ -318,10 +259,14 @@
m_interestLifetime = parameter.getInterestLifetime();
}
- reply(interest, RepoCommandResponse().setStatusCode(100));
+ RepoCommandResponse response(100, "Watching the prefix started.");
+ response.setBody(response.wireEncode());
+ done(response);
m_processes[parameter.getName()] =
- std::make_pair(RepoCommandResponse().setStatusCode(300), true);
+ std::make_pair(RepoCommandResponse(300, "This watched prefix Insertion is in progress"),
+ true);
+
Interest fetchInterest(parameter.getName());
if (parameter.hasSelectors()) {
fetchInterest.setSelectors(parameter.getSelectors());
@@ -330,19 +275,10 @@
fetchInterest.setInterestLifetime(m_interestLifetime);
m_startTime = steady_clock::now();
m_interestNum++;
- getFace().expressInterest(fetchInterest,
- bind(&WatchHandle::onData, this, _1, _2, parameter.getName()),
- bind(&WatchHandle::onTimeout, this, _1, parameter.getName()), // Nack
- bind(&WatchHandle::onTimeout, this, _1, parameter.getName()));
-}
-
-
-void
-WatchHandle::negativeReply(const Interest& interest, int statusCode)
-{
- RepoCommandResponse response;
- response.setStatusCode(statusCode);
- reply(interest, response);
+ face.expressInterest(fetchInterest,
+ bind(&WatchHandle::onData, this, _1, _2, parameter.getName()),
+ bind(&WatchHandle::onTimeout, this, _1, parameter.getName()), // Nack
+ bind(&WatchHandle::onTimeout, this, _1, parameter.getName()));
}
bool
diff --git a/src/handles/watch-handle.hpp b/src/handles/watch-handle.hpp
index c7071df..221b67d 100644
--- a/src/handles/watch-handle.hpp
+++ b/src/handles/watch-handle.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California.
+ * Copyright (c) 2014-2018, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -20,7 +20,9 @@
#ifndef REPO_HANDLES_WATCH_HANDLE_HPP
#define REPO_HANDLES_WATCH_HANDLE_HPP
-#include "base-handle.hpp"
+#include "command-base-handle.hpp"
+
+#include <ndn-cxx/mgmt/dispatcher.hpp>
#include <queue>
@@ -38,40 +40,37 @@
* watching the prefix until a command interest tell it to stop, the total
* amount of sent interests reaches a specific number or time out.
*/
-class WatchHandle : public BaseHandle
+class WatchHandle : public CommandBaseHandle
{
public:
- class Error : public BaseHandle::Error
+ class Error : public CommandBaseHandle::Error
{
public:
explicit
Error(const std::string& what)
- : BaseHandle::Error(what)
+ : CommandBaseHandle::Error(what)
{
}
};
public:
- WatchHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler, Validator& validator);
-
- virtual void
- listen(const Name& prefix);
+ WatchHandle(Face& face, RepoStorage& storageHandle,
+ ndn::mgmt::Dispatcher& dispatcher, Scheduler& scheduler,
+ Validator& validator);
private: // watch-insert command
/**
* @brief handle watch commands
*/
- void
- onInterest(const Name& prefix, const Interest& interest);
void
- onValidated(const Interest& interest, const Name& prefix);
-
+ handleStartCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameters,
+ const ndn::mgmt::CommandContinuation& done);
void
- onValidationFailed(const Interest& interest, const ValidationError& error);
+ onValidationFailed(const std::shared_ptr<const Interest>& interest, const std::string& reason);
private: // data fetching
/**
@@ -98,7 +97,8 @@
void
- processWatchCommand(const Interest& interest, RepoCommandParameter& parameter);
+ processWatchCommand(const Interest& interest, const RepoCommandParameter& parameter,
+ const ndn::mgmt::CommandContinuation& done);
void
watchStop(const Name& name);
@@ -107,11 +107,11 @@
/**
* @brief handle watch check command
*/
- void
- onCheckInterest(const Name& prefix, const Interest& interest);
void
- onCheckValidated(const Interest& interest, const Name& prefix);
+ handleCheckCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameters,
+ const ndn::mgmt::CommandContinuation& done);
void
onCheckValidationFailed(const Interest& interest, const ValidationError& error);
@@ -120,20 +120,17 @@
/**
* @brief handle watch stop command
*/
- void
- onStopInterest(const Name& prefix, const Interest& interest);
void
- onStopValidated(const Interest& interest, const Name& prefix);
+ handleStopCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameters,
+ const ndn::mgmt::CommandContinuation& done);
void
onStopValidationFailed(const Interest& interest, const ValidationError& error);
private:
void
- negativeReply(const Interest& interest, int statusCode);
-
- void
deferredDeleteProcess(const Name& name);
void
@@ -144,7 +141,6 @@
private:
Validator& m_validator;
-
map<Name, std::pair<RepoCommandResponse, bool> > m_processes;
int64_t m_interestNum;
int64_t m_maxInterestNum;
diff --git a/src/handles/write-handle.cpp b/src/handles/write-handle.cpp
index 0283728..6cc9583 100644
--- a/src/handles/write-handle.cpp
+++ b/src/handles/write-handle.cpp
@@ -19,24 +19,34 @@
#include "write-handle.hpp"
+#include <ndn-cxx/util/random.hpp>
+
namespace repo {
static const int RETRY_TIMEOUT = 3;
static const int DEFAULT_CREDIT = 12;
-static const milliseconds NOEND_TIMEOUT(10000);
-static const milliseconds PROCESS_DELETE_TIME(10000);
-static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
+static const milliseconds NOEND_TIMEOUT(10000_ms);
+static const milliseconds PROCESS_DELETE_TIME(10000_ms);
+static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms);
-WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler,
- Validator& validator)
- : BaseHandle(face, storageHandle, keyChain, scheduler)
+WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, ndn::mgmt::Dispatcher& dispatcher,
+ Scheduler& scheduler, Validator& validator)
+ : CommandBaseHandle(face, storageHandle, scheduler, validator)
, m_validator(validator)
, m_retryTime(RETRY_TIMEOUT)
, m_credit(DEFAULT_CREDIT)
, m_noEndTimeout(NOEND_TIMEOUT)
, m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
{
+ dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert"),
+ makeAuthorization(),
+ std::bind(&WriteHandle::validateParameters<InsertCommand>, this, _1),
+ std::bind(&WriteHandle::handleInsertCommand, this, _1, _2, _3, _4));
+
+ dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert check"),
+ makeAuthorization(),
+ std::bind(&WriteHandle::validateParameters<InsertCheckCommand>, this, _1),
+ std::bind(&WriteHandle::handleCheckCommand, this, _1, _2, _3, _4));
}
void
@@ -45,46 +55,26 @@
m_processes.erase(processId);
}
-// Interest.
void
-WriteHandle::onInterest(const Name& prefix, const Interest& interest)
+WriteHandle::handleInsertCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
- m_validator.validate(interest,
- bind(&WriteHandle::onValidated, this, _1, prefix),
- bind(&WriteHandle::onValidationFailed, this, _1, _2));
-}
+ RepoCommandParameter* repoParameter =
+ dynamic_cast<RepoCommandParameter*>(const_cast<ndn::mgmt::ControlParameters*>(&parameter));
-void
-WriteHandle::onValidated(const Interest& interest, const Name& prefix)
-{
- RepoCommandParameter parameter;
- try {
- extractParameter(interest, prefix, parameter);
- }
- catch (const RepoCommandParameter::Error&) {
- negativeReply(interest, 403);
- return;
- }
-
- if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
- if (parameter.hasSelectors()) {
- negativeReply(interest, 402);
+ if (repoParameter->hasStartBlockId() || repoParameter->hasEndBlockId()) {
+ if (repoParameter->hasSelectors()) {
+ done(negativeReply("BlockId present. BlockId is not supported in this protocol", 402));
return;
}
- processSegmentedInsertCommand(interest, parameter);
+ processSegmentedInsertCommand(interest, *repoParameter, done);
}
else {
- processSingleInsertCommand(interest, parameter);
+ processSingleInsertCommand(interest, *repoParameter, done);
}
- if (parameter.hasInterestLifetime())
- m_interestLifetime = parameter.getInterestLifetime();
-}
-
-void
-WriteHandle::onValidationFailed(const Interest& interest, const ValidationError& error)
-{
- std::cerr << error << std::endl;
- negativeReply(interest, 401);
+ if (repoParameter->hasInterestLifetime())
+ m_interestLifetime = repoParameter->getInterestLifetime();
}
void
@@ -106,9 +96,7 @@
RepoCommandResponse& response = process.response;
if (response.getInsertNum() == 0) {
- getStorageHandle().insertData(data);
- // getStorageHandle().insertEntry(data);
- // getStoreIndex().insert(data);
+ storageHandle.insertData(data);
response.setInsertNum(1);
}
@@ -153,7 +141,7 @@
}
//insert data
- if (getStorageHandle().insertData(data)) {
+ if (storageHandle.insertData(data)) {
response.setInsertNum(response.getInsertNum() + 1);
}
@@ -176,15 +164,6 @@
}
void
-WriteHandle::listen(const Name& prefix)
-{
- getFace().setInterestFilter(Name(prefix).append("insert"),
- bind(&WriteHandle::onInterest, this, _1, _2));
- getFace().setInterestFilter(Name(prefix).append("insert check"),
- bind(&WriteHandle::onCheckInterest, this, _1, _2));
-}
-
-void
WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
{
ProcessInfo& process = m_processes[processId];
@@ -214,7 +193,7 @@
fetchName.appendSegment(segment);
Interest interest(fetchName);
interest.setInterestLifetime(m_interestLifetime);
- getFace().expressInterest(interest,
+ face.expressInterest(interest,
bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
@@ -254,7 +233,7 @@
std::cerr << "noEndtimeout: " << processId << std::endl;
//m_processes.erase(processId);
//StatusCode should be refreshed as 405
- response.setStatusCode(405);
+ response.setCode(405);
//schedule a delete event
deferredDeleteProcess(processId);
return;
@@ -268,7 +247,7 @@
if (response.getInsertNum() >= nSegments) {
//m_processes.erase(processId);
//All the data has been inserted, StatusCode is refreshed as 200
- response.setStatusCode(200);
+ response.setCode(200);
deferredDeleteProcess(processId);
return;
}
@@ -309,7 +288,7 @@
fetchName.appendSegment(sendingSegment);
Interest fetchInterest(fetchName);
fetchInterest.setInterestLifetime(m_interestLifetime);
- getFace().expressInterest(fetchInterest,
+ face.expressInterest(fetchInterest,
bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
@@ -361,7 +340,7 @@
retryTime++;
Interest retryInterest(interest.getName());
retryInterest.setInterestLifetime(m_interestLifetime);
- getFace().expressInterest(retryInterest,
+ face.expressInterest(retryInterest,
bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
@@ -370,35 +349,17 @@
}
void
-WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
+WriteHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
- m_validator.validate(interest,
- bind(&WriteHandle::onCheckValidated, this, _1, prefix),
- bind(&WriteHandle::onCheckValidationFailed, this, _1, _2));
+ const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
-}
-
-void
-WriteHandle::onCheckValidated(const Interest& interest, const Name& prefix)
-{
- RepoCommandParameter parameter;
- try {
- extractParameter(interest, prefix, parameter);
- }
- catch (const RepoCommandParameter::Error&) {
- negativeReply(interest, 403);
- return;
- }
-
- if (!parameter.hasProcessId()) {
- negativeReply(interest, 403);
- return;
- }
//check whether this process exists
- ProcessId processId = parameter.getProcessId();
+ ProcessId processId = repoParameter.getProcessId();
if (m_processes.count(processId) == 0) {
std::cerr << "no such processId: " << processId << std::endl;
- negativeReply(interest, 404);
+ done(negativeReply("No such this process is in progress", 404));
return;
}
@@ -409,109 +370,100 @@
//Check whether it is single data fetching
if (!response.hasStartBlockId() &&
!response.hasEndBlockId()) {
- reply(interest, response);
+ //reply(interest, response);
+ done(response);
return;
}
//read if noEndtimeout
if (!response.hasEndBlockId()) {
extendNoEndTime(process);
- reply(interest, response);
+ done(response);
return;
}
else {
- reply(interest, response);
+ done(response);
}
}
void
-WriteHandle::onCheckValidationFailed(const Interest& interest, const ValidationError& error)
-{
- std::cerr << error << std::endl;
- negativeReply(interest, 401);
-}
-
-void
WriteHandle::deferredDeleteProcess(ProcessId processId)
{
- getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
+ scheduler.scheduleEvent(PROCESS_DELETE_TIME,
bind(&WriteHandle::deleteProcess, this, processId));
}
void
-WriteHandle::processSingleInsertCommand(const Interest& interest,
- RepoCommandParameter& parameter)
+WriteHandle::processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
- ProcessId processId = generateProcessId();
+ ProcessId processId = ndn::random::generateWord64();
ProcessInfo& process = m_processes[processId];
RepoCommandResponse& response = process.response;
- response.setStatusCode(100);
+ response.setCode(100);
response.setProcessId(processId);
response.setInsertNum(0);
+ response.setBody(response.wireEncode());
+ done(response);
- reply(interest, response);
-
- response.setStatusCode(300);
+ response.setCode(300);
Interest fetchInterest(parameter.getName());
fetchInterest.setInterestLifetime(m_interestLifetime);
if (parameter.hasSelectors()) {
fetchInterest.setSelectors(parameter.getSelectors());
}
- getFace().expressInterest(fetchInterest,
- bind(&WriteHandle::onData, this, _1, _2, processId),
- bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
- bind(&WriteHandle::onTimeout, this, _1, processId));
+ face.expressInterest(fetchInterest,
+ bind(&WriteHandle::onData, this, _1, _2, processId),
+ bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
+ bind(&WriteHandle::onTimeout, this, _1, processId));
}
void
-WriteHandle::processSegmentedInsertCommand(const Interest& interest,
- RepoCommandParameter& parameter)
+WriteHandle::processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
if (parameter.hasEndBlockId()) {
//normal fetch segment
- if (!parameter.hasStartBlockId()) {
- parameter.setStartBlockId(0);
- }
-
- SegmentNo startBlockId = parameter.getStartBlockId();
+ SegmentNo startBlockId = parameter.hasStartBlockId() ? parameter.getStartBlockId() : 0;
SegmentNo endBlockId = parameter.getEndBlockId();
if (startBlockId > endBlockId) {
- negativeReply(interest, 403);
+ done(negativeReply("Malformed Command", 403));
return;
}
- ProcessId processId = generateProcessId();
+ ProcessId processId = ndn::random::generateWord64();
ProcessInfo& process = m_processes[processId];
RepoCommandResponse& response = process.response;
- response.setStatusCode(100);
+ response.setCode(100);
response.setProcessId(processId);
response.setInsertNum(0);
response.setStartBlockId(startBlockId);
response.setEndBlockId(endBlockId);
-
- reply(interest, response);
+ response.setBody(response.wireEncode());
+ done(response);
//300 means data fetching is in progress
- response.setStatusCode(300);
+ response.setCode(300);
segInit(processId, parameter);
}
else {
//no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
- ProcessId processId = generateProcessId();
+ ProcessId processId = ndn::random::generateWord64();
ProcessInfo& process = m_processes[processId];
RepoCommandResponse& response = process.response;
- response.setStatusCode(100);
+ response.setCode(100);
response.setProcessId(processId);
response.setInsertNum(0);
response.setStartBlockId(parameter.getStartBlockId());
- reply(interest, response);
+ response.setBody(response.wireEncode());
+ done(response);
//300 means data fetching is in progress
- response.setStatusCode(300);
+ response.setCode(300);
segInit(processId, parameter);
}
@@ -524,21 +476,21 @@
ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
RepoCommandResponse& response = process.response;
if (now > noEndTime) {
- response.setStatusCode(405);
+ response.setCode(405);
return;
}
//extends noEndTime
- process.noEndTime =
- ndn::time::steady_clock::now() + m_noEndTimeout;
+ process.noEndTime = ndn::time::steady_clock::now() + m_noEndTimeout;
}
-void
-WriteHandle::negativeReply(const Interest& interest, int statusCode)
+RepoCommandResponse
+WriteHandle::negativeReply(std::string text, int statusCode)
{
- RepoCommandResponse response;
- response.setStatusCode(statusCode);
- reply(interest, response);
+ RepoCommandResponse response(statusCode, text);
+ response.setBody(response.wireEncode());
+
+ return response;
}
} // namespace repo
diff --git a/src/handles/write-handle.hpp b/src/handles/write-handle.hpp
index c6622d9..686d68c 100644
--- a/src/handles/write-handle.hpp
+++ b/src/handles/write-handle.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California.
+ * Copyright (c) 2014-2018, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -20,7 +20,9 @@
#ifndef REPO_HANDLES_WRITE_HANDLE_HPP
#define REPO_HANDLES_WRITE_HANDLE_HPP
-#include "base-handle.hpp"
+#include "command-base-handle.hpp"
+
+#include <ndn-cxx/mgmt/dispatcher.hpp>
#include <queue>
@@ -51,27 +53,25 @@
*
* If repo cannot get FinalBlockId in noendTimeout time, the fetching process will terminate.
*/
-class WriteHandle : public BaseHandle
+class WriteHandle : public CommandBaseHandle
{
public:
- class Error : public BaseHandle::Error
+ class Error : public CommandBaseHandle::Error
{
public:
explicit
Error(const std::string& what)
- : BaseHandle::Error(what)
+ : CommandBaseHandle::Error(what)
{
}
};
public:
- WriteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler, Validator& validator);
-
- virtual void
- listen(const Name& prefix);
+ WriteHandle(Face& face, RepoStorage& storageHandle,
+ ndn::mgmt::Dispatcher& dispatcher, Scheduler& scheduler,
+ Validator& validator);
private:
/**
@@ -104,10 +104,9 @@
* @brief handle insert commands
*/
void
- onInterest(const Name& prefix, const Interest& interest);
-
- void
- onValidated(const Interest& interest, const Name& prefix);
+ handleInsertCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameters,
+ const ndn::mgmt::CommandContinuation& done);
void
onValidationFailed(const Interest& interest, const ValidationError& error);
@@ -129,7 +128,8 @@
onTimeout(const Interest& interest, ProcessId processId);
void
- processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter);
+ processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
+ const ndn::mgmt::CommandContinuation& done);
private: // segmented data fetching
/**
@@ -166,7 +166,8 @@
onSegmentTimeoutControl(ProcessId processId, const Interest& interest);
void
- processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter);
+ processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
+ const ndn::mgmt::CommandContinuation& done);
private:
/**
@@ -188,11 +189,11 @@
/**
* @brief handle insert check command
*/
- void
- onCheckInterest(const Name& prefix, const Interest& interest);
void
- onCheckValidated(const Interest& interest, const Name& prefix);
+ handleCheckCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameters,
+ const ndn::mgmt::CommandContinuation& done);
void
onCheckValidationFailed(const Interest& interest, const ValidationError& error);
@@ -207,14 +208,12 @@
void
deferredDeleteProcess(ProcessId processId);
- void
- negativeReply(const Interest& interest, int statusCode);
+ RepoCommandResponse
+ negativeReply(std::string text, int statusCode);
private:
Validator& m_validator;
-
map<ProcessId, ProcessInfo> m_processes;
-
int m_retryTime;
int m_credit;
ndn::time::milliseconds m_noEndTimeout;