ndn-handle: insert, insert status check command

Also, BaseHandle takes reference instead of pointer

Change-Id: Ife53fcebe52c99252e418a46d7361bae8e638bdf
diff --git a/ndn-handle/write-handle.cpp b/ndn-handle/write-handle.cpp
new file mode 100644
index 0000000..dac5ca8
--- /dev/null
+++ b/ndn-handle/write-handle.cpp
@@ -0,0 +1,534 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (C) 2013 Regents of the University of California.
+ * See COPYING for copyright and distribution information.
+ */
+
+#include "write-handle.hpp"
+
+namespace repo {
+
+static const int RETRY_TIMEOUT = 3; // initial m_retryTime: per-segment retry limit
+static const int DEFAULT_CREDIT = 12; // initial m_credit: cap on segInit's first Interest batch
+static const ndn::time::milliseconds NOEND_TIMEOUT(10000); // initial m_noEndTimeout
+static const ndn::time::milliseconds PROCESS_DELETE_TIME(10000); // deferredDeleteProcess delay
+
+WriteHandle::WriteHandle(Face& face, StorageHandle& storageHandle, KeyChain& keyChain,
+                         Scheduler& scheduler, CommandInterestValidator& validator)
+  : BaseHandle(face, storageHandle, keyChain, scheduler) // handed on to the base class
+  , m_validator(validator) // validates command Interests in onInterest / onCheckInterest
+  , m_retryTime(RETRY_TIMEOUT) // compared with per-segment retry counts on timeout
+  , m_credit(DEFAULT_CREDIT) // upper bound on the Interests segInit sends at once
+  , m_noEndTimeout(NOEND_TIMEOUT) // deadline span for inserts without an EndBlockId
+{
+}
+
+void
+WriteHandle::deleteProcess(ProcessId processId)
+{
+  m_processes.erase(processId); // afterwards onCheckValidated answers 404 for this id
+}
+
+// Validate an insert command Interest; the result goes to onValidated or onValidationFailed.
+void
+WriteHandle::onInterest(const Name& prefix, const Interest& interest)
+{
+  m_validator.validate(interest,
+                          bind(&WriteHandle::onValidated, this, _1, prefix),
+                          bind(&WriteHandle::onValidationFailed, this, _1));
+}
+
+// Registration-failure callback handed to setInterestFilter in listen(); always throws Error.
+void
+WriteHandle::onRegisterFailed(const Name& prefix, const std::string& reason)
+{
+  throw Error("Insert prefix registration failed"); // prefix and reason are not reported
+}
+
+// Throws Error reporting that registration of the insert check prefix failed.
+void
+WriteHandle::onCheckRegisterFailed(const Name& prefix, const std::string& reason)
+{
+  throw Error("Insert check prefix registration failed"); // prefix and reason are not reported
+}
+
+void
+WriteHandle::onValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
+{
+  // Handle a validated insert command: parse its parameter, then start a single or segmented insert.
+  RepoCommandParameter parameter;
+  try {
+    extractParameter(*interest, prefix, parameter);
+  }
+  catch (const RepoCommandParameter::Error&) {
+    negativeReply(*interest, 403); // the parameter could not be extracted
+    return;
+  }
+
+  if (parameter.hasStartBlockId() || parameter.hasEndBlockId()) {
+    if (parameter.hasSelectors()) {
+      negativeReply(*interest, 402); // selectors are not accepted together with block ids
+      return;
+    }
+    processSegmentedInsertCommand(*interest, parameter);
+  }
+  else {
+    processSingleInsertCommand(*interest, parameter);
+  }
+  // Every path above sends its reply via negativeReply or a process*InsertCommand helper.
+}
+
+void
+WriteHandle::onValidationFailed(const shared_ptr<const Interest>& interest)
+{
+  std::cout << "invalidated" << std::endl; // diagnostic output only
+  negativeReply(*interest, 401); // the insert command failed validation
+}
+
+void
+WriteHandle::onData(const Interest& interest, ndn::Data& data, ProcessId processId)
+{
+  // Data callback for a single (unsegmented) insert started by processSingleInsertCommand.
+  // Stores the first Data received for the process and schedules the process for deletion.
+  // Data arriving for a process that is no longer tracked is ignored.
+  if (m_processes.count(processId) == 0) {
+    return;
+  }
+
+  ProcessInfo& process = m_processes[processId];
+  RepoCommandResponse& response = process.response;
+
+  if (response.getInsertNum() == 0) {
+    getStorageHandle().insertData(data); // the return value is not checked here
+    response.setInsertNum(1);
+  }
+
+  deferredDeleteProcess(processId);
+}
+
+void
+WriteHandle::onSegmentData(const Interest& interest, Data& data, ProcessId processId)
+{
+  // Data callback for one segment of a segmented insert (the Interests are expressed by
+  // segInit, onSegmentDataControl and onSegmentTimeoutControl). Updates the end block id,
+  // stores the segment, then hands over to onSegmentDataControl.
+
+  if (m_processes.count(processId) == 0) {
+    return;
+  }
+  RepoCommandResponse& response = m_processes[processId].response;
+
+  // Adopt the Data's FinalBlockId as end block id if none is known yet or if it is smaller.
+  Name::Component finalBlockId = data.getFinalBlockId();
+
+  if (!finalBlockId.empty()) {
+    SegmentNo final = finalBlockId.toSegment();
+    if (response.hasEndBlockId()) {
+      if (final < response.getEndBlockId()) {
+        response.setEndBlockId(final);
+      }
+    }
+    else {
+      response.setEndBlockId(final);
+    }
+  }
+
+  // Store the segment; it is only counted when insertData returns true.
+  // (What makes insertData return false is not visible in this file.)
+  if (getStorageHandle().insertData(data)) {
+    response.setInsertNum(response.getInsertNum() + 1);
+  }
+  // response is a reference into m_processes, so the updates above need no write-back.
+
+  // Flow control: may finish the process or express the Interest for a further segment.
+
+  onSegmentDataControl(processId, interest);
+}
+
+void
+WriteHandle::onTimeout(const ndn::Interest& interest, ProcessId processId)
+{
+  std::cout << "Timeout" << std::endl;
+  m_processes.erase(processId); // single insert: dropped at once, no retry, no deferred delete
+}
+
+void
+WriteHandle::onSegmentTimeout(const Interest& interest, ProcessId processId)
+{
+  std::cout << "SegTimeout" << std::endl;
+  // The retry-or-abort decision is made in onSegmentTimeoutControl.
+  onSegmentTimeoutControl(processId, interest);
+}
+
+void
+WriteHandle::listen(const Name& prefix)
+{
+  Name insertPrefix; // <prefix>/insert: insert commands, handled by onInterest
+  insertPrefix.append(prefix).append("insert");
+  getFace().setInterestFilter(insertPrefix,
+                              bind(&WriteHandle::onInterest, this, _1, _2),
+                              bind(&WriteHandle::onRegisterFailed, this, _1, _2));
+  Name insertCheckPrefix; // status queries; failure is reported by onCheckRegisterFailed
+  insertCheckPrefix.append(prefix).append("insert check");
+  getFace().setInterestFilter(insertCheckPrefix,
+                              bind(&WriteHandle::onCheckInterest, this, _1, _2),
+                              bind(&WriteHandle::onCheckRegisterFailed, this, _1, _2));
+}
+
+void
+WriteHandle::segInit(ProcessId processId, const RepoCommandParameter& parameter)
+{
+  // Start a segmented insert: express the first batch of segment Interests (at most m_credit
+  // of them) and record which segment onSegmentDataControl has to request next.
+  ProcessInfo& process = m_processes[processId];
+  map<SegmentNo, int>& processRetry = process.retryCounts;
+
+  Name name = parameter.getName();
+  SegmentNo startBlockId = parameter.getStartBlockId();
+
+  uint64_t initialCredit = m_credit;
+
+  if (parameter.hasEndBlockId()) {
+    // The range [start, end] is inclusive, so it holds end - start + 1 segments.
+    initialCredit =
+      std::min(initialCredit, parameter.getEndBlockId() - parameter.getStartBlockId() + 1);
+  }
+  else {
+    // set noEndTimeout timer
+    process.noEndTime = ndn::time::steady_clock::now() +
+                        m_noEndTimeout;
+  }
+  process.credit = initialCredit;
+  SegmentNo segment = startBlockId;
+  for (; segment < startBlockId + initialCredit; ++segment) {
+    Name fetchName = name;
+    fetchName.appendSegment(segment);
+    Interest interest(fetchName);
+    getFace().expressInterest(interest,
+                              bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+                              bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+    process.credit--;
+    processRetry[segment] = 0;
+  }
+
+  // The loop leaves `segment` at the first segment not yet requested; queue exactly that one.
+  // (Advancing it once more here would skip a segment that is then never fetched.)
+  queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
+  process.nextSegment = segment;
+  nextSegmentQueue.push(segment);
+}
+
+void
+WriteHandle::onSegmentDataControl(ProcessId processId, const Interest& interest)
+{
+  // Flow control for a segmented insert, run after each segment Data has been handled:
+  // returns one credit, detects expiry/completion, and may express one further Interest.
+  if (m_processes.count(processId) == 0) {
+    return;
+  }
+  ProcessInfo& process = m_processes[processId];
+  RepoCommandResponse& response = process.response;
+  int& processCredit = process.credit;
+  //onSegmentDataControl is called when a data returns.
+  //When data returns, processCredit++
+  processCredit++;
+  SegmentNo& nextSegment = process.nextSegment;
+  queue<SegmentNo>& nextSegmentQueue = process.nextSegmentQueue;
+  map<SegmentNo, int>& retryCounts = process.retryCounts;
+
+  //without an end block id the process is bounded by the noEndTime deadline instead
+  if (!response.hasEndBlockId()) {
+
+    ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
+    ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
+
+    if (now > noEndTime) {
+      std::cout << "noEndtimeout: " << processId << std::endl;
+      //the process is not erased right away, so a status check can still see the 405
+      //StatusCode should be refreshed as 405
+      response.setStatusCode(405);
+      //schedule a delete event
+      deferredDeleteProcess(processId);
+      return;
+    }
+  }
+
+  //check whether every segment of the (known) range has been inserted
+  if (response.hasEndBlockId()) {
+    uint64_t nSegments =
+      response.getEndBlockId() - response.getStartBlockId() + 1;
+    if (response.getInsertNum() >= nSegments) {
+      //the process stays in m_processes until the deferred delete runs
+      //All the data has been inserted, StatusCode is refreshed as 200
+      response.setStatusCode(200);
+      deferredDeleteProcess(processId);
+      return;
+    }
+  }
+
+  //check whether there is any credit
+  if (processCredit == 0)
+    return;
+
+
+  //check whether sent queue empty
+  if (nextSegmentQueue.empty()) {
+    //do not do anything
+    return;
+  }
+
+  //pop the queue
+  SegmentNo sendingSegment = nextSegmentQueue.front();
+  nextSegmentQueue.pop();
+
+  //check whether sendingSegment exceeds; only possible once an end block id is known
+  if (response.hasEndBlockId() && sendingSegment > response.getEndBlockId()) {
+    //do not do anything
+    return;
+  }
+
+  //the segment number of the Interest whose Data has just arrived
+  SegmentNo fetchedSegment =
+    interest.getName().get(interest.getName().size() - 1).toSegment();
+
+  BOOST_ASSERT(retryCounts.count(fetchedSegment) != 0);
+
+  //find this fetched data, remove it from this map
+  //(its retry count is not needed any more now that the Data is here)
+  retryCounts.erase(fetchedSegment);
+  //express the interest of the top of the queue
+  Name fetchName(interest.getName().getPrefix(-1));
+  fetchName.appendSegment(sendingSegment);
+  Interest fetchInterest(fetchName);
+  getFace().expressInterest(fetchInterest,
+                            bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+                            bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+  //When an interest is expressed, processCredit--
+  processCredit--;
+  //start (or bump) the retry count of the segment just requested
+  if (retryCounts.count(sendingSegment) == 0) {
+    //not found
+    retryCounts[sendingSegment] = 0;
+  }
+  else {
+    //found
+    retryCounts[sendingSegment] = retryCounts[sendingSegment] + 1;
+  }
+  //queue the following segment; with no end block id known yet there is no upper bound
+  if (!response.hasEndBlockId() || (nextSegment + 1) <= response.getEndBlockId()) {
+    nextSegment++;
+    nextSegmentQueue.push(nextSegment);
+  }
+}
+
+void
+WriteHandle::onSegmentTimeoutControl(ProcessId processId, const Interest& interest)
+{
+  // Timeout handling for one segment Interest: re-express it until its retry count reaches
+  // m_retryTime, then give up on the whole process.
+  if (m_processes.count(processId) == 0) {
+    return;
+  }
+  ProcessInfo& process = m_processes[processId];
+  map<SegmentNo, int>& retryCounts = process.retryCounts;
+
+  SegmentNo timeoutSegment = interest.getName().get(-1).toSegment();
+
+  std::cout << "timeoutSegment: " << timeoutSegment << std::endl;
+
+  BOOST_ASSERT(retryCounts.count(timeoutSegment) != 0);
+
+  //read the retry count of this segment; if the assert above is compiled out and the
+  //segment is unknown, operator[] starts it at zero
+  int& retryTime = retryCounts[timeoutSegment];
+  if (retryTime >= m_retryTime) {
+    //fail this process: it is erased at once, so onCheckValidated answers 404 afterwards
+    std::cout << "Retry timeout: " << processId << std::endl;
+    m_processes.erase(processId);
+    return;
+  }
+  else {
+    //count the attempt and express an Interest for the same name again
+    retryTime++;
+    Interest retryInterest(interest.getName());
+    getFace().expressInterest(retryInterest,
+                              bind(&WriteHandle::onSegmentData, this, _1, _2, processId),
+                              bind(&WriteHandle::onSegmentTimeout, this, _1, processId));
+  }
+
+}
+
+void
+WriteHandle::onCheckInterest(const Name& prefix, const Interest& interest)
+{
+  // Validate an insert check Interest; see onCheckValidated / onCheckValidationFailed.
+  m_validator.validate(interest,
+                          bind(&WriteHandle::onCheckValidated, this, _1, prefix),
+                          bind(&WriteHandle::onCheckValidationFailed, this, _1));
+}
+
+void
+WriteHandle::onCheckValidated(const shared_ptr<const Interest>& interest, const Name& prefix)
+{
+  // Handle a validated insert check command: reply with the current RepoCommandResponse of
+  // the process named by the parameter's ProcessId, or with a negative reply (403/404) when
+  // the parameter or the process id cannot be resolved.
+  RepoCommandParameter parameter;
+  try {
+    extractParameter(*interest, prefix, parameter);
+  }
+  catch (const RepoCommandParameter::Error&) {
+    negativeReply(*interest, 403);
+    return;
+  }
+
+  if (!parameter.hasProcessId()) {
+    negativeReply(*interest, 403);
+    return;
+  }
+  //check whether this process exists
+  ProcessId processId = parameter.getProcessId();
+  if (m_processes.count(processId) == 0) {
+    std::cout << "no such processId: " << processId << std::endl;
+    negativeReply(*interest, 404);
+    return;
+  }
+
+  ProcessInfo& process = m_processes[processId];
+
+  RepoCommandResponse& response = process.response;
+
+  //Check whether it is single data fetching
+  if (!response.hasStartBlockId() &&
+      !response.hasEndBlockId()) {
+    reply(*interest, response);
+    return;
+  }
+
+  //A segmented insert without an end block id is kept alive by status checks:
+  //extendNoEndTime moves its deadline forward, or sets 405 if the deadline has passed.
+  if (!response.hasEndBlockId()) {
+    extendNoEndTime(process);
+  }
+  reply(*interest, response);
+}
+
+void
+WriteHandle::onCheckValidationFailed(const shared_ptr<const Interest>& interest)
+{
+  negativeReply(*interest, 401); // the insert check command failed validation
+}
+
+void
+WriteHandle::deferredDeleteProcess(ProcessId processId)
+{
+  getScheduler().scheduleEvent(PROCESS_DELETE_TIME, // deleteProcess runs after this delay
+                               ndn::bind(&WriteHandle::deleteProcess, this, processId));
+}
+
+void
+WriteHandle::processSingleInsertCommand(const Interest& interest,
+                                        RepoCommandParameter& parameter)
+{
+  // Start a single (unsegmented) insert: acknowledge the command, then fetch the named Data.
+  ProcessId processId = generateProcessId();
+  ProcessInfo& process = m_processes[processId];
+
+  RepoCommandResponse& response = process.response;
+  response.setStatusCode(100);
+  response.setProcessId(processId);
+  response.setInsertNum(0);
+
+  reply(interest, response);
+  // The stored status becomes 300; that is what onCheckValidated reports from here on.
+  response.setStatusCode(300);
+
+  Interest fetchInterest(parameter.getName());
+  if (parameter.hasSelectors()) {
+    fetchInterest.setSelectors(parameter.getSelectors());
+  }
+  getFace().expressInterest(fetchInterest,
+                            bind(&WriteHandle::onData, this, _1, _2, processId),
+                            bind(&WriteHandle::onTimeout, this, _1, processId));
+}
+
+void
+WriteHandle::processSegmentedInsertCommand(const Interest& interest,
+                                           RepoCommandParameter& parameter)
+{
+  // Start a segmented insert: acknowledge the command with status 100, then let segInit fetch.
+  if (parameter.hasEndBlockId()) {
+    //normal fetch segment: the end of the range is given by the command
+    if (!parameter.hasStartBlockId()) {
+      parameter.setStartBlockId(0);
+    }
+    SegmentNo startBlockId = parameter.getStartBlockId();
+    SegmentNo endBlockId = parameter.getEndBlockId();
+    // A range whose start lies beyond its end is rejected with 403; no process is created
+    // for it.
+    if (startBlockId > endBlockId) {
+      negativeReply(interest, 403);
+      return;
+    }
+
+    ProcessId processId = generateProcessId();
+    ProcessInfo& process = m_processes[processId];
+    // The response stored in m_processes is what onCheckValidated replies with later on.
+    RepoCommandResponse& response = process.response;
+    response.setStatusCode(100);
+    response.setProcessId(processId);
+    response.setInsertNum(0);
+    response.setStartBlockId(startBlockId);
+    response.setEndBlockId(endBlockId);
+
+    reply(interest, response);
+
+    //300 means data fetching is in progress
+    response.setStatusCode(300);
+
+    segInit(processId, parameter);
+  }
+  else {
+    //no EndBlockId, so fetch FinalBlockId in data, if timeout, stop
+    ProcessId processId = generateProcessId();
+    ProcessInfo& process = m_processes[processId];
+    // Same bookkeeping as above, except that no end block id is recorded in the response.
+    RepoCommandResponse& response = process.response;
+    response.setStatusCode(100);
+    response.setProcessId(processId);
+    response.setInsertNum(0);
+    response.setStartBlockId(parameter.getStartBlockId());
+    reply(interest, response);
+
+    //300 means data fetching is in progress
+    response.setStatusCode(300);
+
+    segInit(processId, parameter);
+  }
+}
+
+void
+WriteHandle::extendNoEndTime(ProcessInfo& process)
+{
+  // Move the no-end deadline to now + m_noEndTimeout, unless it has already passed (then 405).
+  ndn::time::steady_clock::TimePoint& noEndTime = process.noEndTime;
+  ndn::time::steady_clock::TimePoint now = ndn::time::steady_clock::now();
+  RepoCommandResponse& response = process.response;
+  if (now > noEndTime) {
+    response.setStatusCode(405);
+    return;
+  }
+  //extends noEndTime
+  process.noEndTime =
+    ndn::time::steady_clock::now() + m_noEndTimeout;
+}
+
+void
+WriteHandle::negativeReply(const Interest& interest, int statusCode)
+{
+  RepoCommandResponse response; // only the status code is set on this response
+  response.setStatusCode(statusCode);
+  reply(interest, response);
+}
+
+} //namespace repo