fw: replace straggler timer with per-strategy decision

Also remove the onInterestReject pipeline; Strategy::rejectPendingInterest now sets the PIT expiry timer to zero instead.

refs: #4200

Change-Id: I184d7ef7f732efda3d1066eef6cd89e36a6a8802
diff --git a/daemon/fw/forwarder.cpp b/daemon/fw/forwarder.cpp
index 8c0bb55..cb7ac9d 100644
--- a/daemon/fw/forwarder.cpp
+++ b/daemon/fw/forwarder.cpp
@@ -127,9 +127,6 @@
     return;
   }
 
-  // cancel unsatisfy & straggler timer
-  this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
-
   // is pending?
   if (!pitEntry->hasInRecords()) {
     m_cs.find(interest,
@@ -163,6 +160,12 @@
   inFace.sendNack(nack);
 }
 
+static inline bool
+compare_InRecord_expiry(const pit::InRecord& a, const pit::InRecord& b)
+{
+  return a.getExpiry() < b.getExpiry(); // "less than" by expiry time: true iff a expires strictly before b
+}
+
 void
 Forwarder::onContentStoreMiss(const Face& inFace, const shared_ptr<pit::Entry>& pitEntry,
                               const Interest& interest)
@@ -173,8 +176,10 @@
   // insert in-record
   pitEntry->insertOrUpdateInRecord(const_cast<Face&>(inFace), interest);
 
-  // set PIT unsatisfy timer
-  this->setUnsatisfyTimer(pitEntry);
+  // set PIT expiry timer to the time that the last PIT in-record expires
+  auto lastExpiring = std::max_element(pitEntry->in_begin(), pitEntry->in_end(), &compare_InRecord_expiry);
+  auto lastExpiryFromNow = lastExpiring->getExpiry() - time::steady_clock::now();
+  this->setExpiryTimer(pitEntry, time::duration_cast<time::milliseconds>(lastExpiryFromNow));
 
   // has NextHopFaceId?
   shared_ptr<lp::NextHopFaceIdTag> nextHopTag = interest.getTag<lp::NextHopFaceIdTag>();
@@ -208,8 +213,8 @@
   pitEntry->isSatisfied = true;
   pitEntry->dataFreshnessPeriod = data.getFreshnessPeriod();
 
-  // set PIT straggler timer
-  this->setStragglerTimer(pitEntry);
+  // finalize Interest
+  this->onInterestFinalize(pitEntry);
 
   // goto outgoing Data pipeline
   this->onOutgoingData(data, *const_pointer_cast<Face>(inFace.shared_from_this()));
@@ -230,23 +235,6 @@
 }
 
 void
-Forwarder::onInterestReject(const shared_ptr<pit::Entry>& pitEntry)
-{
-  if (fw::hasPendingOutRecords(*pitEntry)) {
-    NFD_LOG_ERROR("onInterestReject interest=" << pitEntry->getName() <<
-                  " cannot reject forwarded Interest");
-    return;
-  }
-  NFD_LOG_DEBUG("onInterestReject interest=" << pitEntry->getName());
-
-  // cancel unsatisfy & straggler timer
-  this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
-
-  // set PIT straggler timer
-  this->setStragglerTimer(pitEntry);
-}
-
-void
 Forwarder::onInterestFinalize(const shared_ptr<pit::Entry>& pitEntry)
 {
   NFD_LOG_DEBUG("onInterestFinalize interest=" << pitEntry->getName() <<
@@ -256,7 +244,7 @@
   this->insertDeadNonceList(*pitEntry, 0);
 
   // PIT delete
-  this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
+  scheduler::cancel(pitEntry->expiryTimer);
   m_pit.erase(pitEntry.get());
 }
 
@@ -295,9 +283,6 @@
   for (const shared_ptr<pit::Entry>& pitEntry : pitMatches) {
     NFD_LOG_DEBUG("onIncomingData matching=" << pitEntry->getName());
 
-    // cancel unsatisfy & straggler timer
-    this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
-
     // remember pending downstreams
     for (const pit::InRecord& inRecord : pitEntry->getInRecords()) {
       if (inRecord.getExpiry() > now) {
@@ -305,6 +290,9 @@
       }
     }
 
+    // set PIT expiry timer to now
+    this->setExpiryTimer(pitEntry, 0_ms);
+
     // invoke PIT satisfy callback
     this->dispatchToStrategy(*pitEntry,
       [&] (fw::Strategy& strategy) { strategy.beforeSatisfyInterest(pitEntry, inFace, data); });
@@ -318,9 +306,6 @@
     // mark PIT satisfied
     pitEntry->clearInRecords();
     pitEntry->deleteOutRecord(inFace);
-
-    // set PIT straggler timer
-    this->setStragglerTimer(pitEntry);
   }
 
   // foreach pending downstream
@@ -426,6 +411,11 @@
   // record Nack on out-record
   outRecord->setIncomingNack(nack);
 
+  // set PIT expiry timer to now when no pending out-record remains (all out-records have received a Nack)
+  if (!fw::hasPendingOutRecords(*pitEntry)) {
+    this->setExpiryTimer(pitEntry, 0_ms);
+  }
+
   // trigger strategy: after receive NACK
   this->dispatchToStrategy(*pitEntry,
     [&] (fw::Strategy& strategy) { strategy.afterReceiveNack(inFace, nack, pitEntry); });
@@ -483,46 +473,17 @@
   m_strategyChoice.findEffectiveStrategy(interest.getName()).onDroppedInterest(outFace, interest);
 }
 
-static inline bool
-compare_InRecord_expiry(const pit::InRecord& a, const pit::InRecord& b)
-{
-  return a.getExpiry() < b.getExpiry();
-}
-
 void
-Forwarder::setUnsatisfyTimer(const shared_ptr<pit::Entry>& pitEntry)
+Forwarder::setExpiryTimer(const shared_ptr<pit::Entry>& pitEntry, time::milliseconds duration)
 {
-  pit::InRecordCollection::iterator lastExpiring =
-    std::max_element(pitEntry->in_begin(), pitEntry->in_end(), &compare_InRecord_expiry);
+  BOOST_ASSERT(duration >= 0_ms); // precondition: callers must pass a non-negative duration
 
-  time::steady_clock::TimePoint lastExpiry = lastExpiring->getExpiry();
-  time::nanoseconds lastExpiryFromNow = lastExpiry - time::steady_clock::now();
-  if (lastExpiryFromNow <= time::seconds::zero()) {
-    // TODO all in-records are already expired; will this happen?
-  }
+  scheduler::cancel(pitEntry->expiryTimer); // drop any previously scheduled expiry before rescheduling
 
-  scheduler::cancel(pitEntry->m_unsatisfyTimer);
-  pitEntry->m_unsatisfyTimer = scheduler::schedule(lastExpiryFromNow,
+  pitEntry->expiryTimer = scheduler::schedule(duration, // when it fires, run the Interest finalize pipeline
     bind(&Forwarder::onInterestFinalize, this, pitEntry));
 }
 
-void
-Forwarder::setStragglerTimer(const shared_ptr<pit::Entry>& pitEntry)
-{
-  time::nanoseconds stragglerTime = time::milliseconds(100);
-
-  scheduler::cancel(pitEntry->m_stragglerTimer);
-  pitEntry->m_stragglerTimer = scheduler::schedule(stragglerTime,
-    bind(&Forwarder::onInterestFinalize, this, pitEntry));
-}
-
-void
-Forwarder::cancelUnsatisfyAndStragglerTimer(pit::Entry& pitEntry)
-{
-  scheduler::cancel(pitEntry.m_unsatisfyTimer);
-  scheduler::cancel(pitEntry.m_stragglerTimer);
-}
-
 static inline void
 insertNonceToDnl(DeadNonceList& dnl, const pit::Entry& pitEntry,
                  const pit::OutRecord& outRecord)
diff --git a/daemon/fw/forwarder.hpp b/daemon/fw/forwarder.hpp
index c50c573..36af43a 100644
--- a/daemon/fw/forwarder.hpp
+++ b/daemon/fw/forwarder.hpp
@@ -209,11 +209,6 @@
   VIRTUAL_WITH_TESTS void
   onOutgoingInterest(const shared_ptr<pit::Entry>& pitEntry, Face& outFace, const Interest& interest);
 
-  /** \brief Interest reject pipeline
-   */
-  VIRTUAL_WITH_TESTS void
-  onInterestReject(const shared_ptr<pit::Entry>& pitEntry);
-
   /** \brief Interest finalize pipeline
    */
   VIRTUAL_WITH_TESTS void
@@ -248,14 +243,10 @@
   onDroppedInterest(Face& outFace, const Interest& interest);
 
 PROTECTED_WITH_TESTS_ELSE_PRIVATE:
-  VIRTUAL_WITH_TESTS void
-  setUnsatisfyTimer(const shared_ptr<pit::Entry>& pitEntry);
-
-  VIRTUAL_WITH_TESTS void
-  setStragglerTimer(const shared_ptr<pit::Entry>& pitEntry);
-
-  VIRTUAL_WITH_TESTS void
-  cancelUnsatisfyAndStragglerTimer(pit::Entry& pitEntry);
+  /** \brief replace the PIT entry's expiry timer with one firing at now + \p duration (must be >= 0)
+   */
+  void
+  setExpiryTimer(const shared_ptr<pit::Entry>& pitEntry, time::milliseconds duration);
 
   /** \brief insert Nonce to Dead Nonce List if necessary
    *  \param upstream if null, insert Nonces from all out-records;
diff --git a/daemon/fw/strategy.hpp b/daemon/fw/strategy.hpp
index 71fa506..12a06e8 100644
--- a/daemon/fw/strategy.hpp
+++ b/daemon/fw/strategy.hpp
@@ -121,11 +121,18 @@
    *  - cannot be satisfied by ContentStore
    *  - is under a namespace managed by this strategy
    *
+   *  The PIT entry is set to expire after InterestLifetime has elapsed at each downstream.
+   *
    *  The strategy should decide whether and where to forward this Interest.
    *  - If the strategy decides to forward this Interest,
-   *    invoke this->sendInterest one or more times, either now or shortly after
-   *  - If strategy concludes that this Interest cannot be forwarded,
-   *    invoke this->rejectPendingInterest so that PIT entry will be deleted shortly
+   *    invoke \c sendInterest for each upstream, either now or shortly after via a scheduler event,
+   *    but before the PIT entry expires.
+   *    Optionally, the strategy can invoke \c setExpiryTimer to adjust how long it would wait for a response.
+   *  - If the strategy has already forwarded this Interest previously and decides to continue waiting,
+   *    do nothing.
+   *    Optionally, the strategy can invoke \c setExpiryTimer to adjust how long it would wait for a response.
+   *  - If the strategy concludes that this Interest cannot be satisfied,
+   *    invoke \c rejectPendingInterest to erase the PIT entry.
    *
    *  \warning The strategy must not retain shared_ptr<pit::Entry>, otherwise undefined behavior
    *           may occur. However, the strategy is allowed to store weak_ptr<pit::Entry>.
@@ -137,7 +144,13 @@
   /** \brief trigger before PIT entry is satisfied
    *
    *  This trigger is invoked when an incoming Data satisfies the PIT entry.
-   *  It can be invoked even if the PIT entry has already been satisfied.
+   *  Normally, only the first incoming Data would satisfy the PIT entry and invoke this trigger,
+   *  after which the PIT entry is erased.
+   *
+   *  If the strategy wishes to collect responses from additional upstream nodes,
+   *  it should invoke \c setExpiryTimer within this function to retain the PIT entry.
+   *  If a Data arrives from another upstream during the extended PIT entry lifetime, this trigger will be invoked again.
+   *  At that time, this function must invoke \c setExpiryTimer again to continue collecting more responses.
    *
    *  In this base class this method does nothing.
    *
@@ -156,6 +169,14 @@
    *  to that upstream, i.e. the PIT out-record exists and has a matching Nonce.
    *  The NackHeader has been recorded in the PIT out-record.
    *
+   *  If the PIT entry still has a pending out-record, its expiry timer remains unchanged.
+   *  Otherwise (no pending out-record remains), the PIT entry normally would expire immediately after this function returns.
+   *
+   *  If the strategy wishes to collect responses from additional upstream nodes,
+   *  it should invoke \c setExpiryTimer within this function to retain the PIT entry.
+   *  If a Nack arrives from another upstream during the extended PIT entry lifetime, this trigger will be invoked again.
+   *  At that time, this function must invoke \c setExpiryTimer again to continue collecting more responses.
+   *
    *  In this base class this method does nothing.
    *
    *  \warning The strategy must not retain shared_ptr<pit::Entry>, otherwise undefined behavior
@@ -185,16 +206,16 @@
     m_forwarder.onOutgoingInterest(pitEntry, outFace, interest);
   }
 
-  /** \brief decide that a pending Interest cannot be forwarded
-   *  \param pitEntry PIT entry
+  /** \brief schedule the PIT entry for immediate deletion
    *
-   *  This shall not be called if the pending Interest has been
-   *  forwarded earlier, and does not need to be resent now.
+   *  This helper function sets the PIT entry expiry timer to fire zero milliseconds from now.
+   *  The strategy should invoke this function when it concludes that the Interest cannot
+   *  be forwarded and it does not want to wait for responses from existing upstream nodes.
    */
   VIRTUAL_WITH_TESTS void
   rejectPendingInterest(const shared_ptr<pit::Entry>& pitEntry)
   {
-    m_forwarder.onInterestReject(pitEntry);
+    this->setExpiryTimer(pitEntry, 0_ms); // zero duration: the entry expires without further waiting
   }
 
   /** \brief send Nack to outFace
@@ -222,6 +243,14 @@
   sendNacks(const shared_ptr<pit::Entry>& pitEntry, const lp::NackHeader& header,
             std::initializer_list<const Face*> exceptFaces = std::initializer_list<const Face*>());
 
+  /** \brief Schedule the PIT entry to be erased after \p duration, superseding any earlier expiry setting
+   */
+  void
+  setExpiryTimer(const shared_ptr<pit::Entry>& pitEntry, time::milliseconds duration)
+  {
+    m_forwarder.setExpiryTimer(pitEntry, duration); // delegate to Forwarder::setExpiryTimer
+  }
+
 protected: // accessors
   /** \brief performs a FIB lookup, considering Link object if present
    */