Use ndn::mgmt::Dispatcher for repo commands

refs #4129

Change-Id: Idb7826fc76b6660ce76d69e7e88a9e922c55a2e1
diff --git a/src/handles/write-handle.cpp b/src/handles/write-handle.cpp
index 0283728..6cc9583 100644
--- a/src/handles/write-handle.cpp
+++ b/src/handles/write-handle.cpp
@@ -19,24 +19,44 @@
 
 #include "write-handle.hpp"
 
+#include <ndn-cxx/util/random.hpp>
+
 namespace repo {
 
+// Initial values of m_retryTime and m_credit, assigned in the constructor.
 static const int RETRY_TIMEOUT = 3;
 static const int DEFAULT_CREDIT = 12;
-static const milliseconds NOEND_TIMEOUT(10000);
-static const milliseconds PROCESS_DELETE_TIME(10000);
-static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
+// Initial m_noEndTimeout: the amount extendNoEndTime() adds to "now" for a process.
+static const milliseconds NOEND_TIMEOUT(10000_ms);
+// Delay after which deferredDeleteProcess() has deleteProcess() run.
+static const milliseconds PROCESS_DELETE_TIME(10000_ms);
+// Initial m_interestLifetime, set as the lifetime of the fetch Interests sent here.
+static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms);
 
-WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
-                         Scheduler& scheduler,
-                         Validator& validator)
-  : BaseHandle(face, storageHandle, keyChain, scheduler)
+// Sets the protocol defaults above and registers the two commands served by this
+// handle, "insert" and "insert check", on the dispatcher. Each registration passes
+// the authorization from makeAuthorization(), a validateParameters<>() instantiation
+// and the member function that handles the command.
+WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, ndn::mgmt::Dispatcher& dispatcher,
+                         Scheduler& scheduler, Validator& validator)
+  : CommandBaseHandle(face, storageHandle, scheduler, validator)
   , m_validator(validator)
   , m_retryTime(RETRY_TIMEOUT)
   , m_credit(DEFAULT_CREDIT)
   , m_noEndTimeout(NOEND_TIMEOUT)
   , m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
 {
+  // "insert" is served by handleInsertCommand().
+  dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert"),
+    makeAuthorization(),
+    std::bind(&WriteHandle::validateParameters<InsertCommand>, this, _1),
+    std::bind(&WriteHandle::handleInsertCommand, this, _1, _2, _3, _4));
+
+  // "insert check" is served by handleCheckCommand().
+  dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert check"),
+    makeAuthorization(),
+    std::bind(&WriteHandle::validateParameters<InsertCheckCommand>, this, _1),
+    std::bind(&WriteHandle::handleCheckCommand, this, _1, _2, _3, _4));
 }
 
 void
@@ -45,46 +55,40 @@
   m_processes.erase(processId);
 }
 
-// Interest.
+// Handler for the "insert" control command.
+//
+// A command carrying a StartBlockId or an EndBlockId is handed to
+// processSegmentedInsertCommand(), unless it also carries selectors, in which case
+// it is rejected with status 402. Every other command goes to
+// processSingleInsertCommand(). Replies are sent through `done`.
+//
+// NOTE(review): the command's InterestLifetime is stored only after the insert has
+// been started, and not at all on the 402 path; the pre-Dispatcher code did the
+// same. Confirm this ordering is intended.
 void
-WriteHandle::onInterest(const Name& prefix, const Interest& interest)
+WriteHandle::handleInsertCommand(const Name& prefix, const Interest& interest,
+                                 const ndn::mgmt::ControlParameters& parameter,
+                                 const ndn::mgmt::CommandContinuation& done)
 {
-  m_validator.validate(interest,
-                       bind(&WriteHandle::onValidated, this, _1, prefix),
-                       bind(&WriteHandle::onValidationFailed, this, _1, _2));
-}
+  // Cast by reference, as handleCheckCommand() does: a parameter of the wrong dynamic
+  // type then raises std::bad_cast instead of producing a null pointer that would be
+  // dereferenced below. const is removed because the process*InsertCommand() helpers
+  // take a non-const RepoCommandParameter&.
+  RepoCommandParameter& repoParameter =
+    dynamic_cast<RepoCommandParameter&>(const_cast<ndn::mgmt::ControlParameters&>(parameter));
 
-void
-WriteHandle::onValidated(const Interest& interest, const Name& prefix)
-{
-  RepoCommandParameter parameter;
-  try {
-    extractParameter(interest, prefix, parameter);
-  }
-  catch (const RepoCommandParameter::Error&) {
-    negativeReply(interest, 403);
-    return;
-  }
-
-  if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
-    if (parameter.hasSelectors()) {
-      negativeReply(interest, 402);
+  if (repoParameter.hasStartBlockId() || repoParameter.hasEndBlockId()) {
+    if (repoParameter.hasSelectors()) {
+      done(negativeReply("BlockId present. BlockId is not supported in this protocol", 402));
       return;
     }
-    processSegmentedInsertCommand(interest, parameter);
+    processSegmentedInsertCommand(interest, repoParameter, done);
   }
   else {
-    processSingleInsertCommand(interest, parameter);
+    processSingleInsertCommand(interest, repoParameter, done);
   }
-  if (parameter.hasInterestLifetime())
-    m_interestLifetime = parameter.getInterestLifetime();
-}
-
-void
-WriteHandle::onValidationFailed(const Interest& interest, const ValidationError& error)
-{
-  std::cerr << error << std::endl;
-  negativeReply(interest, 401);
+  if (repoParameter.hasInterestLifetime())
+    m_interestLifetime = repoParameter.getInterestLifetime();
 }
 
 void
@@ -106,9 +96,7 @@
   RepoCommandResponse& response = process.response;
 
   if (response.getInsertNum() == 0) {
-    getStorageHandle().insertData(data);
-   // getStorageHandle().insertEntry(data);
-   // getStoreIndex().insert(data);
+    storageHandle.insertData(data);
     response.setInsertNum(1);
   }
 
@@ -153,7 +141,7 @@
   }
 
   //insert data
-  if (getStorageHandle().insertData(data)) {
+  if (storageHandle.insertData(data)) {
     response.setInsertNum(response.getInsertNum() + 1);
   }
 
@@ -176,15 +164,6 @@
 }
 
 void
-WriteHandle::listen(const Name& prefix)
-{
-  getFace().setInterestFilter(Name(prefix).append("insert"),
-                              bind(&WriteHandle::onInterest, this, _1, _2));
-  getFace().setInterestFilter(Name(prefix).append("insert check"),
-                              bind(&WriteHandle::onCheckInterest, this, _1, _2));
-}
-
-void
 WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
 {
   ProcessInfo& process = m_processes[processId];
@@ -214,7 +193,7 @@
     fetchName.appendSegment(segment);
     Interest interest(fetchName);
     interest.setInterestLifetime(m_interestLifetime);
-    getFace().expressInterest(interest,
+    face.expressInterest(interest,
                               bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
                               bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
                               bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
@@ -254,7 +233,7 @@
       std::cerr << "noEndtimeout: " << processId << std::endl;
       //m_processes.erase(processId);
       //StatusCode should be refreshed as 405
-      response.setStatusCode(405);
+      response.setCode(405);
       //schedule a delete event
       deferredDeleteProcess(processId);
       return;
@@ -268,7 +247,7 @@
     if (response.getInsertNum() >= nSegments) {
       //m_processes.erase(processId);
       //All the data has been inserted, StatusCode is refreshed as 200
-      response.setStatusCode(200);
+      response.setCode(200);
       deferredDeleteProcess(processId);
       return;
     }
@@ -309,7 +288,7 @@
   fetchName.appendSegment(sendingSegment);
   Interest fetchInterest(fetchName);
   fetchInterest.setInterestLifetime(m_interestLifetime);
-  getFace().expressInterest(fetchInterest,
+  face.expressInterest(fetchInterest,
                             bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
                             bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
                             bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
@@ -361,7 +340,7 @@
     retryTime++;
     Interest retryInterest(interest.getName());
     retryInterest.setInterestLifetime(m_interestLifetime);
-    getFace().expressInterest(retryInterest,
+    face.expressInterest(retryInterest,
                               bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
                               bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
                               bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
@@ -370,35 +349,22 @@
 }
 
+// Handler for the "insert check" control command: looks up the insert process
+// named by the command's ProcessId and rejects unknown ids with status 404.
+// NOTE(review): the explicit hasProcessId() test of the previous implementation
+// is gone; presumably validateParameters<InsertCheckCommand>() covers it now --
+// confirm.
 void
-WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
+WriteHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
+                                const ndn::mgmt::ControlParameters& parameter,
+                                const ndn::mgmt::CommandContinuation& done)
 {
-  m_validator.validate(interest,
-                       bind(&WriteHandle::onCheckValidated, this, _1, prefix),
-                       bind(&WriteHandle::onCheckValidationFailed, this, _1, _2));
+  const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
 
-}
-
-void
-WriteHandle::onCheckValidated(const Interest& interest, const Name& prefix)
-{
-  RepoCommandParameter parameter;
-  try {
-    extractParameter(interest, prefix, parameter);
-  }
-  catch (const RepoCommandParameter::Error&) {
-    negativeReply(interest, 403);
-    return;
-  }
-
-  if (!parameter.hasProcessId()) {
-    negativeReply(interest, 403);
-    return;
-  }
   //check whether this process exists
-  ProcessId processId = parameter.getProcessId();
+  ProcessId processId = repoParameter.getProcessId();
   if (m_processes.count(processId) == 0) {
     std::cerr << "no such processId: " << processId << std::endl;
-    negativeReply(interest, 404);
+    done(negativeReply("No such process is in progress", 404));
     return;
   }
 
@@ -409,109 +370,121 @@
   //Check whether it is single data fetching
   if (!response.hasStartBlockId() &&
       !response.hasEndBlockId()) {
-    reply(interest, response);
+    done(response);
     return;
   }
 
   //read if noEndtimeout
   if (!response.hasEndBlockId()) {
     extendNoEndTime(process);
-    reply(interest, response);
+    done(response);
     return;
   }
   else {
-    reply(interest, response);
+    done(response);
   }
 }
 
+// Ask the scheduler to run deleteProcess(processId) after PROCESS_DELETE_TIME.
 void
-WriteHandle::onCheckValidationFailed(const Interest& interest, const ValidationError& error)
-{
-  std::cerr << error << std::endl;
-  negativeReply(interest, 401);
-}
-
-void
 WriteHandle::deferredDeleteProcess(ProcessId processId)
 {
-  getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
-                               bind(&WriteHandle::deleteProcess, this, processId));
+  scheduler.scheduleEvent(PROCESS_DELETE_TIME,
+                          bind(&WriteHandle::deleteProcess, this, processId));
 }
 
+// Start an insert of a single Data packet.
+//
+// Registers a new process under an id from ndn::random::generateWord64(), answers
+// the command through `done` with status 100, then marks the stored response 300
+// and expresses one Interest for parameter.getName() (with the command's selectors,
+// if any). onData() receives the Data; onTimeout() is used for both Nack and timeout.
 void
-WriteHandle::processSingleInsertCommand(const Interest& interest,
-                                        RepoCommandParameter& parameter)
+WriteHandle::processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
+                                        const ndn::mgmt::CommandContinuation& done)
 {
-  ProcessId processId = generateProcessId();
+  ProcessId processId = ndn::random::generateWord64();
 
   ProcessInfo& process = m_processes[processId];
 
   RepoCommandResponse& response = process.response;
-  response.setStatusCode(100);
+  response.setCode(100);
   response.setProcessId(processId);
   response.setInsertNum(0);
+  response.setBody(response.wireEncode());
+  done(response);
 
-  reply(interest, response);
-
-  response.setStatusCode(300);
+  response.setCode(300);
 
   Interest fetchInterest(parameter.getName());
   fetchInterest.setInterestLifetime(m_interestLifetime);
   if (parameter.hasSelectors()) {
     fetchInterest.setSelectors(parameter.getSelectors());
   }
-  getFace().expressInterest(fetchInterest,
-                            bind(&WriteHandle::onData, this, _1, _2, processId),
-                            bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
-                            bind(&WriteHandle::onTimeout, this, _1, processId));
+  face.expressInterest(fetchInterest,
+                       bind(&WriteHandle::onData, this, _1, _2, processId),
+                       bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
+                       bind(&WriteHandle::onTimeout, this, _1, processId));
 }
 
+// Start an insert of a range of segments.
+//
+// With an EndBlockId the range is [StartBlockId, EndBlockId], StartBlockId defaulting
+// to 0; a start beyond the end is rejected with status 403. Without an EndBlockId
+// only the StartBlockId is recorded in the response. In both cases a new process is
+// registered under an id from ndn::random::generateWord64(), the command is answered
+// with status 100 through `done`, the stored response is switched to 300 and
+// segInit() is called with the process id and the parameter.
 void
-WriteHandle::processSegmentedInsertCommand(const Interest& interest,
-                                           RepoCommandParameter& parameter)
+WriteHandle::processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
+                                           const ndn::mgmt::CommandContinuation& done)
 {
   if (parameter.hasEndBlockId()) {
     //normal fetch segment
+    // The default is written into the parameter itself, not just a local, because
+    // the same parameter object is passed to segInit() below.
+    // NOTE(review): segInit() is outside this hunk; confirm it reads StartBlockId.
     if (!parameter.hasStartBlockId()) {
       parameter.setStartBlockId(0);
     }
 
     SegmentNo startBlockId = parameter.getStartBlockId();
     SegmentNo endBlockId = parameter.getEndBlockId();
     if (startBlockId > endBlockId) {
-      negativeReply(interest, 403);
+      done(negativeReply("Malformed Command", 403));
       return;
     }
 
-    ProcessId processId = generateProcessId();
+    ProcessId processId = ndn::random::generateWord64();
     ProcessInfo& process = m_processes[processId];
     RepoCommandResponse& response = process.response;
-    response.setStatusCode(100);
+    response.setCode(100);
     response.setProcessId(processId);
     response.setInsertNum(0);
     response.setStartBlockId(startBlockId);
     response.setEndBlockId(endBlockId);
-
-    reply(interest, response);
+    response.setBody(response.wireEncode());
+    done(response);
 
     //300 means data fetching is in progress
-    response.setStatusCode(300);
+    response.setCode(300);
 
     segInit(processId, parameter);
   }
   else {
     //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
-    ProcessId processId = generateProcessId();
+    ProcessId processId = ndn::random::generateWord64();
     ProcessInfo& process = m_processes[processId];
     RepoCommandResponse& response = process.response;
-    response.setStatusCode(100);
+    response.setCode(100);
     response.setProcessId(processId);
     response.setInsertNum(0);
     response.setStartBlockId(parameter.getStartBlockId());
-    reply(interest, response);
+    response.setBody(response.wireEncode());
+    done(response);
 
     //300 means data fetching is in progress
-    response.setStatusCode(300);
+    response.setCode(300);
 
     segInit(processId, parameter);
   }
@@ -524,21 +476,23 @@
   ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
   RepoCommandResponse& response = process.response;
   if (now > noEndTime) {
-    response.setStatusCode(405);
+    response.setCode(405);
     return;
   }
   //extends noEndTime
-  process.noEndTime =
-    ndn::time::steady_clock::now() + m_noEndTimeout;
+  process.noEndTime = ndn::time::steady_clock::now() + m_noEndTimeout;
 
 }
 
-void
-WriteHandle::negativeReply(const Interest& interest, int statusCode)
+// Build the response used to reject a command: a RepoCommandResponse constructed
+// from statusCode and text, with its own wire encoding set as body. Nothing is
+// sent here; callers in this file pass the returned object to `done`.
+RepoCommandResponse
+WriteHandle::negativeReply(std::string text, int statusCode)
 {
-  RepoCommandResponse response;
-  response.setStatusCode(statusCode);
-  reply(interest, response);
+  RepoCommandResponse rejection(statusCode, text);
+  rejection.setBody(rejection.wireEncode());
+  return rejection;
 }
 
 } // namespace repo