fw: replace straggler timer with per-strategy decision
remove onInterestReject pipeline
refs: #4200
Change-Id: I184d7ef7f732efda3d1066eef6cd89e36a6a8802
diff --git a/daemon/fw/forwarder.cpp b/daemon/fw/forwarder.cpp
index 8c0bb55..cb7ac9d 100644
--- a/daemon/fw/forwarder.cpp
+++ b/daemon/fw/forwarder.cpp
@@ -127,9 +127,6 @@
return;
}
- // cancel unsatisfy & straggler timer
- this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
-
// is pending?
if (!pitEntry->hasInRecords()) {
m_cs.find(interest,
@@ -163,6 +160,12 @@
inFace.sendNack(nack);
}
+static inline bool
+compare_InRecord_expiry(const pit::InRecord& a, const pit::InRecord& b)
+{
+ return a.getExpiry() < b.getExpiry();
+}
+
void
Forwarder::onContentStoreMiss(const Face& inFace, const shared_ptr<pit::Entry>& pitEntry,
const Interest& interest)
@@ -173,8 +176,10 @@
// insert in-record
pitEntry->insertOrUpdateInRecord(const_cast<Face&>(inFace), interest);
- // set PIT unsatisfy timer
- this->setUnsatisfyTimer(pitEntry);
+ // set PIT expiry timer to the time that the last PIT in-record expires
+ auto lastExpiring = std::max_element(pitEntry->in_begin(), pitEntry->in_end(), &compare_InRecord_expiry);
+ auto lastExpiryFromNow = lastExpiring->getExpiry() - time::steady_clock::now();
+ this->setExpiryTimer(pitEntry, time::duration_cast<time::milliseconds>(lastExpiryFromNow));
// has NextHopFaceId?
shared_ptr<lp::NextHopFaceIdTag> nextHopTag = interest.getTag<lp::NextHopFaceIdTag>();
@@ -208,8 +213,8 @@
pitEntry->isSatisfied = true;
pitEntry->dataFreshnessPeriod = data.getFreshnessPeriod();
- // set PIT straggler timer
- this->setStragglerTimer(pitEntry);
+ // finalize Interest
+ this->onInterestFinalize(pitEntry);
// goto outgoing Data pipeline
this->onOutgoingData(data, *const_pointer_cast<Face>(inFace.shared_from_this()));
@@ -230,23 +235,6 @@
}
void
-Forwarder::onInterestReject(const shared_ptr<pit::Entry>& pitEntry)
-{
- if (fw::hasPendingOutRecords(*pitEntry)) {
- NFD_LOG_ERROR("onInterestReject interest=" << pitEntry->getName() <<
- " cannot reject forwarded Interest");
- return;
- }
- NFD_LOG_DEBUG("onInterestReject interest=" << pitEntry->getName());
-
- // cancel unsatisfy & straggler timer
- this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
-
- // set PIT straggler timer
- this->setStragglerTimer(pitEntry);
-}
-
-void
Forwarder::onInterestFinalize(const shared_ptr<pit::Entry>& pitEntry)
{
NFD_LOG_DEBUG("onInterestFinalize interest=" << pitEntry->getName() <<
@@ -256,7 +244,7 @@
this->insertDeadNonceList(*pitEntry, 0);
// PIT delete
- this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
+ scheduler::cancel(pitEntry->expiryTimer);
m_pit.erase(pitEntry.get());
}
@@ -295,9 +283,6 @@
for (const shared_ptr<pit::Entry>& pitEntry : pitMatches) {
NFD_LOG_DEBUG("onIncomingData matching=" << pitEntry->getName());
- // cancel unsatisfy & straggler timer
- this->cancelUnsatisfyAndStragglerTimer(*pitEntry);
-
// remember pending downstreams
for (const pit::InRecord& inRecord : pitEntry->getInRecords()) {
if (inRecord.getExpiry() > now) {
@@ -305,6 +290,9 @@
}
}
+ // set PIT expiry timer to now
+ this->setExpiryTimer(pitEntry, 0_ms);
+
// invoke PIT satisfy callback
this->dispatchToStrategy(*pitEntry,
[&] (fw::Strategy& strategy) { strategy.beforeSatisfyInterest(pitEntry, inFace, data); });
@@ -318,9 +306,6 @@
// mark PIT satisfied
pitEntry->clearInRecords();
pitEntry->deleteOutRecord(inFace);
-
- // set PIT straggler timer
- this->setStragglerTimer(pitEntry);
}
// foreach pending downstream
@@ -426,6 +411,11 @@
// record Nack on out-record
outRecord->setIncomingNack(nack);
+ // set PIT expiry timer to now when all out-record receive Nack
+ if (!fw::hasPendingOutRecords(*pitEntry)) {
+ this->setExpiryTimer(pitEntry, 0_ms);
+ }
+
// trigger strategy: after receive NACK
this->dispatchToStrategy(*pitEntry,
[&] (fw::Strategy& strategy) { strategy.afterReceiveNack(inFace, nack, pitEntry); });
@@ -483,46 +473,17 @@
m_strategyChoice.findEffectiveStrategy(interest.getName()).onDroppedInterest(outFace, interest);
}
-static inline bool
-compare_InRecord_expiry(const pit::InRecord& a, const pit::InRecord& b)
-{
- return a.getExpiry() < b.getExpiry();
-}
-
void
-Forwarder::setUnsatisfyTimer(const shared_ptr<pit::Entry>& pitEntry)
+Forwarder::setExpiryTimer(const shared_ptr<pit::Entry>& pitEntry, time::milliseconds duration)
{
- pit::InRecordCollection::iterator lastExpiring =
- std::max_element(pitEntry->in_begin(), pitEntry->in_end(), &compare_InRecord_expiry);
+ BOOST_ASSERT(duration >= 0_ms);
- time::steady_clock::TimePoint lastExpiry = lastExpiring->getExpiry();
- time::nanoseconds lastExpiryFromNow = lastExpiry - time::steady_clock::now();
- if (lastExpiryFromNow <= time::seconds::zero()) {
- // TODO all in-records are already expired; will this happen?
- }
+ scheduler::cancel(pitEntry->expiryTimer);
- scheduler::cancel(pitEntry->m_unsatisfyTimer);
- pitEntry->m_unsatisfyTimer = scheduler::schedule(lastExpiryFromNow,
+ pitEntry->expiryTimer = scheduler::schedule(duration,
bind(&Forwarder::onInterestFinalize, this, pitEntry));
}
-void
-Forwarder::setStragglerTimer(const shared_ptr<pit::Entry>& pitEntry)
-{
- time::nanoseconds stragglerTime = time::milliseconds(100);
-
- scheduler::cancel(pitEntry->m_stragglerTimer);
- pitEntry->m_stragglerTimer = scheduler::schedule(stragglerTime,
- bind(&Forwarder::onInterestFinalize, this, pitEntry));
-}
-
-void
-Forwarder::cancelUnsatisfyAndStragglerTimer(pit::Entry& pitEntry)
-{
- scheduler::cancel(pitEntry.m_unsatisfyTimer);
- scheduler::cancel(pitEntry.m_stragglerTimer);
-}
-
static inline void
insertNonceToDnl(DeadNonceList& dnl, const pit::Entry& pitEntry,
const pit::OutRecord& outRecord)
diff --git a/daemon/fw/forwarder.hpp b/daemon/fw/forwarder.hpp
index c50c573..36af43a 100644
--- a/daemon/fw/forwarder.hpp
+++ b/daemon/fw/forwarder.hpp
@@ -209,11 +209,6 @@
VIRTUAL_WITH_TESTS void
onOutgoingInterest(const shared_ptr<pit::Entry>& pitEntry, Face& outFace, const Interest& interest);
- /** \brief Interest reject pipeline
- */
- VIRTUAL_WITH_TESTS void
- onInterestReject(const shared_ptr<pit::Entry>& pitEntry);
-
/** \brief Interest finalize pipeline
*/
VIRTUAL_WITH_TESTS void
@@ -248,14 +243,10 @@
onDroppedInterest(Face& outFace, const Interest& interest);
PROTECTED_WITH_TESTS_ELSE_PRIVATE:
- VIRTUAL_WITH_TESTS void
- setUnsatisfyTimer(const shared_ptr<pit::Entry>& pitEntry);
-
- VIRTUAL_WITH_TESTS void
- setStragglerTimer(const shared_ptr<pit::Entry>& pitEntry);
-
- VIRTUAL_WITH_TESTS void
- cancelUnsatisfyAndStragglerTimer(pit::Entry& pitEntry);
+ /** \brief set a new expiry timer (now + \p duration) on a PIT entry
+ */
+ void
+ setExpiryTimer(const shared_ptr<pit::Entry>& pitEntry, time::milliseconds duration);
/** \brief insert Nonce to Dead Nonce List if necessary
* \param upstream if null, insert Nonces from all out-records;
diff --git a/daemon/fw/strategy.hpp b/daemon/fw/strategy.hpp
index 71fa506..12a06e8 100644
--- a/daemon/fw/strategy.hpp
+++ b/daemon/fw/strategy.hpp
@@ -121,11 +121,18 @@
* - cannot be satisfied by ContentStore
* - is under a namespace managed by this strategy
*
+ * The PIT entry is set to expire after InterestLifetime has elapsed at each downstream.
+ *
* The strategy should decide whether and where to forward this Interest.
* - If the strategy decides to forward this Interest,
- * invoke this->sendInterest one or more times, either now or shortly after
- * - If strategy concludes that this Interest cannot be forwarded,
- * invoke this->rejectPendingInterest so that PIT entry will be deleted shortly
+ * invoke \c sendInterest for each upstream, either now or shortly after via a scheduler event,
+ * but before PIT entry expires.
+ * Optionally, the strategy can invoke \c setExpiryTimer to adjust how long it would wait for a response.
+ * - If the strategy has already forwarded this Interest previously and decides to continue waiting,
+ * do nothing.
+ * Optionally, the strategy can invoke \c setExpiryTimer to adjust how long it would wait for a response.
+ * - If the strategy concludes that this Interest cannot be satisfied,
+ * invoke \c rejectPendingInterest to erase the PIT entry.
*
* \warning The strategy must not retain shared_ptr<pit::Entry>, otherwise undefined behavior
* may occur. However, the strategy is allowed to store weak_ptr<pit::Entry>.
@@ -137,7 +144,13 @@
/** \brief trigger before PIT entry is satisfied
*
* This trigger is invoked when an incoming Data satisfies the PIT entry.
- * It can be invoked even if the PIT entry has already been satisfied.
+ * Normally, only the first incoming Data would satisfy the PIT entry and invoke this trigger,
+ * after which the PIT entry is erased.
+ *
+ * If the strategy wishes to collect responses from additional upstream nodes,
+ * it should invoke \c setExpiryTimer within this function to retain the PIT entry.
+ * If a Data arrives from another upstream during the extended PIT entry lifetime, this trigger will be invoked again.
+ * At that time, this function must invoke \c setExpiryTimer again to continue collecting more responses.
*
* In this base class this method does nothing.
*
@@ -156,6 +169,14 @@
* to that upstream, i.e. the PIT out-record exists and has a matching Nonce.
* The NackHeader has been recorded in the PIT out-record.
*
+ * If the PIT entry is not yet satisfied, its expiry timer remains unchanged.
+ * Otherwise, the PIT entry normally would expire immediately after this function returns.
+ *
+ * If the strategy wishes to collect responses from additional upstream nodes,
+ * it should invoke \c setExpiryTimer within this function to retain the PIT entry.
+ * If a Nack arrives from another upstream during the extended PIT entry lifetime, this trigger will be invoked again.
+ * At that time, this function must invoke \c setExpiryTimer again to continue collecting more responses.
+ *
* In this base class this method does nothing.
*
* \warning The strategy must not retain shared_ptr<pit::Entry>, otherwise undefined behavior
@@ -185,16 +206,16 @@
m_forwarder.onOutgoingInterest(pitEntry, outFace, interest);
}
- /** \brief decide that a pending Interest cannot be forwarded
- * \param pitEntry PIT entry
+ /** \brief schedule the PIT entry for immediate deletion
*
- * This shall not be called if the pending Interest has been
- * forwarded earlier, and does not need to be resent now.
+ * This helper function sets the PIT entry expiry time to zero.
+ * The strategy should invoke this function when it concludes that the Interest cannot
+ * be forwarded and it does not want to wait for responses from existing upstream nodes.
*/
VIRTUAL_WITH_TESTS void
rejectPendingInterest(const shared_ptr<pit::Entry>& pitEntry)
{
- m_forwarder.onInterestReject(pitEntry);
+ this->setExpiryTimer(pitEntry, 0_ms);
}
/** \brief send Nack to outFace
@@ -222,6 +243,14 @@
sendNacks(const shared_ptr<pit::Entry>& pitEntry, const lp::NackHeader& header,
std::initializer_list<const Face*> exceptFaces = std::initializer_list<const Face*>());
+ /** \brief Schedule the PIT entry to be erased after \p duration
+ */
+ void
+ setExpiryTimer(const shared_ptr<pit::Entry>& pitEntry, time::milliseconds duration)
+ {
+ m_forwarder.setExpiryTimer(pitEntry, duration);
+ }
+
protected: // accessors
/** \brief performs a FIB lookup, considering Link object if present
*/