fw: replace straggler timer with per-strategy decision

remove onInterestReject pipeline

refs: #4200

Change-Id: I184d7ef7f732efda3d1066eef6cd89e36a6a8802
diff --git a/daemon/fw/forwarder.cpp b/daemon/fw/forwarder.cpp
index 8c0bb55..cb7ac9d 100644
--- a/daemon/fw/forwarder.cpp
+++ b/daemon/fw/forwarder.cpp
@@ -127,9 +127,6 @@
     return;
   }
 
-  // cancel unsatisfy & straggler timer
-  this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
-
   // is pending?
   if (!pitEntry->hasInRecords()) {
     m_cs.find(interest,
@@ -163,6 +160,12 @@
   inFace.sendNack(nack);
 }
 
+static inline bool
+compare_InRecord_expiry(const pit::InRecord& a, const pit::InRecord& b)
+{
+  return a.getExpiry() < b.getExpiry();
+}
+
 void
 Forwarder::onContentStoreMiss(const Face& inFace, const shared_ptr<pit::Entry>& pitEntry,
                               const Interest& interest)
@@ -173,8 +176,10 @@
   // insert in-record
   pitEntry->insertOrUpdateInRecord(const_cast<Face&>(inFace), interest);
 
-  // set PIT unsatisfy timer
-  this->setUnsatisfyTimer(pitEntry);
+  // set PIT expiry timer to the time that the last PIT in-record expires
+  auto lastExpiring = std::max_element(pitEntry->in_begin(), pitEntry->in_end(), &compare_InRecord_expiry);
+  auto lastExpiryFromNow = lastExpiring->getExpiry() - time::steady_clock::now();
+  this->setExpiryTimer(pitEntry, time::duration_cast<time::milliseconds>(lastExpiryFromNow));
 
   // has NextHopFaceId?
   shared_ptr<lp::NextHopFaceIdTag> nextHopTag = interest.getTag<lp::NextHopFaceIdTag>();
@@ -208,8 +213,8 @@
   pitEntry->isSatisfied = true;
   pitEntry->dataFreshnessPeriod = data.getFreshnessPeriod();
 
-  // set PIT straggler timer
-  this->setStragglerTimer(pitEntry);
+  // finalize Interest
+  this->onInterestFinalize(pitEntry);
 
   // goto outgoing Data pipeline
   this->onOutgoingData(data, *const_pointer_cast<Face>(inFace.shared_from_this()));
@@ -230,23 +235,6 @@
 }
 
 void
-Forwarder::onInterestReject(const shared_ptr<pit::Entry>& pitEntry)
-{
-  if (fw::hasPendingOutRecords(*pitEntry)) {
-    NFD_LOG_ERROR("onInterestReject interest=" << pitEntry->getName() <<
-                  " cannot reject forwarded Interest");
-    return;
-  }
-  NFD_LOG_DEBUG("onInterestReject interest=" << pitEntry->getName());
-
-  // cancel unsatisfy & straggler timer
-  this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
-
-  // set PIT straggler timer
-  this->setStragglerTimer(pitEntry);
-}
-
-void
 Forwarder::onInterestFinalize(const shared_ptr<pit::Entry>& pitEntry)
 {
   NFD_LOG_DEBUG("onInterestFinalize interest=" << pitEntry->getName() <<
@@ -256,7 +244,7 @@
   this->insertDeadNonceList(*pitEntry, 0);
 
   // PIT delete
-  this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
+  scheduler::cancel(pitEntry->expiryTimer);
   m_pit.erase(pitEntry.get());
 }
 
@@ -295,9 +283,6 @@
   for (const shared_ptr<pit::Entry>& pitEntry : pitMatches) {
     NFD_LOG_DEBUG("onIncomingData matching=" << pitEntry->getName());
 
-    // cancel unsatisfy & straggler timer
-    this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
-
     // remember pending downstreams
     for (const pit::InRecord& inRecord : pitEntry->getInRecords()) {
       if (inRecord.getExpiry() > now) {
@@ -305,6 +290,9 @@
       }
     }
 
+    // set PIT expiry timer to now
+    this->setExpiryTimer(pitEntry, 0_ms);
+
     // invoke PIT satisfy callback
     this->dispatchToStrategy(*pitEntry,
       [&] (fw::Strategy& strategy) { strategy.beforeSatisfyInterest(pitEntry, inFace, data); });
@@ -318,9 +306,6 @@
     // mark PIT satisfied
     pitEntry->clearInRecords();
     pitEntry->deleteOutRecord(inFace);
-
-    // set PIT straggler timer
-    this->setStragglerTimer(pitEntry);
   }
 
   // foreach pending downstream
@@ -426,6 +411,11 @@
   // record Nack on out-record
   outRecord->setIncomingNack(nack);
 
+  // set PIT expiry timer to now when all out-records have received a Nack
+  if (!fw::hasPendingOutRecords(*pitEntry)) {
+    this->setExpiryTimer(pitEntry, 0_ms);
+  }
+
   // trigger strategy: after receive NACK
   this->dispatchToStrategy(*pitEntry,
     [&] (fw::Strategy& strategy) { strategy.afterReceiveNack(inFace, nack, pitEntry); });
@@ -483,46 +473,17 @@
   m_strategyChoice.findEffectiveStrategy(interest.getName()).onDroppedInterest(outFace, interest);
 }
 
-static inline bool
-compare_InRecord_expiry(const pit::InRecord& a, const pit::InRecord& b)
-{
-  return a.getExpiry() < b.getExpiry();
-}
-
 void
-Forwarder::setUnsatisfyTimer(const shared_ptr<pit::Entry>& pitEntry)
+Forwarder::setExpiryTimer(const shared_ptr<pit::Entry>& pitEntry, time::milliseconds duration)
 {
-  pit::InRecordCollection::iterator lastExpiring =
-    std::max_element(pitEntry->in_begin(), pitEntry->in_end(), &compare_InRecord_expiry);
+  BOOST_ASSERT(duration >= 0_ms);
 
-  time::steady_clock::TimePoint lastExpiry = lastExpiring->getExpiry();
-  time::nanoseconds lastExpiryFromNow = lastExpiry - time::steady_clock::now();
-  if (lastExpiryFromNow <= time::seconds::zero()) {
-    // TODO all in-records are already expired; will this happen?
-  }
+  scheduler::cancel(pitEntry->expiryTimer);
 
-  scheduler::cancel(pitEntry->m_unsatisfyTimer);
-  pitEntry->m_unsatisfyTimer = scheduler::schedule(lastExpiryFromNow,
+  pitEntry->expiryTimer = scheduler::schedule(duration,
     bind(&Forwarder::onInterestFinalize, this, pitEntry));
 }
 
-void
-Forwarder::setStragglerTimer(const shared_ptr<pit::Entry>& pitEntry)
-{
-  time::nanoseconds stragglerTime = time::milliseconds(100);
-
-  scheduler::cancel(pitEntry->m_stragglerTimer);
-  pitEntry->m_stragglerTimer = scheduler::schedule(stragglerTime,
-    bind(&Forwarder::onInterestFinalize, this, pitEntry));
-}
-
-void
-Forwarder::cancelUnsatisfyAndStragglerTimer(pit::Entry& pitEntry)
-{
-  scheduler::cancel(pitEntry.m_unsatisfyTimer);
-  scheduler::cancel(pitEntry.m_stragglerTimer);
-}
-
 static inline void
 insertNonceToDnl(DeadNonceList& dnl, const pit::Entry& pitEntry,
                  const pit::OutRecord& outRecord)