Use ndn::mgmt::Dispatcher for repo commands
refs #4129
Change-Id: Idb7826fc76b6660ce76d69e7e88a9e922c55a2e1
diff --git a/src/handles/write-handle.cpp b/src/handles/write-handle.cpp
index 0283728..6cc9583 100644
--- a/src/handles/write-handle.cpp
+++ b/src/handles/write-handle.cpp
@@ -19,24 +19,34 @@
#include "write-handle.hpp"
+#include <ndn-cxx/util/random.hpp>
+
namespace repo {
static const int RETRY_TIMEOUT = 3;
static const int DEFAULT_CREDIT = 12;
-static const milliseconds NOEND_TIMEOUT(10000);
-static const milliseconds PROCESS_DELETE_TIME(10000);
-static const milliseconds DEFAULT_INTEREST_LIFETIME(4000);
+static const milliseconds NOEND_TIMEOUT(10000_ms);
+static const milliseconds PROCESS_DELETE_TIME(10000_ms);
+static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms);
-WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, KeyChain& keyChain,
- Scheduler& scheduler,
- Validator& validator)
- : BaseHandle(face, storageHandle, keyChain, scheduler)
+WriteHandle::WriteHandle(Face& face, RepoStorage& storageHandle, ndn::mgmt::Dispatcher& dispatcher,
+ Scheduler& scheduler, Validator& validator)
+ : CommandBaseHandle(face, storageHandle, scheduler, validator)
, m_validator(validator)
, m_retryTime(RETRY_TIMEOUT)
, m_credit(DEFAULT_CREDIT)
, m_noEndTimeout(NOEND_TIMEOUT)
, m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
{
+ dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert"),
+ makeAuthorization(),
+ std::bind(&WriteHandle::validateParameters<InsertCommand>, this, _1),
+ std::bind(&WriteHandle::handleInsertCommand, this, _1, _2, _3, _4));
+
+ dispatcher.addControlCommand<RepoCommandParameter>(ndn::PartialName("insert check"),
+ makeAuthorization(),
+ std::bind(&WriteHandle::validateParameters<InsertCheckCommand>, this, _1),
+ std::bind(&WriteHandle::handleCheckCommand, this, _1, _2, _3, _4));
}
void
@@ -45,46 +55,26 @@
m_processes.erase(processId);
}
-// Interest.
void
-WriteHandle::onInterest(const Name& prefix, const Interest& interest)
+WriteHandle::handleInsertCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
- m_validator.validate(interest,
- bind(&WriteHandle::onValidated, this, _1, prefix),
- bind(&WriteHandle::onValidationFailed, this, _1, _2));
-}
+ RepoCommandParameter* repoParameter =
+ dynamic_cast<RepoCommandParameter*>(const_cast<ndn::mgmt::ControlParameters*>(&parameter));
-void
-WriteHandle::onValidated(const Interest& interest, const Name& prefix)
-{
- RepoCommandParameter parameter;
- try {
- extractParameter(interest, prefix, parameter);
- }
- catch (const RepoCommandParameter::Error&) {
- negativeReply(interest, 403);
- return;
- }
-
- if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
- if (parameter.hasSelectors()) {
- negativeReply(interest, 402);
+ if (repoParameter->hasStartBlockId() || repoParameter->hasEndBlockId()) {
+ if (repoParameter->hasSelectors()) {
+ done(negativeReply("BlockId present. BlockId is not supported in this protocol", 402));
return;
}
- processSegmentedInsertCommand(interest, parameter);
+ processSegmentedInsertCommand(interest, *repoParameter, done);
}
else {
- processSingleInsertCommand(interest, parameter);
+ processSingleInsertCommand(interest, *repoParameter, done);
}
- if (parameter.hasInterestLifetime())
- m_interestLifetime = parameter.getInterestLifetime();
-}
-
-void
-WriteHandle::onValidationFailed(const Interest& interest, const ValidationError& error)
-{
- std::cerr << error << std::endl;
- negativeReply(interest, 401);
+ if (repoParameter->hasInterestLifetime())
+ m_interestLifetime = repoParameter->getInterestLifetime();
}
void
@@ -106,9 +96,7 @@
RepoCommandResponse& response = process.response;
if (response.getInsertNum() == 0) {
- getStorageHandle().insertData(data);
- // getStorageHandle().insertEntry(data);
- // getStoreIndex().insert(data);
+ storageHandle.insertData(data);
response.setInsertNum(1);
}
@@ -153,7 +141,7 @@
}
//insert data
- if (getStorageHandle().insertData(data)) {
+ if (storageHandle.insertData(data)) {
response.setInsertNum(response.getInsertNum() + 1);
}
@@ -176,15 +164,6 @@
}
void
-WriteHandle::listen(const Name& prefix)
-{
- getFace().setInterestFilter(Name(prefix).append("insert"),
- bind(&WriteHandle::onInterest, this, _1, _2));
- getFace().setInterestFilter(Name(prefix).append("insert check"),
- bind(&WriteHandle::onCheckInterest, this, _1, _2));
-}
-
-void
WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
{
ProcessInfo& process = m_processes[processId];
@@ -214,7 +193,7 @@
fetchName.appendSegment(segment);
Interest interest(fetchName);
interest.setInterestLifetime(m_interestLifetime);
- getFace().expressInterest(interest,
+ face.expressInterest(interest,
bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
@@ -254,7 +233,7 @@
std::cerr << "noEndtimeout: " << processId << std::endl;
//m_processes.erase(processId);
//StatusCode should be refreshed as 405
- response.setStatusCode(405);
+ response.setCode(405);
//schedule a delete event
deferredDeleteProcess(processId);
return;
@@ -268,7 +247,7 @@
if (response.getInsertNum() >= nSegments) {
//m_processes.erase(processId);
//All the data has been inserted, StatusCode is refreshed as 200
- response.setStatusCode(200);
+ response.setCode(200);
deferredDeleteProcess(processId);
return;
}
@@ -309,7 +288,7 @@
fetchName.appendSegment(sendingSegment);
Interest fetchInterest(fetchName);
fetchInterest.setInterestLifetime(m_interestLifetime);
- getFace().expressInterest(fetchInterest,
+ face.expressInterest(fetchInterest,
bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
@@ -361,7 +340,7 @@
retryTime++;
Interest retryInterest(interest.getName());
retryInterest.setInterestLifetime(m_interestLifetime);
- getFace().expressInterest(retryInterest,
+ face.expressInterest(retryInterest,
bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
@@ -370,35 +349,17 @@
}
void
-WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
+WriteHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
+ const ndn::mgmt::ControlParameters& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
- m_validator.validate(interest,
- bind(&WriteHandle::onCheckValidated, this, _1, prefix),
- bind(&WriteHandle::onCheckValidationFailed, this, _1, _2));
+ const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
-}
-
-void
-WriteHandle::onCheckValidated(const Interest& interest, const Name& prefix)
-{
- RepoCommandParameter parameter;
- try {
- extractParameter(interest, prefix, parameter);
- }
- catch (const RepoCommandParameter::Error&) {
- negativeReply(interest, 403);
- return;
- }
-
- if (!parameter.hasProcessId()) {
- negativeReply(interest, 403);
- return;
- }
//check whether this process exists
- ProcessId processId = parameter.getProcessId();
+ ProcessId processId = repoParameter.getProcessId();
if (m_processes.count(processId) == 0) {
std::cerr << "no such processId: " << processId << std::endl;
- negativeReply(interest, 404);
+ done(negativeReply("No such this process is in progress", 404));
return;
}
@@ -409,109 +370,100 @@
//Check whether it is single data fetching
if (!response.hasStartBlockId() &&
!response.hasEndBlockId()) {
- reply(interest, response);
+ //reply(interest, response);
+ done(response);
return;
}
//read if noEndtimeout
if (!response.hasEndBlockId()) {
extendNoEndTime(process);
- reply(interest, response);
+ done(response);
return;
}
else {
- reply(interest, response);
+ done(response);
}
}
void
-WriteHandle::onCheckValidationFailed(const Interest& interest, const ValidationError& error)
-{
- std::cerr << error << std::endl;
- negativeReply(interest, 401);
-}
-
-void
WriteHandle::deferredDeleteProcess(ProcessId processId)
{
- getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
+ scheduler.scheduleEvent(PROCESS_DELETE_TIME,
bind(&WriteHandle::deleteProcess, this, processId));
}
void
-WriteHandle::processSingleInsertCommand(const Interest& interest,
- RepoCommandParameter& parameter)
+WriteHandle::processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
- ProcessId processId = generateProcessId();
+ ProcessId processId = ndn::random::generateWord64();
ProcessInfo& process = m_processes[processId];
RepoCommandResponse& response = process.response;
- response.setStatusCode(100);
+ response.setCode(100);
response.setProcessId(processId);
response.setInsertNum(0);
+ response.setBody(response.wireEncode());
+ done(response);
- reply(interest, response);
-
- response.setStatusCode(300);
+ response.setCode(300);
Interest fetchInterest(parameter.getName());
fetchInterest.setInterestLifetime(m_interestLifetime);
if (parameter.hasSelectors()) {
fetchInterest.setSelectors(parameter.getSelectors());
}
- getFace().expressInterest(fetchInterest,
- bind(&WriteHandle::onData, this, _1, _2, processId),
- bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
- bind(&WriteHandle::onTimeout, this, _1, processId));
+ face.expressInterest(fetchInterest,
+ bind(&WriteHandle::onData, this, _1, _2, processId),
+ bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
+ bind(&WriteHandle::onTimeout, this, _1, processId));
}
void
-WriteHandle::processSegmentedInsertCommand(const Interest& interest,
- RepoCommandParameter& parameter)
+WriteHandle::processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
+ const ndn::mgmt::CommandContinuation& done)
{
if (parameter.hasEndBlockId()) {
//normal fetch segment
- if (!parameter.hasStartBlockId()) {
- parameter.setStartBlockId(0);
- }
-
- SegmentNo startBlockId = parameter.getStartBlockId();
+ SegmentNo startBlockId = parameter.hasStartBlockId() ? parameter.getStartBlockId() : 0;
SegmentNo endBlockId = parameter.getEndBlockId();
if (startBlockId > endBlockId) {
- negativeReply(interest, 403);
+ done(negativeReply("Malformed Command", 403));
return;
}
- ProcessId processId = generateProcessId();
+ ProcessId processId = ndn::random::generateWord64();
ProcessInfo& process = m_processes[processId];
RepoCommandResponse& response = process.response;
- response.setStatusCode(100);
+ response.setCode(100);
response.setProcessId(processId);
response.setInsertNum(0);
response.setStartBlockId(startBlockId);
response.setEndBlockId(endBlockId);
-
- reply(interest, response);
+ response.setBody(response.wireEncode());
+ done(response);
//300 means data fetching is in progress
- response.setStatusCode(300);
+ response.setCode(300);
segInit(processId, parameter);
}
else {
//no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
- ProcessId processId = generateProcessId();
+ ProcessId processId = ndn::random::generateWord64();
ProcessInfo& process = m_processes[processId];
RepoCommandResponse& response = process.response;
- response.setStatusCode(100);
+ response.setCode(100);
response.setProcessId(processId);
response.setInsertNum(0);
response.setStartBlockId(parameter.getStartBlockId());
- reply(interest, response);
+ response.setBody(response.wireEncode());
+ done(response);
//300 means data fetching is in progress
- response.setStatusCode(300);
+ response.setCode(300);
segInit(processId, parameter);
}
@@ -524,21 +476,21 @@
ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
RepoCommandResponse& response = process.response;
if (now > noEndTime) {
- response.setStatusCode(405);
+ response.setCode(405);
return;
}
//extends noEndTime
- process.noEndTime =
- ndn::time::steady_clock::now() + m_noEndTimeout;
+ process.noEndTime = ndn::time::steady_clock::now() + m_noEndTimeout;
}
-void
-WriteHandle::negativeReply(const Interest& interest, int statusCode)
+RepoCommandResponse
+WriteHandle::negativeReply(std::string text, int statusCode)
{
- RepoCommandResponse response;
- response.setStatusCode(statusCode);
- reply(interest, response);
+ RepoCommandResponse response(statusCode, text);
+ response.setBody(response.wireEncode());
+
+ return response;
}
} // namespace repo