fw: properly handle consumer retransmission in AsfStrategy
Do not reject an existing PIT entry if there are no
next hops during consumer retransmission
refs: #4874
Change-Id: Iea8602897e99fdf2f6fd0226b5eb74bd7a4bced1
diff --git a/daemon/fw/algorithm.cpp b/daemon/fw/algorithm.cpp
index a0c8bc4..6ea610f 100644
--- a/daemon/fw/algorithm.cpp
+++ b/daemon/fw/algorithm.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2018, Regents of the University of California,
+ * Copyright (c) 2014-2019, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -128,5 +128,51 @@
return lastOutgoing->getLastRenewed();
}
+fib::NextHopList::const_iterator
+findEligibleNextHopWithEarliestOutRecord(const Face& inFace, const Interest& interest,
+ const fib::NextHopList& nexthops,
+ const shared_ptr<pit::Entry>& pitEntry)
+{
+ auto found = nexthops.end();
+ auto earliestRenewed = time::steady_clock::TimePoint::max();
+
+ for (auto it = nexthops.begin(); it != nexthops.end(); ++it) {
+ if (!isNextHopEligible(inFace, interest, *it, pitEntry))
+ continue;
+
+ auto outRecord = pitEntry->getOutRecord(it->getFace(), 0);
+ BOOST_ASSERT(outRecord != pitEntry->out_end());
+ if (outRecord->getLastRenewed() < earliestRenewed) {
+ found = it;
+ earliestRenewed = outRecord->getLastRenewed();
+ }
+ }
+ return found;
+}
+
+bool
+isNextHopEligible(const Face& inFace, const Interest& interest,
+ const fib::NextHop& nexthop,
+ const shared_ptr<pit::Entry>& pitEntry,
+ bool wantUnused,
+ time::steady_clock::TimePoint now)
+{
+ const Face& outFace = nexthop.getFace();
+
+ // do not forward back to the same face, unless it is ad hoc
+ if ((outFace.getId() == inFace.getId() && outFace.getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC) ||
+ (wouldViolateScope(inFace, interest, outFace)))
+ return false;
+
+ if (wantUnused) {
+ // nexthop must not have unexpired out-record
+ auto outRecord = pitEntry->getOutRecord(outFace, 0);
+ if (outRecord != pitEntry->out_end() && outRecord->getExpiry() > now) {
+ return false;
+ }
+ }
+ return true;
+}
+
} // namespace fw
} // namespace nfd
diff --git a/daemon/fw/algorithm.hpp b/daemon/fw/algorithm.hpp
index 7ee0f38..95d8bc5 100644
--- a/daemon/fw/algorithm.hpp
+++ b/daemon/fw/algorithm.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2018, Regents of the University of California,
+ * Copyright (c) 2014-2019, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -27,6 +27,7 @@
#define NFD_DAEMON_FW_PIT_ALGORITHM_HPP
#include "core/scope-prefix.hpp"
+#include "table/fib.hpp"
#include "table/pit-entry.hpp"
/** \file
@@ -83,6 +84,29 @@
time::steady_clock::TimePoint
getLastOutgoing(const pit::Entry& pitEntry);
+/** \brief pick an eligible NextHop with earliest out-record
+ * \note It is assumed that every nexthop has an out-record.
+ */
+fib::NextHopList::const_iterator
+findEligibleNextHopWithEarliestOutRecord(const Face& inFace, const Interest& interest,
+ const fib::NextHopList& nexthops,
+ const shared_ptr<pit::Entry>& pitEntry);
+
+/** \brief determines whether a NextHop is eligible i.e. not the same inFace
+ * \param inFace incoming face of current Interest
+ * \param interest incoming Interest
+ * \param nexthop next hop
+ * \param pitEntry PIT entry
+ * \param wantUnused if true, NextHop must not have unexpired out-record
+ * \param now time::steady_clock::now(), ignored if !wantUnused
+ */
+bool
+isNextHopEligible(const Face& inFace, const Interest& interest,
+ const fib::NextHop& nexthop,
+ const shared_ptr<pit::Entry>& pitEntry,
+ bool wantUnused = false,
+ time::steady_clock::TimePoint now = time::steady_clock::TimePoint::min());
+
} // namespace fw
} // namespace nfd
diff --git a/daemon/fw/asf-strategy.cpp b/daemon/fw/asf-strategy.cpp
index 46acaf4..e013fab 100644
--- a/daemon/fw/asf-strategy.cpp
+++ b/daemon/fw/asf-strategy.cpp
@@ -108,17 +108,24 @@
}
void
+AsfStrategy::sendAsfProbe(const FaceEndpoint& ingress, const Interest& interest,
+ const shared_ptr<pit::Entry>& pitEntry, const Face& faceToUse,
+ const fib::Entry& fibEntry)
+{
+ Face* faceToProbe = m_probing.getFaceToProbe(ingress.face, interest, fibEntry, faceToUse);
+ if (faceToProbe != nullptr) {
+ forwardInterest(interest, fibEntry, pitEntry, *faceToProbe, true);
+ m_probing.afterForwardingProbe(fibEntry, interest);
+ }
+}
+
+void
AsfStrategy::afterReceiveInterest(const FaceEndpoint& ingress, const Interest& interest,
const shared_ptr<pit::Entry>& pitEntry)
{
// Should the Interest be suppressed?
RetxSuppressionResult suppressResult = m_retxSuppression.decidePerPitEntry(*pitEntry);
-
- switch (suppressResult) {
- case RetxSuppressionResult::NEW:
- case RetxSuppressionResult::FORWARD:
- break;
- case RetxSuppressionResult::SUPPRESS:
+ if (suppressResult == RetxSuppressionResult::SUPPRESS) {
NFD_LOG_DEBUG(interest << " from=" << ingress << " suppressed");
return;
}
@@ -126,33 +133,52 @@
const fib::Entry& fibEntry = this->lookupFib(*pitEntry);
const fib::NextHopList& nexthops = fibEntry.getNextHops();
- if (nexthops.size() == 0) {
- sendNoRouteNack(ingress, interest, pitEntry);
- this->rejectPendingInterest(pitEntry);
- return;
- }
-
- Face* faceToUse = getBestFaceForForwarding(fibEntry, interest, ingress.face);
-
- if (faceToUse == nullptr) {
- sendNoRouteNack(ingress, interest, pitEntry);
- this->rejectPendingInterest(pitEntry);
- return;
- }
-
- NFD_LOG_TRACE("Forwarding interest to face: " << faceToUse->getId());
-
- forwardInterest(interest, fibEntry, pitEntry, *faceToUse);
-
- // If necessary, send probe
- if (m_probing.isProbingNeeded(fibEntry, interest)) {
- Face* faceToProbe = m_probing.getFaceToProbe(ingress.face, interest, fibEntry, *faceToUse);
-
- if (faceToProbe != nullptr) {
- bool wantNewNonce = true;
- forwardInterest(interest, fibEntry, pitEntry, *faceToProbe, wantNewNonce);
- m_probing.afterForwardingProbe(fibEntry, interest);
+ if (suppressResult == RetxSuppressionResult::NEW) {
+ if (nexthops.size() == 0) {
+ // send noRouteNack if nexthop is not available
+ sendNoRouteNack(ingress, interest, pitEntry);
+ this->rejectPendingInterest(pitEntry);
+ return;
}
+
+ Face* faceToUse = getBestFaceForForwarding(fibEntry, interest, ingress.face, pitEntry);
+
+ if (faceToUse == nullptr) {
+ sendNoRouteNack(ingress, interest, pitEntry);
+ this->rejectPendingInterest(pitEntry);
+ return;
+ }
+
+ NFD_LOG_TRACE("Forwarding interest to face: " << faceToUse->getId());
+ forwardInterest(interest, fibEntry, pitEntry, *faceToUse);
+
+ // If necessary, send probe
+ if (m_probing.isProbingNeeded(fibEntry, interest)) {
+ sendAsfProbe(ingress, interest, pitEntry, *faceToUse, fibEntry);
+ }
+ return;
+ }
+
+ Face* faceToUse = getBestFaceForForwarding(fibEntry, interest, ingress.face, pitEntry, false);
+ // if unused face not found, select nexthop with earliest out record
+ if (faceToUse != nullptr) {
+
+ NFD_LOG_TRACE("Forwarding interest to face: " << faceToUse->getId());
+ forwardInterest(interest, fibEntry, pitEntry, *faceToUse);
+ // avoid probing in case of forwarding
+ return;
+ }
+
+ // find an eligible upstream that is used earliest
+ auto it = nexthops.end();
+ it = findEligibleNextHopWithEarliestOutRecord(ingress.face, interest, nexthops, pitEntry);
+ if (it == nexthops.end()) {
+ NFD_LOG_DEBUG(interest << " from=" << ingress << " retransmitNoNextHop");
+ }
+ else {
+ auto egress = FaceEndpoint(it->getFace(), 0);
+ NFD_LOG_DEBUG(interest << " from=" << ingress << " retransmit-retry-to=" << egress);
+ this->sendInterest(pitEntry, egress, interest);
}
}
@@ -201,7 +227,6 @@
bool wantNewNonce)
{
auto egress = FaceEndpoint(outFace, 0);
-
if (wantNewNonce) {
//Send probe: interest with new Nonce
Interest probeInterest(interest);
@@ -262,7 +287,8 @@
Face*
AsfStrategy::getBestFaceForForwarding(const fib::Entry& fibEntry, const Interest& interest,
- const Face& inFace)
+ const Face& inFace, const shared_ptr<pit::Entry>& pitEntry,
+ bool isInterestNew)
{
NFD_LOG_TRACE("Looking for best face for " << fibEntry.getPrefix());
@@ -286,11 +312,11 @@
}
});
+ auto now = time::steady_clock::now();
for (const fib::NextHop& hop : fibEntry.getNextHops()) {
Face& hopFace = hop.getFace();
- if ((hopFace.getId() == inFace.getId() && hopFace.getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC) ||
- wouldViolateScope(inFace, interest, hopFace)) {
+ if (!isNextHopEligible(inFace, interest, hop, pitEntry, !isInterestNew, now)) {
continue;
}
diff --git a/daemon/fw/asf-strategy.hpp b/daemon/fw/asf-strategy.hpp
index 2155d94..a181676 100644
--- a/daemon/fw/asf-strategy.hpp
+++ b/daemon/fw/asf-strategy.hpp
@@ -73,8 +73,15 @@
Face& outFace,
bool wantNewNonce = false);
+ void
+ sendAsfProbe(const FaceEndpoint& ingress, const Interest& interest,
+ const shared_ptr<pit::Entry>& pitEntry, const Face& faceToUse,
+ const fib::Entry& fibEntry);
+
Face*
- getBestFaceForForwarding(const fib::Entry& fibEntry, const Interest& interest, const Face& inFace);
+ getBestFaceForForwarding(const fib::Entry& fibEntry, const Interest& interest,
+ const Face& inFace, const shared_ptr<pit::Entry>& pitEntry,
+ bool isNewInterest = true);
void
onTimeout(const Name& interestName, const FaceId faceId);
diff --git a/daemon/fw/best-route-strategy2.cpp b/daemon/fw/best-route-strategy2.cpp
index ebab2ef..5c7d6a2 100644
--- a/daemon/fw/best-route-strategy2.cpp
+++ b/daemon/fw/best-route-strategy2.cpp
@@ -61,68 +61,6 @@
return strategyName;
}
-/** \brief determines whether a NextHop is eligible
- * \param inFace incoming face of current Interest
- * \param interest incoming Interest
- * \param nexthop next hop
- * \param pitEntry PIT entry
- * \param wantUnused if true, NextHop must not have unexpired out-record
- * \param now time::steady_clock::now(), ignored if !wantUnused
- */
-static bool
-isNextHopEligible(const Face& inFace, const Interest& interest,
- const fib::NextHop& nexthop,
- const shared_ptr<pit::Entry>& pitEntry,
- bool wantUnused = false,
- time::steady_clock::TimePoint now = time::steady_clock::TimePoint::min())
-{
- const Face& outFace = nexthop.getFace();
-
- // do not forward back to the same face, unless it is ad hoc
- if (outFace.getId() == inFace.getId() && outFace.getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC)
- return false;
-
- // forwarding would violate scope
- if (wouldViolateScope(inFace, interest, outFace))
- return false;
-
- if (wantUnused) {
- // nexthop must not have unexpired out-record
- auto outRecord = pitEntry->getOutRecord(outFace, 0);
- if (outRecord != pitEntry->out_end() && outRecord->getExpiry() > now) {
- return false;
- }
- }
-
- return true;
-}
-
-/** \brief pick an eligible NextHop with earliest out-record
- * \note It is assumed that every nexthop has an out-record.
- */
-static fib::NextHopList::const_iterator
-findEligibleNextHopWithEarliestOutRecord(const Face& inFace, const Interest& interest,
- const fib::NextHopList& nexthops,
- const shared_ptr<pit::Entry>& pitEntry)
-{
- auto found = nexthops.end();
- auto earliestRenewed = time::steady_clock::TimePoint::max();
-
- for (auto it = nexthops.begin(); it != nexthops.end(); ++it) {
- if (!isNextHopEligible(inFace, interest, *it, pitEntry))
- continue;
-
- auto outRecord = pitEntry->getOutRecord(it->getFace(), 0);
- BOOST_ASSERT(outRecord != pitEntry->out_end());
- if (outRecord->getLastRenewed() < earliestRenewed) {
- found = it;
- earliestRenewed = outRecord->getLastRenewed();
- }
- }
-
- return found;
-}
-
void
BestRouteStrategy2::afterReceiveInterest(const FaceEndpoint& ingress, const Interest& interest,
const shared_ptr<pit::Entry>& pitEntry)
@@ -155,8 +93,8 @@
}
auto egress = FaceEndpoint(it->getFace(), 0);
- this->sendInterest(pitEntry, egress, interest);
NFD_LOG_DEBUG(interest << " from=" << ingress << " newPitEntry-to=" << egress);
+ this->sendInterest(pitEntry, egress, interest);
return;
}
diff --git a/tests/daemon/fw/asf-strategy.t.cpp b/tests/daemon/fw/asf-strategy.t.cpp
index 27ed2aa..33eace4 100644
--- a/tests/daemon/fw/asf-strategy.t.cpp
+++ b/tests/daemon/fw/asf-strategy.t.cpp
@@ -46,7 +46,7 @@
{
protected:
explicit
- AsfGridFixture(const Name& params = AsfStrategy::getStrategyName())
+ AsfGridFixture(const Name& params = AsfStrategy::getStrategyName(), time::nanoseconds replyDelay = 0_ms)
: parameters(params)
{
/*
@@ -82,7 +82,8 @@
consumer = topo.addAppFace("c", nodeA);
producer = topo.addAppFace("p", nodeC, PRODUCER_PREFIX);
- topo.addEchoProducer(producer->getClientFace());
+
+ topo.addEchoProducer(producer->getClientFace(), PRODUCER_PREFIX, replyDelay);
// Register producer prefix on consumer node
topo.registerPrefix(nodeA, linkAB->getFace(nodeA), PRODUCER_PREFIX, 10);
@@ -199,6 +200,100 @@
BOOST_CHECK_GE(linkAD->getFace(nodeA).getCounters().nOutInterests, 2);
}
+class AsfStrategyDelayedDataFixture : public AsfGridFixture
+{
+protected:
+ AsfStrategyDelayedDataFixture()
+ : AsfGridFixture(Name(AsfStrategy::getStrategyName()), 400_ms)
+ {
+ }
+};
+
+BOOST_FIXTURE_TEST_CASE(InterestForwarding, AsfStrategyDelayedDataFixture)
+{
+
+ Name name(PRODUCER_PREFIX);
+ name.appendTimestamp();
+ shared_ptr<Interest> interest = makeInterest(name, true);
+
+ topo.registerPrefix(nodeB, linkBC->getFace(nodeB), PRODUCER_PREFIX);
+ topo.registerPrefix(nodeD, linkCD->getFace(nodeD), PRODUCER_PREFIX);
+
+ // The first interest should go via link AD
+ consumer->getClientFace().expressInterest(*interest, nullptr, nullptr, nullptr);
+ this->advanceClocks(10_ms, 100_ms);
+ BOOST_CHECK_EQUAL(linkAD->getFace(nodeA).getCounters().nOutInterests, 1);
+
+ // Second interest should go via link AB
+ interest->refreshNonce();
+ consumer->getClientFace().expressInterest(*interest, nullptr, nullptr, nullptr);
+ this->advanceClocks(10_ms, 100_ms);
+ BOOST_CHECK_EQUAL(linkAB->getFace(nodeA).getCounters().nOutInterests, 1);
+
+ // The third interest should again go via AD, since both the face from A is already used
+ // and so asf should choose the earliest used face i.e. AD
+ interest->refreshNonce();
+ consumer->getClientFace().expressInterest(*interest, nullptr, nullptr, nullptr);
+ this->advanceClocks(10_ms, 100_ms);
+ BOOST_CHECK_EQUAL(linkAD->getFace(nodeA).getCounters().nOutInterests, 2);
+
+ this->advanceClocks(time::milliseconds(500), time::seconds(5));
+ BOOST_CHECK_EQUAL(linkAD->getFace(nodeA).getCounters().nInData, 1);
+ BOOST_CHECK_EQUAL(linkAB->getFace(nodeA).getCounters().nInData, 1);
+ BOOST_CHECK_EQUAL(consumer->getForwarderFace().getCounters().nOutData, 1);
+
+}
+
+BOOST_AUTO_TEST_CASE(Retransmission)
+{
+ // This is a unit test written to recreate the following issue
+ // https://redmine.named-data.net/issues/4874
+ /*
+ * +---------+ 10ms +---------+
+ * | nodeB | ------> | nodeC |
+ * +---------+ +---------+
+ */
+ // Avoid clearing pit entry for those incoming interest that have pit entry but no next hops
+
+ static const Name PRODUCER_PREFIX = "ndn:/pnr/C";
+ Name parameters = AsfStrategy::getStrategyName();
+ TopologyTester topo;
+
+ TopologyNode nodeB = topo.addForwarder("B"),
+ nodeC = topo.addForwarder("C");
+
+ topo.setStrategy<AsfStrategy>(nodeB, Name("ndn:/"), parameters);
+ topo.setStrategy<AsfStrategy>(nodeC, Name("ndn:/"), parameters);
+
+ shared_ptr<TopologyLink> linkBC = topo.addLink("BC", time::milliseconds(10), {nodeB, nodeC});
+
+ shared_ptr<TopologyAppLink> consumer = topo.addAppFace("c", nodeB),
+ producer = topo.addAppFace("p", nodeC, PRODUCER_PREFIX);
+
+ topo.addEchoProducer(producer->getClientFace(), PRODUCER_PREFIX, 100_ms);
+
+ Name name(PRODUCER_PREFIX);
+ name.appendTimestamp();
+ auto interest = makeInterest(name, true);
+
+ nfd::pit::Pit& pit = topo.getForwarder(nodeB).getPit();
+ shared_ptr<pit::Entry> pitEntry = pit.insert(*interest).first;
+
+ topo.getForwarder(nodeB).onOutgoingInterest(pitEntry, FaceEndpoint(linkBC->getFace(nodeB), 0), *interest);
+ this->advanceClocks(time::milliseconds(100));
+
+ interest->refreshNonce();
+ consumer->getClientFace().expressInterest(*interest, nullptr, nullptr, nullptr);
+ this->advanceClocks(time::milliseconds(100));
+
+ pit::OutRecordCollection::const_iterator outRecord = pitEntry->getOutRecord(linkBC->getFace(nodeB), 0);
+ BOOST_CHECK(outRecord != pitEntry->out_end());
+
+ this->advanceClocks(time::milliseconds(100));
+ BOOST_CHECK_EQUAL(linkBC->getFace(nodeC).getCounters().nOutData, 1);
+ BOOST_CHECK_EQUAL(linkBC->getFace(nodeB).getCounters().nInData, 1);
+}
+
BOOST_AUTO_TEST_CASE(NoPitOutRecordAndProbeInterestNewNonce)
{
/* +---------+
diff --git a/tests/daemon/fw/topology-tester.cpp b/tests/daemon/fw/topology-tester.cpp
index 0e19b57..50d3de5 100644
--- a/tests/daemon/fw/topology-tester.cpp
+++ b/tests/daemon/fw/topology-tester.cpp
@@ -252,13 +252,23 @@
}
void
-TopologyTester::addEchoProducer(ndn::Face& face, const Name& prefix)
+TopologyTester::addEchoProducer(ndn::Face& face, const Name& prefix, time::nanoseconds replyDelay)
{
- face.setInterestFilter(prefix,
- [&face] (const auto&, const Interest& interest) {
- auto data = makeData(interest.getName());
+ BOOST_ASSERT(replyDelay >= 0_ns);
+
+ face.setInterestFilter(prefix, [=, &face] (const auto&, const auto& interest) {
+ auto data = makeData(interest.getName());
+ if (replyDelay == 0_ns) {
+ // reply immediately
+ face.put(*data);
+ }
+ else {
+ // delay the reply
+ getScheduler().schedule(replyDelay, [&face, data = std::move(data)] {
face.put(*data);
});
+ }
+ });
}
void
diff --git a/tests/daemon/fw/topology-tester.hpp b/tests/daemon/fw/topology-tester.hpp
index 2c8d7d0..da4e488 100644
--- a/tests/daemon/fw/topology-tester.hpp
+++ b/tests/daemon/fw/topology-tester.hpp
@@ -286,7 +286,7 @@
/** \brief creates a producer application that answers every Interest with Data of same Name
*/
void
- addEchoProducer(ndn::Face& face, const Name& prefix = "/");
+ addEchoProducer(ndn::Face& face, const Name& prefix = "/", time::nanoseconds replyDelay = 0_ns);
/** \brief creates a consumer application that sends \p n Interests under \p prefix
* at \p interval fixed rate.