Use ndn::mgmt::Dispatcher for repo commands
refs #4129
Change-Id: Idb7826fc76b6660ce76d69e7e88a9e922c55a2e1
diff --git a/src/handles/watch-handle.cpp b/src/handles/watch-handle.cpp
index 6fe93d4..105e802 100644
--- a/src/handles/watch-handle.cpp
+++ b/src/handles/watch-handle.cpp
@@ -21,12 +21,12 @@
namespace repo {
-static const milliseconds PROCESS_DELETE_TIME(10000);
-static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
+static const milliseconds PROCESS_DELETE_TIME(10000_ms);
+static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms);
-WatchHandle::WatchHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler, Validator& validator)
- : BaseHandle(face, storageHandle, keyChain, scheduler)
+WatchHandle::WatchHandle(Face& face, RepoStorage& storageHandle,
+ ndn::mgmt::Dispatcher& dispatcher, Scheduler& scheduler, Validator& validator)
+ : CommandBaseHandle(face, storageHandle, scheduler, validator)
, m_validator(validator)
, m_interestNum(0)
, m_maxInterestNum(0)
@@ -35,6 +35,20 @@
, m_startTime(steady_clock::now())
, m_size(0)
{
+ dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("start"),
+ makeAuthorization(),
+ std::bind(&WatchHandle::validateParameters<WatchStartCommand>, this, _1),
+ std::bind(&WatchHandle::handleStartCommand, this, _1, _2, _3, _4));
+
+ dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("check"),
+ makeAuthorization(),
+ std::bind(&WatchHandle::validateParameters<WatchCheckCommand>, this, _1),
+ std::bind(&WatchHandle::handleCheckCommand, this, _1, _2, _3, _4));
+
+ dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("watch").append("stop"),
+ makeAuthorization(),
+ std::bind(&WatchHandle::validateParameters<WatchStopCommand>, this, _1),
+ std::bind(&WatchHandle::handleStopCommand, this, _1, _2, _3, _4));
}
void
@@ -43,28 +57,13 @@
m_processes.erase(name);
}
-// Interest.
void
-WatchHandle::onInterest(const Name& prefix, const Interest& interest)
+WatchHandle::handleStartCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
- m_validator.validate(interest,
- bind(&WatchHandle::onValidated, this, _1, prefix),
- bind(&WatchHandle::onValidationFailed, this, _1, _2));
-}
-
-void
-WatchHandle::onValidated(const Interest& interest, const Name& prefix)
-{
- RepoCommandParameter parameter;
- try {
- extractParameter(interest, prefix, parameter);
- }
- catch (const RepoCommandParameter::Error&) {
- negativeReply(interest, 403);
- return;
- }
-
- processWatchCommand(interest, parameter);
+ const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
+ processWatchCommand(interest, repoParameter, done);
}
void WatchHandle::watchStop(const Name& name)
@@ -73,24 +72,18 @@
m_maxInterestNum = 0;
m_interestNum = 0;
m_startTime = steady_clock::now();
- m_watchTimeout = milliseconds(0);
+ m_watchTimeout = 0_ms;
m_interestLifetime = DEFAULT_INTEREST_LIFETIME;
m_size = 0;
}
-void
-WatchHandle::onValidationFailed(const Interest& interest, const ValidationError& error)
-{
- std::cerr << error << std::endl;
- negativeReply(interest, 401);
-}
void
WatchHandle::onData(const Interest& interest, const ndn::Data& data, const Name& name)
{
- m_validator.validate(data,
- bind(&WatchHandle::onDataValidated, this, interest, _1, name),
- bind(&WatchHandle::onDataValidationFailed, this, interest, _1, _2, name));
+ m_validator.validate(data,
+ bind(&WatchHandle::onDataValidated, this, interest, _1, name),
+ bind(&WatchHandle::onDataValidationFailed, this, interest, _1, _2, name));
}
void
@@ -99,7 +92,7 @@
if (!m_processes[name].second) {
return;
}
- if (getStorageHandle().insertData(data)) {
+ if (storageHandle.insertData(data)) {
m_size++;
if (!onRunning(name))
return;
@@ -125,7 +118,7 @@
}
++m_interestNum;
- getFace().expressInterest(fetchInterest,
+ face.expressInterest(fetchInterest,
bind(&WatchHandle::onData, this, _1, _2, name),
bind(&WatchHandle::onTimeout, this, _1, name), // Nack
bind(&WatchHandle::onTimeout, this, _1, name));
@@ -168,7 +161,7 @@
}
++m_interestNum;
- getFace().expressInterest(fetchInterest,
+ face.expressInterest(fetchInterest,
bind(&WatchHandle::onData, this, _1, _2, name),
bind(&WatchHandle::onTimeout, this, _1, name), // Nack
bind(&WatchHandle::onTimeout, this, _1, name));
@@ -190,7 +183,7 @@
fetchInterest.setChildSelector(1);
++m_interestNum;
- getFace().expressInterest(fetchInterest,
+ face.expressInterest(fetchInterest,
bind(&WatchHandle::onData, this, _1, _2, name),
bind(&WatchHandle::onTimeout, this, _1, name), // Nack
bind(&WatchHandle::onTimeout, this, _1, name));
@@ -198,112 +191,60 @@
}
void
-WatchHandle::listen(const Name& prefix)
+WatchHandle::handleStopCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
- getFace().setInterestFilter(Name(prefix).append("watch").append("start"),
- bind(&WatchHandle::onInterest, this, _1, _2));
- getFace().setInterestFilter(Name(prefix).append("watch").append("check"),
- bind(&WatchHandle::onCheckInterest, this, _1, _2));
- getFace().setInterestFilter(Name(prefix).append("watch").append("stop"),
- bind(&WatchHandle::onStopInterest, this, _1, _2));
+ const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
+
+ watchStop(repoParameter.getName());
+ std::string text = "Watched Prefix Insertion for prefix (" + prefix.toUri() + ") is stop.";
+ return done(RepoCommandResponse(101, text));
}
void
-WatchHandle::onStopInterest(const Name& prefix, const Interest& interest)
+WatchHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
- m_validator.validate(interest,
- bind(&WatchHandle::onStopValidated, this, _1, prefix),
- bind(&WatchHandle::onStopValidationFailed, this, _1, _2));
-}
+ const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
-void
-WatchHandle::onStopValidated(const Interest& interest, const Name& prefix)
-{
- RepoCommandParameter parameter;
- try {
- extractParameter(interest, prefix, parameter);
- }
- catch (const RepoCommandParameter::Error&) {
- negativeReply(interest, 403);
- return;
- }
-
- watchStop(parameter.getName());
- negativeReply(interest, 101);
-}
-
-void
-WatchHandle::onStopValidationFailed(const Interest& interest, const ValidationError& error)
-{
- std::cerr << error << std::endl;
- negativeReply(interest, 401);
-}
-
-void
-WatchHandle::onCheckInterest(const Name& prefix, const Interest& interest)
-{
- m_validator.validate(interest,
- bind(&WatchHandle::onCheckValidated, this, _1, prefix),
- bind(&WatchHandle::onCheckValidationFailed, this, _1, _2));
-}
-
-void
-WatchHandle::onCheckValidated(const Interest& interest, const Name& prefix)
-{
- RepoCommandParameter parameter;
- try {
- extractParameter(interest, prefix, parameter);
- }
- catch (const RepoCommandParameter::Error&) {
- negativeReply(interest, 403);
- return;
- }
-
- if (!parameter.hasName()) {
- negativeReply(interest, 403);
- return;
- }
//check whether this process exists
- Name name = parameter.getName();
+ Name name = repoParameter.getName();
if (m_processes.count(name) == 0) {
std::cerr << "no such process name: " << name << std::endl;
- negativeReply(interest, 404);
- return;
+ RepoCommandResponse response(404, "No such process is in progress");
+ response.setBody(response.wireEncode());
+ return done(response);
}
RepoCommandResponse& response = m_processes[name].first;
- if (!m_processes[name].second) {
- response.setStatusCode(101);
+
+ if (!m_processes[name].second) {
+ response.setCode(101);
}
- reply(interest, response);
-
-}
-
-void
-WatchHandle::onCheckValidationFailed(const Interest& interest, const ValidationError& error)
-{
- std::cerr << error << std::endl;
- negativeReply(interest, 401);
+ return done(response);
}
void
WatchHandle::deferredDeleteProcess(const Name& name)
{
- getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
+ scheduler.scheduleEvent(PROCESS_DELETE_TIME,
bind(&WatchHandle::deleteProcess, this, name));
}
void
WatchHandle::processWatchCommand(const Interest& interest,
- RepoCommandParameter& parameter)
+ const RepoCommandParameter& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
// if there is no watchTimeout specified, m_watchTimeout will be set as 0 and this handle will run forever
if (parameter.hasWatchTimeout()) {
m_watchTimeout = parameter.getWatchTimeout();
}
else {
- m_watchTimeout = milliseconds(0);
+ m_watchTimeout = 0_ms;
}
// if there is no maxInterestNum specified, m_maxInterestNum will be 0, which means infinity
@@ -318,10 +259,14 @@
m_interestLifetime = parameter.getInterestLifetime();
}
- reply(interest, RepoCommandResponse().setStatusCode(100));
+ RepoCommandResponse response(100, "Watching the prefix started.");
+ response.setBody(response.wireEncode());
+ done(response);
m_processes[parameter.getName()] =
- std::make_pair(RepoCommandResponse().setStatusCode(300), true);
+ std::make_pair(RepoCommandResponse(300, "This watched prefix Insertion is in progress"),
+ true);
+
Interest fetchInterest(parameter.getName());
if (parameter.hasSelectors()) {
fetchInterest.setSelectors(parameter.getSelectors());
@@ -330,19 +275,10 @@
fetchInterest.setInterestLifetime(m_interestLifetime);
m_startTime = steady_clock::now();
m_interestNum++;
- getFace().expressInterest(fetchInterest,
- bind(&WatchHandle::onData, this, _1, _2, parameter.getName()),
- bind(&WatchHandle::onTimeout, this, _1, parameter.getName()), // Nack
- bind(&WatchHandle::onTimeout, this, _1, parameter.getName()));
-}
-
-
-void
-WatchHandle::negativeReply(const Interest& interest, int statusCode)
-{
- RepoCommandResponse response;
- response.setStatusCode(statusCode);
- reply(interest, response);
+ face.expressInterest(fetchInterest,
+ bind(&WatchHandle::onData, this, _1, _2, parameter.getName()),
+ bind(&WatchHandle::onTimeout, this, _1, parameter.getName()), // Nack
+ bind(&WatchHandle::onTimeout, this, _1, parameter.getName()));
}
bool