ndn-handle: insert, insert status check command
Also, BaseHandle takes reference instead of pointer
Change-Id: Ife53fcebe52c99252e418a46d7361bae8e638bdf
diff --git a/ndn-handle/base-handle.cpp b/ndn-handle/base-handle.cpp
new file mode 100644
index 0000000..b932ebd
--- /dev/null
+++ b/ndn-handle/base-handle.cpp
@@ -0,0 +1,19 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2013 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#include "base-handle.hpp"
+
+namespace repo {
+
+uint64_t
+BaseHandle::generateProcessId()
+{
+ static boost::random::mt19937_64 gen;
+ static boost::random::uniform_int_distribution<uint64_t> dist(0, 0xFFFFFFFFFFFFFFFFLL);
+ return dist(gen);
+}
+
+}
\ No newline at end of file
diff --git a/ndn-handle/base-handle.hpp b/ndn-handle/base-handle.hpp
index 97df137..05a2070 100644
--- a/ndn-handle/base-handle.hpp
+++ b/ndn-handle/base-handle.hpp
@@ -15,32 +15,89 @@
{
public:
- BaseHandle(Face* face, StorageHandle* storageHandle)
+ class Error : std::runtime_error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : std::runtime_error(what)
+ {
+ }
+ };
+
+public:
+ BaseHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain, Scheduler& scheduler)
: m_face(face)
, m_storageHandle(storageHandle)
+ , m_keyChain(keyChain)
+ , m_scheduler(scheduler)
{
}
virtual void
listen(const Name& prefix) = 0;
- inline Face*
+protected:
+
+ inline Face&
getFace()
{
return m_face;
}
- inline StorageHandle*
+ inline StorageHandle&
getStorageHandle()
{
return m_storageHandle;
}
+ inline Scheduler&
+ getScheduler()
+ {
+ return m_scheduler;
+ }
+
+ uint64_t
+ generateProcessId();
+
+ void
+ reply(const Interest& commandInterest, const RepoCommandResponse& response);
+
+
+ /**
+ * @brief extract RepoCommandParameter from a command Interest.
+ * @param interest command Interest
+ * @param prefix Name prefix up to command-verb
+ * @param[out] parameter parsed parameter
+ * @throw RepoCommandParameter::Error parse error
+ */
+ void
+ extractParameter(const Interest& interest, const Name& prefix, RepoCommandParameter& parameter);
+
private:
- Face* m_face;
- StorageHandle* m_storageHandle;
+
+ Face& m_face;
+ StorageHandle& m_storageHandle;
+ KeyChain& m_keyChain;
+ Scheduler& m_scheduler;
};
+inline void
+BaseHandle::reply(const Interest& commandInterest, const RepoCommandResponse& response)
+{
+ Data rdata(commandInterest.getName());
+ rdata.setContent(response.wireEncode());
+ m_keyChain.sign(rdata);
+ m_face.put(rdata);
+}
+
+inline void
+BaseHandle::extractParameter(const Interest& interest, const Name& prefix,
+ RepoCommandParameter& parameter)
+{
+ parameter.wireDecode(interest.getName().get(prefix.size()).blockFromValue());
+}
+
} //namespace repo
#endif // REPO_NDN_HANDLE_BASE_HANDLE_HPP
diff --git a/ndn-handle/ndn-handle-common.hpp b/ndn-handle/ndn-handle-common.hpp
index c761805..73634b3 100644
--- a/ndn-handle/ndn-handle-common.hpp
+++ b/ndn-handle/ndn-handle-common.hpp
@@ -20,13 +20,12 @@
#include <ndn-cpp-dev/face.hpp>
#include <ndn-cpp-dev/security/key-chain.hpp>
#include <ndn-cpp-dev/util/command-interest-validator.hpp>
+#include <ndn-cpp-dev/util/time.hpp>
+#include <ndn-cpp-dev/util/scheduler.hpp>
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_int_distribution.hpp>
#include <map>
-
-#define RETRY_TIMEOUT 3
-#define DEFAULT_CREDIT 12
-#define NOEND_TIMEOUT 10000
+#include <algorithm>
namespace repo {
@@ -37,9 +36,13 @@
using ndn::Selectors;
using ndn::bind;
using ndn::CommandInterestValidator;
+using ndn::Scheduler;
using boost::shared_ptr;
+typedef uint64_t ProcessId;
+typedef uint64_t SegmentNo;
+
}
#endif // REPO_NDN_HANDLE_NDN_HANDLE_COMMON_HPP
diff --git a/ndn-handle/read-handle.cpp b/ndn-handle/read-handle.cpp
index 9ab7822..d9ddb01 100644
--- a/ndn-handle/read-handle.cpp
+++ b/ndn-handle/read-handle.cpp
@@ -11,8 +11,8 @@
ReadHandle::onInterest(const Name& prefix, const Interest& interest)
{
Data data;
- if (getStorageHandle()->readData(interest, data)) {
- getFace()->put(data);
+ if (getStorageHandle().readData(interest, data)) {
+ getFace().put(data);
}
}
@@ -20,15 +20,15 @@
ReadHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
{
std::cerr << "ERROR: Failed to register prefix in local hub's daemon" << std::endl;
- getFace()->shutdown();
+ getFace().shutdown();
}
void
ReadHandle::listen(const Name& prefix)
{
- getFace()->setInterestFilter(prefix,
- bind(&ReadHandle::onInterest, this, _1, _2),
- bind(&ReadHandle::onRegisterFailed, this, _1, _2));
+ getFace().setInterestFilter(prefix,
+ bind(&ReadHandle::onInterest, this, _1, _2),
+ bind(&ReadHandle::onRegisterFailed, this, _1, _2));
}
} //namespace repo
diff --git a/ndn-handle/read-handle.hpp b/ndn-handle/read-handle.hpp
index 5af1fb3..1f5af86 100644
--- a/ndn-handle/read-handle.hpp
+++ b/ndn-handle/read-handle.hpp
@@ -11,12 +11,12 @@
namespace repo {
-class ReadHandle : BaseHandle
+class ReadHandle : public BaseHandle
{
public:
- ReadHandle(Face* face, StorageHandle* storageHandle)
- : BaseHandle(face, storageHandle)
+ ReadHandle(Face& face, StorageHandle& storageHandle, KeyChain keyChain, Scheduler& scheduler)
+ : BaseHandle(face, storageHandle, keyChain, scheduler)
{
}
@@ -25,7 +25,7 @@
private:
/**
- * @brief Read the name from backend storage
+ * @brief Read data from backend storage
*/
void
onInterest(const Name& prefix, const Interest& interest);
diff --git a/ndn-handle/write-handle.cpp b/ndn-handle/write-handle.cpp
new file mode 100644
index 0000000..dac5ca8
--- /dev/null
+++ b/ndn-handle/write-handle.cpp
@@ -0,0 +1,534 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2013 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#include "write-handle.hpp"
+
+namespace repo {
+
+static const int RETRY_TIMEOUT = 3;
+static const int DEFAULT_CREDIT = 12;
+static const ndn::time::milliseconds NOEND_TIMEOUT(10000);
+static const ndn::time::milliseconds PROCESS_DELETE_TIME(10000);
+
+WriteHandle::WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
+ Scheduler& scheduler, CommandInterestValidator& validator)
+ : BaseHandle(face, storageHandle, keyChain, scheduler)
+ , m_validator(validator)
+ , m_retryTime(RETRY_TIMEOUT)
+ , m_credit(DEFAULT_CREDIT)
+ , m_noEndTimeout(NOEND_TIMEOUT)
+{
+}
+
+void
+WriteHandle::deleteProcess(ProcessId processId)
+{
+ m_processes.erase(processId);
+}
+
+// Interest.
+void
+WriteHandle::onInterest(const Name& prefix, const Interest& interest)
+{
+ m_validator.validate(interest,
+ bind(&WriteHandle::onValidated, this, _1, prefix),
+ bind(&WriteHandle::onValidationFailed, this, _1));
+}
+
+// onRegisterFailed.
+void
+WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
+{
+ throw Error("Insert prefix registration failed");
+}
+
+// onRegisterFailed for insert.
+void
+WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
+{
+ throw Error("Insert check prefix registration failed");
+}
+
+void
+WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
+{
+ //m_validResult = 1;
+ RepoCommandParameter parameter;
+ try {
+ extractParameter(*interest, prefix, parameter);
+ }
+ catch (RepoCommandParameter::Error) {
+ negativeReply(*interest, 403);
+ return;
+ }
+
+ if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
+ if (parameter.hasSelectors()) {
+ negativeReply(*interest, 402);
+ return;
+ }
+ processSegmentedInsertCommand(*interest, parameter);
+ }
+ else {
+ processSingleInsertCommand(*interest, parameter);
+ }
+
+}
+
+void
+WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest)
+{
+ std::cout << "invalidated" << std::endl;
+ negativeReply(*interest, 401);
+}
+
+void
+WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
+{
+ //std::cout << "onData" << std::endl;
+ //std::cout << "I: " << interest.toUri() << std::endl;
+ //std::cout << "D: " << data.getName().toUri() << std::endl;
+ if (m_processes.count(processId) == 0) {
+ return;
+ }
+
+ ProcessInfo& process = m_processes[processId];
+ RepoCommandResponse& response = process.response;
+
+ if (response.getInsertNum() == 0) {
+ getStorageHandle().insertData(data);
+ response.setInsertNum(1);
+ }
+
+ deferredDeleteProcess(processId);
+}
+
+void
+WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
+{
+ //std::cout << "I: " << interest.toUri() << std::endl;
+ //std::cout << "D: " << data.getName().toUri() << std::endl;
+ //retrieve the process from the responsemap
+
+ if (m_processes.count(processId) == 0) {
+ return;
+ }
+ RepoCommandResponse& response = m_processes[processId].response;
+
+ //refresh endBlockId
+ Name::Component finalBlockId = data.getFinalBlockId();
+
+ if (!finalBlockId.empty()) {
+ SegmentNo final = finalBlockId.toSegment();
+ if (response.hasEndBlockId()) {
+ if (final < response.getEndBlockId()) {
+ response.setEndBlockId(final);
+ }
+ }
+ else {
+ response.setEndBlockId(final);
+ }
+ }
+
+ //insert data
+ //std::cout << "start to insert" << std::endl;
+ if (getStorageHandle().insertData(data)) {
+ response.setInsertNum(response.getInsertNum() + 1);
+ }
+ //std::cout << "end of insert" << std::endl;
+
+ //it->second = response;
+
+ onSegmentDataControl(processId, interest);
+}
+
+void
+WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
+{
+ std::cout << "Timeout" << std::endl;
+ m_processes.erase(processId);
+}
+
+void
+WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
+{
+ std::cout << "SegTimeout" << std::endl;
+
+ onSegmentTimeoutControl(processId, interest);
+}
+
+void
+WriteHandle::listen(const Name& prefix)
+{
+ Name insertPrefix;
+ insertPrefix.append(prefix).append("insert");
+ getFace().setInterestFilter(insertPrefix,
+ bind(&WriteHandle::onInterest, this, _1, _2),
+ bind(&WriteHandle::onRegisterFailed, this, _1, _2));
+ Name insertCheckPrefix;
+ insertCheckPrefix.append(prefix).append("insert check");
+ getFace().setInterestFilter(insertCheckPrefix,
+ bind(&WriteHandle::onCheckInterest, this, _1, _2),
+ bind(&WriteHandle::onRegisterFailed, this, _1, _2));
+}
+
+void
+WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
+{
+ ProcessInfo& process = m_processes[processId];
+ process.credit = 0;
+
+ map<SegmentNo, int>& processRetry = process.retryCounts;
+
+ Name name = parameter.getName();
+ SegmentNo startBlockId = parameter.getStartBlockId();
+
+ uint64_t initialCredit = m_credit;
+
+ if (parameter.hasEndBlockId()) {
+ initialCredit =
+ std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId());
+ }
+ else {
+ // set noEndTimeout timer
+ process.noEndTime = ndn::time::steady_clock::now() +
+ m_noEndTimeout;
+ }
+ process.credit = initialCredit;
+ SegmentNo segment = startBlockId;
+ for (; segment < startBlockId + initialCredit; ++segment) {
+ Name fetchName = name;
+ fetchName.appendSegment(segment);
+ Interest interest(fetchName);
+ //std::cout << "seg:" << j<<std::endl;
+ getFace().expressInterest(interest,
+ bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+ bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+ process.credit--;
+ processRetry[segment] = 0;
+ }
+
+ queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
+
+ segment++;
+ process.nextSegment = segment;
+ nextSegmentQueue.push(segment);
+}
+
+void
+WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
+{
+ //std::cout << "onSegmentDataControl: " << processId << std::endl;
+
+ if (m_processes.count(processId) == 0) {
+ return;
+ }
+ ProcessInfo& process = m_processes[processId];
+ RepoCommandResponse& response = process.response;
+ int& processCredit = process.credit;
+ //onSegmentDataControl is called when a data returns.
+ //When data returns, processCredit++
+ processCredit++;
+ SegmentNo& nextSegment = process.nextSegment;
+ queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
+ map<SegmentNo, int>& retryCounts = process.retryCounts;
+
+ //read whether notime timeout
+ if (!response.hasEndBlockId()) {
+
+ ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
+ ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
+
+ if (now > noEndTime) {
+ std::cout << "noEndtimeout: " << processId << std::endl;
+ //m_processes.erase(processId);
+ //StatusCode should be refreshed as 405
+ response.setStatusCode(405);
+ //schedule a delete event
+ deferredDeleteProcess(processId);
+ return;
+ }
+ }
+
+ //read whether this process has total ends, if ends, remove control info from the maps
+ if (response.hasEndBlockId()) {
+ uint64_t nSegments =
+ response.getEndBlockId() - response.getStartBlockId() + 1;
+ if (response.getInsertNum() >= nSegments) {
+ //m_processes.erase(processId);
+ //All the data has been inserted, StatusCode is refreshed as 200
+ response.setStatusCode(200);
+ deferredDeleteProcess(processId);
+ return;
+ }
+ }
+
+ //check whether there is any credit
+ if (processCredit == 0)
+ return;
+
+
+ //check whether sent queue empty
+ if (nextSegmentQueue.empty()) {
+ //do not do anything
+ return;
+ }
+
+ //pop the queue
+ SegmentNo sendingSegment = nextSegmentQueue.front();
+ nextSegmentQueue.pop();
+
+ //check whether sendingSegment exceeds
+ if (sendingSegment > response.getEndBlockId()) {
+ //do not do anything
+ return;
+ }
+
+ //read whether this is retransmitted data;
+ SegmentNo fetchedSegment =
+ interest.getName().get(interest.getName().size() - 1).toSegment();
+
+ BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
+
+ //find this fetched data, remove it from this map
+ //rit->second.erase(oit);
+ retryCounts.erase(fetchedSegment);
+ //express the interest of the top of the queue
+ Name fetchName(interest.getName().getPrefix(-1));
+ fetchName.appendSegment(sendingSegment);
+ Interest fetchInterest(fetchName);
+ getFace().expressInterest(fetchInterest,
+ bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+ bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+ //When an interest is expressed, processCredit--
+ processCredit--;
+ //std::cout << "sent seg: " << sendingSegment << std::endl;
+ if (retryCounts.count(sendingSegment) == 0) {
+ //not found
+ retryCounts[sendingSegment] = 0;
+ }
+ else {
+ //found
+ retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
+ }
+ //increase the next seg and put it into the queue
+ if ((nextSegment + 1) <= response.getEndBlockId()) {
+ nextSegment++;
+ nextSegmentQueue.push(nextSegment);
+ }
+}
+
+void
+WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
+{
+ if (m_processes.count(processId) == 0) {
+ return;
+ }
+ ProcessInfo& process = m_processes[processId];
+ RepoCommandResponse& response = process.response;
+ SegmentNo& nextSegment = process.nextSegment;
+ queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
+ map<SegmentNo, int>& retryCounts = process.retryCounts;
+
+ SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
+
+ std::cout << "timeoutSegment: " << timeoutSegment << std::endl;
+
+ BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
+
+ //read the retry time. If retry out of time, fail the process. if not, plus
+ int& retryTime = retryCounts[timeoutSegment];
+ if (retryTime >= m_retryTime) {
+ //fail this process
+ std::cout << "Retry timeout: " << processId << std::endl;
+ m_processes.erase(processId);
+ return;
+ }
+ else {
+ //Reput it in the queue, retryTime++
+ retryTime++;
+ Interest retryInterest(interest.getName());
+ getFace().expressInterest(retryInterest,
+ bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+ bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+ }
+
+}
+
+void
+WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
+{
+ m_validator.validate(interest,
+ bind(&WriteHandle::onCheckValidated, this, _1, prefix),
+ bind(&WriteHandle::onCheckValidationFailed, this, _1));
+
+}
+
+void
+WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
+{
+ RepoCommandParameter parameter;
+ try {
+ extractParameter(*interest, prefix, parameter);
+ }
+ catch (RepoCommandParameter::Error) {
+ negativeReply(*interest, 403);
+ return;
+ }
+
+ if (!parameter.hasProcessId()) {
+ negativeReply(*interest, 403);
+ return;
+ }
+ //check whether this process exists
+ ProcessId processId = parameter.getProcessId();
+ if (m_processes.count(processId) == 0) {
+ std::cout << "no such processId: " << processId << std::endl;
+ negativeReply(*interest, 404);
+ return;
+ }
+
+ ProcessInfo& process = m_processes[processId];
+
+ RepoCommandResponse& response = process.response;
+
+ //Check whether it is single data fetching
+ if (!response.hasStartBlockId() &&
+ !response.hasEndBlockId()) {
+ reply(*interest, response);
+ return;
+ }
+
+ //read if noEndtimeout
+ if (!response.hasEndBlockId()) {
+ extendNoEndTime(process);
+ reply(*interest, response);
+ return;
+ }
+ else {
+ reply(*interest, response);
+ }
+}
+
+void
+WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest)
+{
+ negativeReply(*interest, 401);
+}
+
+void
+WriteHandle::deferredDeleteProcess(ProcessId processId)
+{
+ getScheduler().scheduleEvent(PROCESS_DELETE_TIME,
+ ndn::bind(&WriteHandle::deleteProcess, this, processId));
+}
+
+void
+WriteHandle::processSingleInsertCommand(const Interest& interest,
+ RepoCommandParameter& parameter)
+{
+ ProcessId processId = generateProcessId();
+
+ ProcessInfo& process = m_processes[processId];
+
+ RepoCommandResponse& response = process.response;
+ response.setStatusCode(100);
+ response.setProcessId(processId);
+ response.setInsertNum(0);
+
+ reply(interest, response);
+
+ response.setStatusCode(300);
+
+ Interest fetchInterest(parameter.getName());
+ if (parameter.hasSelectors()) {
+ fetchInterest.setSelectors(parameter.getSelectors());
+ }
+ getFace().expressInterest(fetchInterest,
+ bind(&WriteHandle::onData, this, _1, _2, processId),
+ bind(&WriteHandle::onTimeout, this, _1, processId));
+}
+
+void
+WriteHandle::processSegmentedInsertCommand(const Interest& interest,
+ RepoCommandParameter& parameter)
+{
+ if (parameter.hasEndBlockId()) {
+ //normal fetch segment
+ if (!parameter.hasStartBlockId()) {
+ parameter.setStartBlockId(0);
+ }
+
+ SegmentNo startBlockId = parameter.getStartBlockId();
+ SegmentNo endBlockId = parameter.getEndBlockId();
+ //std::cout << "startBlockId: " << startBlockId << std::endl;
+ //std::cout << "endBlockId: " << endBlockId << std::endl;
+ if (startBlockId > endBlockId) {
+ negativeReply(interest, 403);
+ return;
+ }
+
+ ProcessId processId = generateProcessId();
+ ProcessInfo& process = m_processes[processId];
+ //std::cout << "processId: " << processId << std::endl;
+ RepoCommandResponse& response = process.response;
+ response.setStatusCode(100);
+ response.setProcessId(processId);
+ response.setInsertNum(0);
+ response.setStartBlockId(startBlockId);
+ response.setEndBlockId(endBlockId);
+
+ reply(interest, response);
+
+ //300 means data fetching is in progress
+ response.setStatusCode(300);
+
+ segInit(processId, parameter);
+ }
+ else {
+ //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
+ ProcessId processId = generateProcessId();
+ ProcessInfo& process = m_processes[processId];
+ //std::cout << "processId: " << processId << std::endl;
+ RepoCommandResponse& response = process.response;
+ response.setStatusCode(100);
+ response.setProcessId(processId);
+ response.setInsertNum(0);
+ response.setStartBlockId(parameter.getStartBlockId());
+ reply(interest, response);
+
+ //300 means data fetching is in progress
+ response.setStatusCode(300);
+
+ segInit(processId, parameter);
+ }
+}
+
+void
+WriteHandle::extendNoEndTime(ProcessInfo& process)
+{
+ ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
+ ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
+ RepoCommandResponse& response = process.response;
+ if (now > noEndTime) {
+ response.setStatusCode(405);
+ return;
+ }
+ //extends noEndTime
+ process.noEndTime =
+ ndn::time::steady_clock::now() + m_noEndTimeout;
+
+}
+
+void
+WriteHandle::negativeReply(const Interest& interest, int statusCode)
+{
+ RepoCommandResponse response;
+ response.setStatusCode(statusCode);
+ reply(interest, response);
+}
+
+} //namespace repo
diff --git a/ndn-handle/write-handle.hpp b/ndn-handle/write-handle.hpp
new file mode 100644
index 0000000..0c5ee66
--- /dev/null
+++ b/ndn-handle/write-handle.hpp
@@ -0,0 +1,212 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2014 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#ifndef REPO_NDN_HANDLE_WRITE_HANDLE_HPP
+#define REPO_NDN_HANDLE_WRITE_HANDLE_HPP
+
+#include "ndn-handle-common.hpp"
+#include "base-handle.hpp"
+#include <queue>
+
+namespace repo {
+
+using std::map;
+using std::pair;
+using std::queue;
+
+/**
+* @brief WriteHandle provides basic credit based congestion control.
+*
+* First repo sends interests of credit number and then credit will be 0.
+*
+* If a data comes, credit++ and sends a interest then credit--.
+*
+* If the interest timeout, repo will retry and send interest in retrytimes.
+*
+* If one interest timeout beyond retrytimes, the fetching process will terminate.
+*
+* Another case is that if command will insert segmented data without EndBlockId.
+*
+* The repo will keep fetching data in noendTimeout time.
+*
+* If data returns with FinalBlockId, this detecting timeout process will terminate.
+*
+* If client sends a insert check command, the noendTimeout timer will be set to 0.
+*
+* If repo cannot get FinalBlockId in noendTimeout time, the fetching process will terminate.
+*/
+class WriteHandle : public BaseHandle
+{
+
+public:
+ class Error : public BaseHandle::Error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : BaseHandle::Error(what)
+ {
+ }
+ };
+
+
+public:
+ WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
+ Scheduler& scheduler, CommandInterestValidator& validator);
+
+ virtual void
+ listen(const Name& prefix);
+
+private:
+ /**
+ * @brief Information of insert process including variables for response
+ * and credit based flow control
+ */
+ struct ProcessInfo
+ {
+ //ProcessId id;
+ RepoCommandResponse response;
+ queue<SegmentNo> nextSegmentQueue; ///< queue of waiting segment
+ /// to be sent when having credits
+ SegmentNo nextSegment; ///< last segment put into the nextSegmentQueue
+ map<SegmentNo, int> retryCounts; ///< to store retrying times of timeout segment
+ int credit; ///< congestion control credits of process
+
+ /**
+ * @brief the latest time point at which EndBlockId must be determined
+ *
+ * Segmented fetch process will terminate if EndBlockId cannot be
+ * determined before this time point.
+ * It is initialized to now()+noEndTimeout when segmented fetch process begins,
+ * and reset to now()+noEndTimeout each time an insert status check command is processed.
+ */
+ ndn::time::steady_clock::TimePoint noEndTime;
+ };
+
+private: // insert command
+ /**
+ * @brief handle insert commands
+ */
+ void
+ onInterest(const Name& prefix, const Interest& interest);
+
+ void
+ onValidated(const shared_ptr<const Interest>& interest, const Name& prefix);
+
+ void
+ onValidationFailed(const shared_ptr<const Interest>& interest);
+
+ /**
+ * @brief insert command prefix register failed
+ */
+ void
+ onRegisterFailed(const Name& prefix, const std::string& reason);
+
+private: // single data fetching
+ /**
+ * @brief fetch one data
+ */
+ void
+ onData(const Interest& interest, Data& data, ProcessId processId);
+
+ /**
+ * @brief handle when fetching one data timeout
+ */
+ void
+ onTimeout(const Interest& interest, ProcessId processId);
+
+ void
+ processSingleInsertCommand(const Interest& interest, RepoCommandParameter& parameter);
+
+private: // segmented data fetching
+ /**
+ * @brief fetch segmented data
+ */
+ void
+ onSegmentData(const Interest& interest, Data& data, ProcessId processId);
+
+ /**
+ * @brief Timeout when fetching segmented data. Data can be fetched RETRY_TIMEOUT times.
+ */
+ void
+ onSegmentTimeout(const Interest& interest, ProcessId processId);
+
+ /**
+ * @brief initiate fetching segmented data
+ */
+ void
+ segInit(ProcessId processId, const RepoCommandParameter& parameter);
+
+ /**
+ * @brief control for sending interests in function onSegmentData()
+ */
+ void
+ onSegmentDataControl(ProcessId processId, const Interest& interest);
+
+ /**
+ * @brief control for sending interest in function onSegmentTimeout
+ */
+ void
+ onSegmentTimeoutControl(ProcessId processId, const Interest& interest);
+
+ void
+ processSegmentedInsertCommand(const Interest& interest, RepoCommandParameter& parameter);
+
+ /**
+ * @brief extends noEndTime of process if not noEndTimeout, set StatusCode 405
+ *
+ * called by onCheckValidated() if there is no EndBlockId. If not noEndTimeout,
+ * extends noEndTime of process. If noEndTimeout, set status code 405 as noEndTimeout.
+ */
+ void
+ extendNoEndTime(ProcessInfo& process);
+
+private: // insert state check command
+ /**
+ * @brief handle insert check command
+ */
+ void
+ onCheckInterest(const Name& prefix, const Interest& interest);
+
+ /**
+ * @brief insert check command prefix register failed
+ */
+ void
+ onCheckRegisterFailed(const Name& prefix, const std::string& reason);
+
+ void
+ onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix);
+
+ void
+ onCheckValidationFailed(const shared_ptr<const Interest>& interest);
+
+private:
+ void
+ deleteProcess(ProcessId processId);
+
+ /**
+ * @brief schedule a event to delete the process
+ */
+ void
+ deferredDeleteProcess(ProcessId processId);
+
+ void
+ negativeReply(const Interest& interest, int statusCode);
+
+private:
+
+ CommandInterestValidator& m_validator;
+
+ map<ProcessId, ProcessInfo> m_processes;
+
+ int m_retryTime;
+ int m_credit;
+ ndn::time::milliseconds m_noEndTimeout;
+};
+
+} // namespace repo
+
+#endif // REPO_NDN_HANDLE_WRITE_HANDLE_HPP
diff --git a/server/server.cpp b/server/server.cpp
index 6e4ee85..7e43180 100644
--- a/server/server.cpp
+++ b/server/server.cpp
@@ -7,10 +7,12 @@
#include <string>
#include <iostream>
#include <ndn-cpp-dev/face.hpp>
+#include <ndn-cpp-dev/util/command-interest-validator.hpp>
#include "../storage/storage-handle.hpp"
#include "../storage/sqlite/sqlite-handle.hpp"
#include "../ndn-handle/read-handle.hpp"
+#include "../ndn-handle/write-handle.hpp"
using namespace repo;
@@ -41,14 +43,32 @@
break;
}
}
- SqliteHandle sqliteHandle(dbPath);
- StorageHandle* handle = &sqliteHandle;
- Face face;
- ReadHandle readHandle(&face, handle);
if (confPath.empty()) {
confPath = "./repo.conf";
}
+
+ Name dataPrefix("ndn:/example/data");
+ Name repoPrefix("ndn:/example/repo");
+ /// @todo read from configuration
+
+ SqliteHandle sqliteHandle(dbPath);
+
+ shared_ptr<boost::asio::io_service> io =
+ ndn::make_shared<boost::asio::io_service>();
+
+ Face face(io);
+ Scheduler scheduler(*io);
+
+ /// @todo specify trust model
+ CommandInterestValidator validator;
+ KeyChain keyChain;
+
+ ReadHandle readHandle(face, sqliteHandle, keyChain, scheduler);
+ readHandle.listen(dataPrefix);
+ WriteHandle writeHandle(face, sqliteHandle, keyChain, scheduler, validator);
+ writeHandle.listen(repoPrefix);
+
face.processEvents();
return 0;
}