Remove dependency on Selectors and refactor codebase.
Change-Id: Ic3024b76ba0eea61f790c91c36090b4aa68702a3
Refs: #4522
diff --git a/src/handles/write-handle.cpp b/src/handles/write-handle.cpp
index 6cc9583..675f286 100644
--- a/src/handles/write-handle.cpp
+++ b/src/handles/write-handle.cpp
@@ -19,12 +19,16 @@
#include "write-handle.hpp"
+#include <ndn-cxx/util/logger.hpp>
#include <ndn-cxx/util/random.hpp>
namespace repo {
-static const int RETRY_TIMEOUT = 3;
+NDN_LOG_INIT(repo.WriteHandle);
+
static const int DEFAULT_CREDIT = 12;
+static const bool DEFAULT_CANBE_PREFIX = false;
+static const milliseconds MAX_TIMEOUT(60_s);
static const milliseconds NOEND_TIMEOUT(10000_ms);
static const milliseconds PROCESS_DELETE_TIME(10000_ms);
static const milliseconds DEFAULT_INTEREST_LIFETIME(4000_ms);
@@ -33,8 +37,9 @@
Scheduler& scheduler, Validator& validator)
: CommandBaseHandle(face, storageHandle, scheduler, validator)
, m_validator(validator)
- , m_retryTime(RETRY_TIMEOUT)
, m_credit(DEFAULT_CREDIT)
+ , m_canBePrefix(DEFAULT_CANBE_PREFIX)
+ , m_maxTimeout(MAX_TIMEOUT)
, m_noEndTimeout(NOEND_TIMEOUT)
, m_interestLifetime(DEFAULT_INTEREST_LIFETIME)
{
@@ -64,10 +69,6 @@
dynamic_cast<RepoCommandParameter*>(const_cast<ndn::mgmt::ControlParameters*>(&parameter));
if (repoParameter->hasStartBlockId() || repoParameter->hasEndBlockId()) {
- if (repoParameter->hasSelectors()) {
- done(negativeReply("BlockId present. BlockId is not supported in this protocol", 402));
- return;
- }
processSegmentedInsertCommand(interest, *repoParameter, done);
}
else {
@@ -81,8 +82,8 @@
WriteHandle::onData(const Interest& interest, const Data& data, ProcessId processId)
{
m_validator.validate(data,
- bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
- bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
+ std::bind(&WriteHandle::onDataValidated, this, interest, _1, processId),
+ [](const Data& data, const ValidationError& error){NDN_LOG_ERROR("Error: " << error);});
}
void
@@ -104,296 +105,13 @@
}
void
-WriteHandle::onDataValidationFailed(const Data& data, const ValidationError& error)
-{
- std::cerr << error << std::endl;
-}
-
-void
-WriteHandle::onSegmentData(const Interest& interest, const Data& data, ProcessId processId)
-{
- m_validator.validate(data,
- bind(&WriteHandle::onSegmentDataValidated, this, interest, _1, processId),
- bind(&WriteHandle::onDataValidationFailed, this, _1, _2));
-}
-
-void
-WriteHandle::onSegmentDataValidated(const Interest& interest, const Data& data, ProcessId processId)
-{
- auto it = m_processes.find(processId);
- if (it == m_processes.end()) {
- return;
- }
- RepoCommandResponse& response = it->second.response;
-
- //refresh endBlockId
- auto finalBlock = data.getFinalBlock();
- if (finalBlock && finalBlock->isSegment()) {
- auto finalSeg = finalBlock->toSegment();
- if (response.hasEndBlockId()) {
- if (finalSeg < response.getEndBlockId()) {
- response.setEndBlockId(finalSeg);
- }
- }
- else {
- response.setEndBlockId(finalSeg);
- }
- }
-
- //insert data
- if (storageHandle.insertData(data)) {
- response.setInsertNum(response.getInsertNum() + 1);
- }
-
- onSegmentDataControl(processId, interest);
-}
-
-void
WriteHandle::onTimeout(const Interest& interest, ProcessId processId)
{
-  std::cerr << "Timeout" << std::endl;
+  NDN_LOG_DEBUG("Timeout");  // single fetch timed out or was Nacked; the process entry is dropped below
m_processes.erase(processId);
}
void
-WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
-{
- std::cerr << "SegTimeout" << std::endl;
-
- onSegmentTimeoutControl(processId, interest);
-}
-
-void
-WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
-{
- ProcessInfo& process = m_processes[processId];
- process.credit = 0;
-
- map<SegmentNo, int>& processRetry = process.retryCounts;
-
- Name name = parameter.getName();
- SegmentNo startBlockId = parameter.getStartBlockId();
-
- uint64_t initialCredit = m_credit;
-
- if (parameter.hasEndBlockId()) {
- initialCredit =
- std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
- }
- else {
- // set noEndTimeout timer
- process.noEndTime = ndn::time::steady_clock::now() +
- m_noEndTimeout;
- }
- process.credit = initialCredit;
- SegmentNo segment = startBlockId;
-
- for (; segment < startBlockId + initialCredit; ++segment) {
- Name fetchName = name;
- fetchName.appendSegment(segment);
- Interest interest(fetchName);
- interest.setInterestLifetime(m_interestLifetime);
- face.expressInterest(interest,
- bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
- process.credit--;
- processRetry[segment] = 0;
- }
-
- queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
-
- process.nextSegment = segment;
- nextSegmentQueue.push(segment);
-}
-
-void
-WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
-{
- if (m_processes.count(processId) == 0) {
- return;
- }
- ProcessInfo& process = m_processes[processId];
- RepoCommandResponse& response = process.response;
- int& processCredit = process.credit;
- //onSegmentDataControl is called when a data returns.
- //When data returns, processCredit++
- processCredit++;
- SegmentNo& nextSegment = process.nextSegment;
- queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
- map<SegmentNo, int>& retryCounts = process.retryCounts;
-
- //read whether notime timeout
- if (!response.hasEndBlockId()) {
-
- ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
- ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
-
- if (now > noEndTime) {
- std::cerr << "noEndtimeout: " << processId << std::endl;
- //m_processes.erase(processId);
- //StatusCode should be refreshed as 405
- response.setCode(405);
- //schedule a delete event
- deferredDeleteProcess(processId);
- return;
- }
- }
-
- //read whether this process has total ends, if ends, remove control info from the maps
- if (response.hasEndBlockId()) {
- uint64_t nSegments =
- response.getEndBlockId() - response.getStartBlockId() + 1;
- if (response.getInsertNum() >= nSegments) {
- //m_processes.erase(processId);
- //All the data has been inserted, StatusCode is refreshed as 200
- response.setCode(200);
- deferredDeleteProcess(processId);
- return;
- }
- }
-
- //check whether there is any credit
- if (processCredit == 0)
- return;
-
-
- //check whether sent queue empty
- if (nextSegmentQueue.empty()) {
- //do not do anything
- return;
- }
-
- //pop the queue
- SegmentNo sendingSegment = nextSegmentQueue.front();
- nextSegmentQueue.pop();
-
- //check whether sendingSegment exceeds
- if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
- //do not do anything
- return;
- }
-
- //read whether this is retransmitted data;
- SegmentNo fetchedSegment =
- interest.getName().get(interest.getName().size() - 1).toSegment();
-
- BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
-
- //find this fetched data, remove it from this map
- //rit->second.erase(oit);
- retryCounts.erase(fetchedSegment);
- //express the interest of the top of the queue
- Name fetchName(interest.getName().getPrefix(-1));
- fetchName.appendSegment(sendingSegment);
- Interest fetchInterest(fetchName);
- fetchInterest.setInterestLifetime(m_interestLifetime);
- face.expressInterest(fetchInterest,
- bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
- //When an interest is expressed, processCredit--
- processCredit--;
- if (retryCounts.count(sendingSegment) == 0) {
- //not found
- retryCounts[sendingSegment] = 0;
- }
- else {
- //found
- retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
- }
- //increase the next seg and put it into the queue
- if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
- nextSegment++;
- nextSegmentQueue.push(nextSegment);
- }
-}
-
-void
-WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
-{
- if (m_processes.count(processId) == 0) {
- return;
- }
- ProcessInfo& process = m_processes[processId];
- // RepoCommandResponse& response = process.response;
- // SegmentNo& nextSegment = process.nextSegment;
- // queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
- map<SegmentNo, int>& retryCounts = process.retryCounts;
-
- SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
-
- std::cerr << "timeoutSegment: " << timeoutSegment << std::endl;
-
- BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
-
- //read the retry time. If retry out of time, fail the process. if not, plus
- int& retryTime = retryCounts[timeoutSegment];
- if (retryTime >= m_retryTime) {
- //fail this process
- std::cerr << "Retry timeout: " << processId << std::endl;
- m_processes.erase(processId);
- return;
- }
- else {
- //Reput it in the queue, retryTime++
- retryTime++;
- Interest retryInterest(interest.getName());
- retryInterest.setInterestLifetime(m_interestLifetime);
- face.expressInterest(retryInterest,
- bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId), // Nack
- bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
- }
-
-}
-
-void
-WriteHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
- const ndn::mgmt::ControlParameters& parameter,
- const ndn::mgmt::CommandContinuation& done)
-{
- const RepoCommandParameter& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
-
- //check whether this process exists
- ProcessId processId = repoParameter.getProcessId();
- if (m_processes.count(processId) == 0) {
- std::cerr << "no such processId: " << processId << std::endl;
- done(negativeReply("No such this process is in progress", 404));
- return;
- }
-
- ProcessInfo& process = m_processes[processId];
-
- RepoCommandResponse& response = process.response;
-
- //Check whether it is single data fetching
- if (!response.hasStartBlockId() &&
- !response.hasEndBlockId()) {
- //reply(interest, response);
- done(response);
- return;
- }
-
- //read if noEndtimeout
- if (!response.hasEndBlockId()) {
- extendNoEndTime(process);
- done(response);
- return;
- }
- else {
- done(response);
- }
-}
-
-void
-WriteHandle::deferredDeleteProcess(ProcessId processId)
-{
- scheduler.scheduleEvent(PROCESS_DELETE_TIME,
- bind(&WriteHandle::deleteProcess, this, processId));
-}
-
-void
WriteHandle::processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter,
const ndn::mgmt::CommandContinuation& done)
{
@@ -409,16 +127,108 @@
done(response);
response.setCode(300);
-
Interest fetchInterest(parameter.getName());
+ fetchInterest.setCanBePrefix(m_canBePrefix);
fetchInterest.setInterestLifetime(m_interestLifetime);
- if (parameter.hasSelectors()) {
- fetchInterest.setSelectors(parameter.getSelectors());
- }
face.expressInterest(fetchInterest,
- bind(&WriteHandle::onData, this, _1, _2, processId),
- bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
- bind(&WriteHandle::onTimeout, this, _1, processId));
+ std::bind(&WriteHandle::onData, this, _1, _2, processId),
+ std::bind(&WriteHandle::onTimeout, this, _1, processId), // Nack
+ std::bind(&WriteHandle::onTimeout, this, _1, processId));
+}
+
+// Starts fetching the segments named by an insert command through ndn::util::SegmentFetcher.
+// processId identifies the entry in m_processes; parameter supplies the name and block ids.
+void
+WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
+{
+  ProcessInfo& process = m_processes[processId];
+  // The initial congestion window is m_credit, capped by the number of requested segments.
+  uint64_t initialCredit = m_credit;
+  if (parameter.hasEndBlockId()) {
+    initialCredit =
+      std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
+  }
+  else {
+    // No end block id: arm the no-end deadline that onSegmentData() checks.
+    process.noEndTime = ndn::time::steady_clock::now() + m_noEndTimeout;
+  }
+
+  // The fetch starts at the first requested segment of the command's name.
+  Name fetchName = parameter.getName();
+  fetchName.appendSegment(parameter.getStartBlockId());
+  Interest interest(fetchName);
+  ndn::util::SegmentFetcher::Options options;
+  options.initCwnd = initialCredit;
+  options.interestLifetime = m_interestLifetime;
+  options.maxTimeout = m_maxTimeout;
+  auto fetcher = ndn::util::SegmentFetcher::start(face, interest, m_validator, options);
+  fetcher->onError.connect([] (uint32_t errorCode, const std::string& errorMsg)
+                           {NDN_LOG_ERROR("Error: " << errorMsg);});
+  // The handlers run after segInit() returns, so they must not capture its locals or
+  // parameters by reference. processId is copied; the fetcher is held as a plain pointer
+  // because a shared_ptr copy stored in the fetcher's own signals would form a cycle.
+  auto* fetcherPtr = fetcher.get();
+  fetcher->afterSegmentValidated.connect([this, fetcherPtr, processId] (const Data& data)
+                                         {onSegmentData(*fetcherPtr, data, processId);});
+  fetcher->afterSegmentTimedOut.connect([this, fetcherPtr, processId] ()
+                                        {onSegmentTimeout(*fetcherPtr, processId);});
+}
+
+// Handles one validated segment delivered by the SegmentFetcher of an insert process.
+//
+// fetcher   - the fetcher that produced the segment; stopped when no more data is wanted
+// data      - the validated segment, handed to storageHandle.insertData()
+// processId - key of the owning process in m_processes
+// The fetcher is stopped when the process is gone, has passed its no-end deadline, or
+// has inserted every segment of its block range.
+void
+WriteHandle::onSegmentData(ndn::util::SegmentFetcher& fetcher, const Data& data, ProcessId processId)
+{
+  const auto entry = m_processes.find(processId);
+  if (entry == m_processes.end()) {
+    // The process is no longer tracked, so further segments are not wanted.
+    fetcher.stop();
+    return;
+  }
+  ProcessInfo& info = entry->second;
+  RepoCommandResponse& reply = info.response;
+
+  // Count the segment only when the storage reports a successful insert.
+  if (storageHandle.insertData(data)) {
+    reply.setInsertNum(reply.getInsertNum() + 1);
+  }
+
+  bool isFinished = false;
+  if (reply.hasEndBlockId()) {
+    // Known range: done once as many segments were inserted as the range contains.
+    const uint64_t expected = reply.getEndBlockId() - reply.getStartBlockId() + 1;
+    if (reply.getInsertNum() >= expected) {
+      reply.setCode(200);
+      isFinished = true;
+    }
+  }
+  else if (ndn::time::steady_clock::now() > info.noEndTime) {
+    // Open-ended range: report status 405 once the no-end deadline has passed.
+    NDN_LOG_DEBUG("noEndtimeout: " << processId);
+    reply.setCode(405);
+    isFinished = true;
+  }
+
+  if (isFinished) {
+    // Schedule the deferred deletion of the process entry and end the fetch.
+    deferredDeleteProcess(processId);
+    fetcher.stop();
+  }
+}
+
+void
+WriteHandle::onSegmentTimeout(ndn::util::SegmentFetcher& fetcher, ProcessId processId)
+{
+  NDN_LOG_DEBUG("SegTimeout");
+  // A timeout for a process that is no longer tracked ends the fetch; otherwise nothing is done here.
+  if (m_processes.find(processId) == m_processes.end()) {
+    fetcher.stop();
+  }
+}
void
@@ -470,6 +280,49 @@
}
+// Answers a status query for an insert process with that process's current response.
+//
+// prefix, interest - not used by this handler
+// parameter        - must be a RepoCommandParameter (the reference dynamic_cast throws
+//                    std::bad_cast otherwise); its process id selects the process
+// done             - receives the response, or a 404 negative reply for an unknown process
void
+WriteHandle::handleCheckCommand(const Name& prefix, const Interest& interest,
+                                const ndn::mgmt::ControlParameters& parameter,
+                                const ndn::mgmt::CommandContinuation& done)
+{
+  const auto& repoParameter = dynamic_cast<const RepoCommandParameter&>(parameter);
+  const ProcessId processId = repoParameter.getProcessId();
+
+  // Unknown process ids are reported to the caller with a 404 negative reply.
+  const auto entry = m_processes.find(processId);
+  if (entry == m_processes.end()) {
+    NDN_LOG_DEBUG("no such processId: " << processId);
+    done(negativeReply("No such this process is in progress", 404));
+    return;
+  }
+
+  ProcessInfo& process = entry->second;
+  RepoCommandResponse& response = process.response;
+
+  // Every known process gets the same reply. The only extra work is for a segmented
+  // insert without an end block id (start id set, end id unset): extendNoEndTime()
+  // is called for it before replying.
+  const bool isOpenEnded = response.hasStartBlockId() && !response.hasEndBlockId();
+  if (isOpenEnded) {
+    extendNoEndTime(process);
+  }
+
+  done(response);
+}
+
+void
+WriteHandle::deferredDeleteProcess(ProcessId processId)
+{
+  // Hand deleteProcess(processId) to the scheduler with a delay of PROCESS_DELETE_TIME.
+  scheduler.scheduleEvent(PROCESS_DELETE_TIME, [this, processId] { deleteProcess(processId); });
+}
+
+void
WriteHandle::extendNoEndTime(ProcessInfo& process)
{
ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;