ndn-handle: insert, insert status check command
Also, BaseHandle takes reference instead of pointer
Change-Id: Ife53fcebe52c99252e418a46d7361bae8e638bdf
diff --git a/ndn-handle/write-handle.cpp b/ndn-handle/write-handle.cpp
new file mode 100644
index 0000000..dac5ca8
--- /dev/null
+++ b/ndn-handle/write-handle.cpp
@@ -0,0 +1,534 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2013 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#include "write-handle.hpp"
+
+namespace repo {
+
+static const int RETRY_TIMEOUT = 3; // initial m_retryTime: retry limit checked per timed-out segment
+static const int DEFAULT_CREDIT = 12; // initial m_credit: upper bound on the first interest window
+static const ndn::time::milliseconds NOEND_TIMEOUT(10000); // initial m_noEndTimeout (10 s)
+static const ndn::time::milliseconds PROCESS_DELETE_TIME(10000); // delay used by deferredDeleteProcess (10 s)
+
+WriteHandle::WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
+ Scheduler& scheduler, CommandInterestValidator& validator)
+ : BaseHandle(face, storageHandle, keyChain, scheduler)
+ , m_validator(validator) // validates incoming insert and insert check commands
+ , m_retryTime(RETRY_TIMEOUT) // retry limit for a timed-out segment interest
+ , m_credit(DEFAULT_CREDIT) // size of the initial interest window in segInit
+ , m_noEndTimeout(NOEND_TIMEOUT) // deadline length for fetches without an EndBlockId
+{
+}
+
+void
+WriteHandle::deleteProcess(ProcessId processId)
+{
+ m_processes.erase(processId); // drop the stored ProcessInfo; scheduled by deferredDeleteProcess
+}
+
+// Insert-prefix interest handler: validates, then onValidated or onValidationFailed runs.
+void
+WriteHandle::onInterest(const Name& prefix, const Interest& interest)
+{
+ m_validator.validate(interest,
+ bind(&WriteHandle::onValidated, this, _1, prefix),
+ bind(&WriteHandle::onValidationFailed, this, _1));
+}
+
+// Prefix-registration failure callback passed by listen(); prefix and reason are not reported.
+void
+WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
+{
+ throw Error("Insert prefix registration failed");
+}
+
+// Registration-failure callback for the insert check prefix; always throws Error.
+void
+WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
+{
+ throw Error("Insert check prefix registration failed");
+}
+
+// Handles a validated insert command: decodes its parameter and dispatches to
+// the single or the segmented insert path. Replies 403 when the parameter cannot
+// be extracted and 402 when selectors are combined with block ids.
+void
+WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
+{
+  RepoCommandParameter parameter;
+  try {
+    extractParameter(*interest, prefix, parameter);
+  }
+  catch (const RepoCommandParameter::Error&) { // by reference: no copy, no slicing
+    negativeReply(*interest, 403);
+    return;
+  }
+
+  if (!parameter.hasStartBlockId() && !parameter.hasEndBlockId()) {
+    processSingleInsertCommand(*interest, parameter);
+    return;
+  }
+  if (parameter.hasSelectors()) {
+    negativeReply(*interest, 402);
+    return;
+  }
+  processSegmentedInsertCommand(*interest, parameter);
+}
+
+void
+WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest)
+{
+ std::cout << "invalidated" << std::endl;
+ negativeReply(*interest, 401); // 401: the insert command did not pass validation
+}
+
+// Callback for the single-packet fetch started by processSingleInsertCommand.
+// Stores the first Data received for the process, then schedules removal of
+// the process record. Data for an unknown process id is ignored.
+void
+WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
+{
+  if (m_processes.count(processId) == 0) {
+    return;
+  }
+
+  RepoCommandResponse& status = m_processes[processId].response;
+  const bool isFirstData = (status.getInsertNum() == 0);
+  if (isFirstData) {
+    // the insert counter becomes 1 whatever insertData() reports
+    getStorageHandle().insertData(data);
+    status.setInsertNum(1);
+  }
+
+  deferredDeleteProcess(processId);
+}
+
+/**
+ * Callback for one segment fetched on behalf of a segmented insert process.
+ *
+ * Steps, in order:
+ *  1. ignore the Data when the process record no longer exists;
+ *  2. take EndBlockId from the Data's FinalBlockId when the process has none
+ *     yet, or when FinalBlockId is smaller than the current EndBlockId;
+ *  3. store the Data and count it if the storage reports an insertion;
+ *  4. hand over to onSegmentDataControl for the window bookkeeping.
+ *
+ * @param interest  the interest this Data answers
+ * @param data      the received segment
+ * @param processId id of the insert process the segment belongs to
+ */
+void
+WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
+{
+  if (m_processes.count(processId) == 0) {
+    return;
+  }
+  RepoCommandResponse& status = m_processes[processId].response;
+
+  // an announced final block may only create or lower EndBlockId
+  Name::Component finalBlockId = data.getFinalBlockId();
+  if (!finalBlockId.empty()) {
+    SegmentNo lastSegment = finalBlockId.toSegment();
+    if (!status.hasEndBlockId() || lastSegment < status.getEndBlockId()) {
+      status.setEndBlockId(lastSegment);
+    }
+  }
+
+  if (getStorageHandle().insertData(data)) {
+    status.setInsertNum(status.getInsertNum() + 1);
+  }
+
+  // completion check and follow-up interests happen there
+  onSegmentDataControl(processId, interest);
+}
+
+void
+WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
+{
+ std::cout << "Timeout" << std::endl;
+ m_processes.erase(processId); // no retry for a single fetch: the process record is dropped at once
+}
+
+void
+WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
+{
+ std::cout << "SegTimeout" << std::endl;
+
+ onSegmentTimeoutControl(processId, interest); // the retry-or-abort decision is made there
+}
+
+// Registers the interest filters for "<prefix>/insert" and "<prefix>/insert check".
+void
+WriteHandle::listen(const Name& prefix)
+{
+  const Name insertPrefix = Name(prefix).append("insert");
+  getFace().setInterestFilter(insertPrefix,
+                              bind(&WriteHandle::onInterest, this, _1, _2),
+                              bind(&WriteHandle::onRegisterFailed, this, _1, _2));
+  const Name insertCheckPrefix = Name(prefix).append("insert check");
+  // a failure of this registration is reported by the check-specific callback
+  getFace().setInterestFilter(insertCheckPrefix,
+                              bind(&WriteHandle::onCheckInterest, this, _1, _2),
+                              bind(&WriteHandle::onCheckRegisterFailed, this, _1, _2));
+}
+
+// Starts a segmented fetch: sends the initial window of segment interests
+// (m_credit, capped at the segment count when EndBlockId is known) and queues
+// the first segment that still has to be requested.
+void
+WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
+{
+  ProcessInfo& process = m_processes[processId];
+  map<SegmentNo, int>& retryCounts = process.retryCounts;
+  const Name name = parameter.getName();
+  const SegmentNo startBlockId = parameter.getStartBlockId();
+
+  uint64_t initialCredit = m_credit;
+  if (parameter.hasEndBlockId()) {
+    // [start, end] holds (end - start + 1) segments; the difference is compared
+    // first so that the "+ 1" cannot wrap around
+    const uint64_t lastOffset = parameter.getEndBlockId() - startBlockId;
+    if (lastOffset < initialCredit) {
+      initialCredit = lastOffset + 1;
+    }
+  }
+  else {
+    // set noEndTimeout timer
+    process.noEndTime = ndn::time::steady_clock::now() + m_noEndTimeout;
+  }
+
+  process.credit = initialCredit;
+  SegmentNo segment = startBlockId;
+  for (; segment < startBlockId + initialCredit; ++segment) {
+    Name fetchName = name;
+    fetchName.appendSegment(segment);
+    Interest interest(fetchName);
+    getFace().expressInterest(interest,
+                              bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+                              bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+    process.credit--;
+    retryCounts[segment] = 0;
+  }
+
+  // `segment` already is the first unrequested one; bumping it again would skip it
+  process.nextSegment = segment;
+  process.nextSegmentQueue.push(segment);
+}
+
+/**
+ * Window bookkeeping that runs after every received segment.
+ *
+ * Returns one credit to the process, then, in order:
+ *  - without an EndBlockId: once the no-EndBlockId deadline has passed, sets
+ *    status 405 and schedules removal of the process;
+ *  - with an EndBlockId: once InsertNum covers the whole range
+ *    [StartBlockId, EndBlockId], sets status 200 and schedules removal;
+ *  - otherwise, if credit and a queued segment are available, expresses the
+ *    interest for that segment and queues the following segment number.
+ *
+ * EndBlockId is only read after hasEndBlockId() confirmed it is set, so a fetch
+ * without a known end keeps requesting segments instead of comparing against
+ * an unset value.
+ *
+ * @param processId id of the insert process
+ * @param interest  the interest whose Data has just arrived; its last name
+ *                  component is read as the fetched segment number
+ */
+void
+WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
+{
+  if (m_processes.count(processId) == 0) {
+    return;
+  }
+  ProcessInfo& process = m_processes[processId];
+  RepoCommandResponse& response = process.response;
+  int& processCredit = process.credit;
+  SegmentNo& nextSegment = process.nextSegment;
+  queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
+  map<SegmentNo, int>& retryCounts = process.retryCounts;
+
+  // a Data packet came back, so one slot of the window is free again
+  processCredit++;
+
+  if (!response.hasEndBlockId()) {
+    // no end known yet: give up once the deadline has passed
+    if (ndn::time::steady_clock::now() > process.noEndTime) {
+      std::cout << "noEndtimeout: " << processId << std::endl;
+      response.setStatusCode(405);
+      // the record is kept for a while; its deletion is only scheduled
+      deferredDeleteProcess(processId);
+      return;
+    }
+  }
+  else {
+    // end known: finished as soon as every segment of the range was inserted
+    uint64_t nSegments =
+      response.getEndBlockId() - response.getStartBlockId() + 1;
+    if (response.getInsertNum() >= nSegments) {
+      response.setStatusCode(200);
+      deferredDeleteProcess(processId);
+      return;
+    }
+  }
+
+  // nothing can be sent without credit or without a queued segment
+  if (processCredit == 0) {
+    return;
+  }
+  if (nextSegmentQueue.empty()) {
+    return;
+  }
+
+  SegmentNo sendingSegment = nextSegmentQueue.front();
+  nextSegmentQueue.pop();
+
+  // an upper bound only exists when EndBlockId is set
+  if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
+    return;
+  }
+
+  // the answered segment needs no retry bookkeeping any more
+  SegmentNo fetchedSegment =
+    interest.getName().get(interest.getName().size() - 1).toSegment();
+  BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
+  retryCounts.erase(fetchedSegment);
+
+  // request the queued segment under the same name prefix
+  Name fetchName(interest.getName().getPrefix(-1));
+  fetchName.appendSegment(sendingSegment);
+  Interest fetchInterest(fetchName);
+  getFace().expressInterest(fetchInterest,
+                            bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+                            bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+  // an interest went out, so one slot of the window is taken
+  processCredit--;
+
+  // start (or bump) the retry counter of the segment just requested
+  if (retryCounts.count(sendingSegment) == 0) {
+    retryCounts[sendingSegment] = 0;
+  }
+  else {
+    retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
+  }
+
+  // queue the following segment unless it lies beyond a known EndBlockId
+  if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
+    nextSegment++;
+    nextSegmentQueue.push(nextSegment);
+  }
+}
+
+// Reacts to a timed-out segment interest: the interest is expressed again
+// until the segment's retry counter has reached m_retryTime; at that point the
+// whole process record is erased. Timeouts that belong to a process which no
+// longer exists are ignored.
+void
+WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
+{
+  if (m_processes.count(processId) == 0) {
+    return;
+  }
+  // only the retry counters of the process are needed here
+  map<SegmentNo, int>& retryCounts = m_processes[processId].retryCounts;
+
+  // the segment number is the last component of the interest name
+  SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
+
+  std::cout << "timeoutSegment: " << timeoutSegment << std::endl;
+
+  // a counter is created whenever a segment interest is first expressed
+  BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
+
+  int& retryTime = retryCounts[timeoutSegment];
+  if (retryTime >= m_retryTime) {
+    // retry budget used up: fail the process by removing its record
+    std::cout << "Retry timeout: " << processId << std::endl;
+    m_processes.erase(processId);
+    return;
+  }
+
+  // one more attempt for the same name
+  retryTime++;
+  Interest retryInterest(interest.getName());
+  getFace().expressInterest(retryInterest,
+                            bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+                            bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+}
+
+void
+WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
+{
+ m_validator.validate(interest, // same validator as insert commands, check-specific callbacks
+ bind(&WriteHandle::onCheckValidated, this, _1, prefix),
+ bind(&WriteHandle::onCheckValidationFailed, this, _1));
+
+}
+
+/**
+ * Handles a validated insert check command and replies with the status of the
+ * insert process it names.
+ *
+ * Negative replies:
+ *  - 403 when the command parameter cannot be extracted or has no ProcessId;
+ *  - 404 when no process with that id is stored.
+ *
+ * Otherwise the stored response is sent back. For a process that has a
+ * StartBlockId but no EndBlockId, extendNoEndTime() runs first, so the reply
+ * already reflects any status change made there.
+ */
+void
+WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
+{
+  RepoCommandParameter parameter;
+  try {
+    extractParameter(*interest, prefix, parameter);
+  }
+  catch (const RepoCommandParameter::Error&) { // by reference: no copy, no slicing
+    negativeReply(*interest, 403);
+    return;
+  }
+
+  if (!parameter.hasProcessId()) {
+    negativeReply(*interest, 403);
+    return;
+  }
+
+  // check whether this process exists
+  ProcessId processId = parameter.getProcessId();
+  if (m_processes.count(processId) == 0) {
+    std::cout << "no such processId: " << processId << std::endl;
+    negativeReply(*interest, 404);
+    return;
+  }
+  ProcessInfo& process = m_processes[processId];
+
+  // every remaining case ends in the same reply; only an open-ended segmented
+  // fetch needs its deadline handled first
+  if (process.response.hasStartBlockId() && !process.response.hasEndBlockId()) {
+    extendNoEndTime(process);
+  }
+  reply(*interest, process.response);
+}
+
+void
+WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest)
+{
+ negativeReply(*interest, 401); // same status code as a failed insert command validation
+}
+
+void
+WriteHandle::deferredDeleteProcess(ProcessId processId)
+{
+ getScheduler().scheduleEvent(PROCESS_DELETE_TIME, // deleteProcess runs after this delay, not now
+ ndn::bind(&WriteHandle::deleteProcess, this, processId));
+}
+
+// Starts a single-packet insert: creates a process record, acknowledges the
+// command with status 100, then fetches the named Data (using the command's
+// selectors, if any). The stored status is 300 while the fetch is pending.
+void
+WriteHandle::processSingleInsertCommand(const Interest& interest,
+                                        RepoCommandParameter& parameter)
+{
+  const ProcessId processId = generateProcessId();
+  RepoCommandResponse& status = m_processes[processId].response;
+
+  status.setStatusCode(100);
+  status.setProcessId(processId);
+  status.setInsertNum(0);
+  reply(interest, status);
+  // the acknowledgement above carried 100; the stored record now says 300
+  status.setStatusCode(300);
+
+  Interest fetchInterest(parameter.getName());
+  if (parameter.hasSelectors()) {
+    fetchInterest.setSelectors(parameter.getSelectors());
+  }
+  getFace().expressInterest(fetchInterest,
+                            bind(&WriteHandle::onData, this, _1, _2, processId),
+                            bind(&WriteHandle::onTimeout, this, _1, processId));
+}
+
+/**
+ * Starts a segmented insert.
+ *
+ * With an EndBlockId in the command, a missing StartBlockId defaults to 0 and
+ * a range whose start lies after its end is rejected with 403 before any
+ * process is created. Without an EndBlockId the command's StartBlockId is used
+ * as given and the response carries no EndBlockId.
+ *
+ * In both cases a process record is created, the command is acknowledged with
+ * status 100, the stored status becomes 300 (data fetching in progress) and
+ * segInit() sends the first interests.
+ *
+ * @param interest  the insert command interest being answered
+ * @param parameter the extracted command parameter; its StartBlockId may be
+ *                  set here
+ */
+void
+WriteHandle::processSegmentedInsertCommand(const Interest& interest,
+                                           RepoCommandParameter& parameter)
+{
+  const bool hasKnownEnd = parameter.hasEndBlockId();
+
+  if (hasKnownEnd) {
+    // normal fetch segment
+    if (!parameter.hasStartBlockId()) {
+      parameter.setStartBlockId(0);
+    }
+    const SegmentNo startBlockId = parameter.getStartBlockId();
+    const SegmentNo endBlockId = parameter.getEndBlockId();
+    if (startBlockId > endBlockId) {
+      negativeReply(interest, 403);
+      return;
+    }
+  }
+
+  const ProcessId processId = generateProcessId();
+  RepoCommandResponse& status = m_processes[processId].response;
+
+  status.setStatusCode(100);
+  status.setProcessId(processId);
+  status.setInsertNum(0);
+  status.setStartBlockId(parameter.getStartBlockId());
+  if (hasKnownEnd) {
+    status.setEndBlockId(parameter.getEndBlockId());
+  }
+  // no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
+
+  reply(interest, status);
+
+  // 300 means data fetching is in progress
+  status.setStatusCode(300);
+
+  segInit(processId, parameter);
+}
+
+// Pushes the no-EndBlockId deadline of a process m_noEndTimeout into the future,
+// unless it has already passed, in which case the status becomes 405 instead.
+void
+WriteHandle::extendNoEndTime(ProcessInfo& process)
+{
+  const ndn::time::steady_clock::TimePoint checkedAt = ndn::time::steady_clock::now();
+
+  if (process.noEndTime < checkedAt) {
+    process.response.setStatusCode(405);
+  }
+  else {
+    // the new deadline is measured from a fresh clock reading
+    process.noEndTime = ndn::time::steady_clock::now() + m_noEndTimeout;
+  }
+}
+
+void
+WriteHandle::negativeReply(const Interest& interest, int statusCode)
+{
+ RepoCommandResponse response; // only the status code is set on this response
+ response.setStatusCode(statusCode);
+ reply(interest, response);
+}
+
+} //namespace repo