fw: Nack in pipelines and best-route strategy
* in PIT out-record, add last incoming Nack field
* create incoming Nack pipeline
* create outgoing Nack pipeline
* modify Interest loop pipeline to send Nack upon duplicate Nonce
* in strategy API, add after receive Nack trigger and send Nack action
* in best-route strategy, send Nack-NoRoute before rejecting pending Interest
* in best-route strategy, process incoming Nack
Other changes include:
* Pit::find
* StrategyTester saved arguments structs
* TopologyTester transmit at Transport level
refs #3156
Change-Id: I7868561c0838231083d471261200aeb280cc6e9d
diff --git a/daemon/fw/best-route-strategy2.cpp b/daemon/fw/best-route-strategy2.cpp
index 2ec786c..e0e31dc 100644
--- a/daemon/fw/best-route-strategy2.cpp
+++ b/daemon/fw/best-route-strategy2.cpp
@@ -31,7 +31,7 @@
NFD_LOG_INIT("BestRouteStrategy2");
-const Name BestRouteStrategy2::STRATEGY_NAME("ndn:/localhost/nfd/strategy/best-route/%FD%03");
+const Name BestRouteStrategy2::STRATEGY_NAME("ndn:/localhost/nfd/strategy/best-route/%FD%04");
NFD_REGISTER_STRATEGY(BestRouteStrategy2);
BestRouteStrategy2::BestRouteStrategy2(Forwarder& forwarder, const Name& name)
@@ -104,8 +104,7 @@
const fib::NextHopList& nexthops = fibEntry->getNextHops();
fib::NextHopList::const_iterator it = nexthops.end();
- RetxSuppression::Result suppression =
- m_retxSuppression.decide(inFace, interest, *pitEntry);
+ RetxSuppression::Result suppression = m_retxSuppression.decide(inFace, interest, *pitEntry);
if (suppression == RetxSuppression::NEW) {
// forward to nexthop with lowest cost except downstream
it = std::find_if(nexthops.begin(), nexthops.end(),
@@ -114,6 +113,11 @@
if (it == nexthops.end()) {
NFD_LOG_DEBUG(interest << " from=" << inFace.getId() << " noNextHop");
+
+ lp::NackHeader nackHeader;
+ nackHeader.setReason(lp::NackReason::NO_ROUTE);
+ this->sendNack(pitEntry, inFace, nackHeader);
+
this->rejectPendingInterest(pitEntry);
return;
}
@@ -156,5 +160,72 @@
}
}
+/** \return less severe NackReason between x and y
+ *
+ * lp::NackReason::NONE is treated as most severe
+ */
+inline lp::NackReason
+compareLessSevere(lp::NackReason x, lp::NackReason y)
+{
+ if (x == lp::NackReason::NONE) {
+ return y;
+ }
+ if (y == lp::NackReason::NONE) {
+ return x;
+ }
+ return static_cast<lp::NackReason>(std::min(static_cast<int>(x), static_cast<int>(y)));
+}
+
+void
+BestRouteStrategy2::afterReceiveNack(const Face& inFace, const lp::Nack& nack,
+ shared_ptr<fib::Entry> fibEntry,
+ shared_ptr<pit::Entry> pitEntry)
+{
+ int nOutRecordsNotNacked = 0;
+ Face* lastFaceNotNacked = nullptr;
+ lp::NackReason leastSevereReason = lp::NackReason::NONE;
+ for (const pit::OutRecord& outR : pitEntry->getOutRecords()) {
+ const lp::NackHeader* inNack = outR.getIncomingNack();
+ if (inNack == nullptr) {
+ ++nOutRecordsNotNacked;
+ lastFaceNotNacked = outR.getFace().get();
+ continue;
+ }
+
+ leastSevereReason = compareLessSevere(leastSevereReason, inNack->getReason());
+ }
+
+ lp::NackHeader outNack;
+ outNack.setReason(leastSevereReason);
+
+ if (nOutRecordsNotNacked == 1) {
+ BOOST_ASSERT(lastFaceNotNacked != nullptr);
+ pit::InRecordCollection::const_iterator inR = pitEntry->getInRecord(*lastFaceNotNacked);
+ if (inR != pitEntry->getInRecords().end()) {
+ // one out-record not Nacked, which is also a downstream
+ NFD_LOG_DEBUG(nack.getInterest() << " nack-from=" << inFace.getId() <<
+ " nack=" << nack.getReason() <<
+ " nack-to(bidirectional)=" << lastFaceNotNacked->getId() <<
+ " out-nack=" << outNack.getReason());
+ this->sendNack(pitEntry, *lastFaceNotNacked, outNack);
+ return;
+ }
+ }
+
+ if (nOutRecordsNotNacked > 0) {
+ NFD_LOG_DEBUG(nack.getInterest() << " nack-from=" << inFace.getId() <<
+ " nack=" << nack.getReason() <<
+ " waiting=" << nOutRecordsNotNacked);
+ // continue waiting
+ return;
+ }
+
+
+ NFD_LOG_DEBUG(nack.getInterest() << " nack-from=" << inFace.getId() <<
+ " nack=" << nack.getReason() <<
+ " nack-to=all out-nack=" << outNack.getReason());
+ this->sendNacks(pitEntry, outNack);
+}
+
} // namespace fw
} // namespace nfd
diff --git a/daemon/fw/best-route-strategy2.hpp b/daemon/fw/best-route-strategy2.hpp
index bf3eba2..8c19dc0 100644
--- a/daemon/fw/best-route-strategy2.hpp
+++ b/daemon/fw/best-route-strategy2.hpp
@@ -32,13 +32,22 @@
namespace nfd {
namespace fw {
-/** \brief Best Route strategy version 3
+/** \brief Best Route strategy version 4
*
* This strategy forwards a new Interest to the lowest-cost nexthop (except downstream).
* After that, if consumer retransmits the Interest (and is not suppressed according to
* exponential backoff algorithm), the strategy forwards the Interest again to
* the lowest-cost nexthop (except downstream) that is not previously used.
- * If all nexthops have been used, the strategy starts over.
+ * If all nexthops have been used, the strategy starts over with the first nexthop.
+ *
+ * This strategy returns Nack to all downstreams with reason NoRoute
+ * if there is no usable nexthop, which may be caused by:
+ * (a) the FIB entry contains no nexthop;
+ * (b) the FIB nexthop happens to be the sole downstream;
+ * (c) the FIB nexthops violate scope.
+ *
+ * This strategy returns Nack to all downstreams if all upstreams have returned Nacks.
+ * The reason of the sent Nack equals the least severe reason among received Nacks.
*/
class BestRouteStrategy2 : public Strategy
{
@@ -46,11 +55,15 @@
BestRouteStrategy2(Forwarder& forwarder, const Name& name = STRATEGY_NAME);
virtual void
- afterReceiveInterest(const Face& inFace,
- const Interest& interest,
+ afterReceiveInterest(const Face& inFace, const Interest& interest,
shared_ptr<fib::Entry> fibEntry,
shared_ptr<pit::Entry> pitEntry) DECL_OVERRIDE;
+ virtual void
+ afterReceiveNack(const Face& inFace, const lp::Nack& nack,
+ shared_ptr<fib::Entry> fibEntry,
+ shared_ptr<pit::Entry> pitEntry) DECL_OVERRIDE;
+
public:
static const Name STRATEGY_NAME;
diff --git a/daemon/fw/face-table.cpp b/daemon/fw/face-table.cpp
index f256d20..1884203 100644
--- a/daemon/fw/face-table.cpp
+++ b/daemon/fw/face-table.cpp
@@ -89,6 +89,8 @@
&m_forwarder, ref(*face), _1));
face->onReceiveData.connect(bind(&Forwarder::startProcessData,
&m_forwarder, ref(*face), _1));
+ face->onReceiveNack.connect(bind(&Forwarder::startProcessNack,
+ &m_forwarder, ref(*face), _1));
face->onFail.connectSingleShot(bind(&FaceTable::remove, this, face, _1));
this->onAdd(face);
diff --git a/daemon/fw/forwarder.cpp b/daemon/fw/forwarder.cpp
index 27bfa38..20428c5 100644
--- a/daemon/fw/forwarder.cpp
+++ b/daemon/fw/forwarder.cpp
@@ -60,7 +60,7 @@
interest.getLink();
}
}
- catch (tlv::Error&) {
+ catch (const tlv::Error&) {
NFD_LOG_DEBUG("startProcessInterest face=" << face.getId() <<
" interest=" << interest.getName() << " malformed");
// It's safe to call interest.getName() because Name has been fully parsed
@@ -80,6 +80,25 @@
}
void
+Forwarder::startProcessNack(Face& face, const lp::Nack& nack)
+{
+ // check fields used by forwarding are well-formed
+ try {
+ if (nack.getInterest().hasLink()) {
+ nack.getInterest().getLink();
+ }
+ }
+ catch (const tlv::Error&) {
+ NFD_LOG_DEBUG("startProcessNack face=" << face.getId() <<
+ " nack=" << nack.getInterest().getName() <<
+ "~" << nack.getReason() << " malformed");
+ return;
+ }
+
+ this->onIncomingNack(face, nack);
+}
+
+void
Forwarder::onIncomingInterest(Face& inFace, const Interest& interest)
{
// receive Interest
@@ -131,10 +150,23 @@
Forwarder::onInterestLoop(Face& inFace, const Interest& interest,
shared_ptr<pit::Entry> pitEntry)
{
- NFD_LOG_DEBUG("onInterestLoop face=" << inFace.getId() <<
- " interest=" << interest.getName());
+ // if multi-access face, drop
+ if (inFace.isMultiAccess()) {
+ NFD_LOG_DEBUG("onInterestLoop face=" << inFace.getId() <<
+ " interest=" << interest.getName() <<
+ " drop");
+ return;
+ }
- // (drop)
+ NFD_LOG_DEBUG("onInterestLoop face=" << inFace.getId() <<
+ " interest=" << interest.getName() <<
+ " send-Nack-duplicate");
+
+ // send Nack with reason=DUPLICATE
+ // note: Don't enter outgoing Nack pipeline because it needs an in-record.
+ lp::Nack nack(interest);
+ nack.setReason(lp::NackReason::DUPLICATE);
+ inFace.sendNack(nack);
}
void
@@ -362,7 +394,7 @@
// CS insert
m_cs.insert(data);
- std::set<shared_ptr<Face> > pendingDownstreams;
+ std::set<Face*> pendingDownstreams;
// foreach PitEntry
for (const shared_ptr<pit::Entry>& pitEntry : pitMatches) {
NFD_LOG_DEBUG("onIncomingData matching=" << pitEntry->getName());
@@ -372,10 +404,9 @@
// remember pending downstreams
const pit::InRecordCollection& inRecords = pitEntry->getInRecords();
- for (pit::InRecordCollection::const_iterator it = inRecords.begin();
- it != inRecords.end(); ++it) {
- if (it->getExpiry() > time::steady_clock::now()) {
- pendingDownstreams.insert(it->getFace());
+ for (const pit::InRecord& inRecord : inRecords) {
+ if (inRecord.getExpiry() > time::steady_clock::now()) {
+ pendingDownstreams.insert(inRecord.getFace().get());
}
}
@@ -395,10 +426,8 @@
}
// foreach pending downstream
- for (std::set<shared_ptr<Face> >::iterator it = pendingDownstreams.begin();
- it != pendingDownstreams.end(); ++it) {
- shared_ptr<Face> pendingDownstream = *it;
- if (pendingDownstream.get() == &inFace) {
+ for (Face* pendingDownstream : pendingDownstreams) {
+ if (pendingDownstream == &inFace) {
continue;
}
// goto outgoing Data pipeline
@@ -447,6 +476,104 @@
++m_counters.getNOutDatas();
}
+void
+Forwarder::onIncomingNack(Face& inFace, const lp::Nack& nack)
+{
+ // if multi-access face, drop
+ if (inFace.isMultiAccess()) {
+ NFD_LOG_DEBUG("onIncomingNack face=" << inFace.getId() <<
+ " nack=" << nack.getInterest().getName() <<
+ "~" << nack.getReason() << " face-is-multi-access");
+ return;
+ }
+
+ // PIT match
+ shared_ptr<pit::Entry> pitEntry = m_pit.find(nack.getInterest());
+ // if no PIT entry found, drop
+ if (pitEntry == nullptr) {
+ NFD_LOG_DEBUG("onIncomingNack face=" << inFace.getId() <<
+ " nack=" << nack.getInterest().getName() <<
+ "~" << nack.getReason() << " no-PIT-entry");
+ return;
+ }
+
+ // has out-record?
+ pit::OutRecordCollection::iterator outRecord = pitEntry->getOutRecord(inFace);
+ // if no out-record found, drop
+ if (outRecord == pitEntry->getOutRecords().end()) {
+ NFD_LOG_DEBUG("onIncomingNack face=" << inFace.getId() <<
+ " nack=" << nack.getInterest().getName() <<
+ "~" << nack.getReason() << " no-out-record");
+ return;
+ }
+
+ // if out-record has different Nonce, drop
+ if (nack.getInterest().getNonce() != outRecord->getLastNonce()) {
+ NFD_LOG_DEBUG("onIncomingNack face=" << inFace.getId() <<
+ " nack=" << nack.getInterest().getName() <<
+ "~" << nack.getReason() << " wrong-Nonce " <<
+ nack.getInterest().getNonce() << "!=" << outRecord->getLastNonce());
+ return;
+ }
+
+ NFD_LOG_DEBUG("onIncomingNack face=" << inFace.getId() <<
+ " nack=" << nack.getInterest().getName() <<
+ "~" << nack.getReason() << " OK");
+
+ // record Nack on out-record
+ outRecord->setIncomingNack(nack);
+
+ // trigger strategy: after receive NACK
+ shared_ptr<fib::Entry> fibEntry = m_fib.findLongestPrefixMatch(*pitEntry);
+ this->dispatchToStrategy(pitEntry, bind(&Strategy::afterReceiveNack, _1,
+ cref(inFace), cref(nack), fibEntry, pitEntry));
+}
+
+void
+Forwarder::onOutgoingNack(shared_ptr<pit::Entry> pitEntry, const Face& outFace,
+ const lp::NackHeader& nack)
+{
+ if (outFace.getId() == INVALID_FACEID) {
+ NFD_LOG_WARN("onOutgoingNack face=invalid" <<
+ " nack=" << pitEntry->getInterest().getName() <<
+ "~" << nack.getReason() << " no-in-record");
+ return;
+ }
+
+ // has in-record?
+ pit::InRecordCollection::const_iterator inRecord = pitEntry->getInRecord(outFace);
+
+ // if no in-record found, drop
+ if (inRecord == pitEntry->getInRecords().end()) {
+ NFD_LOG_DEBUG("onOutgoingNack face=" << outFace.getId() <<
+ " nack=" << pitEntry->getInterest().getName() <<
+ "~" << nack.getReason() << " no-in-record");
+ return;
+ }
+
+ // if multi-access face, drop
+ if (outFace.isMultiAccess()) {
+ NFD_LOG_DEBUG("onOutgoingNack face=" << outFace.getId() <<
+ " nack=" << pitEntry->getInterest().getName() <<
+ "~" << nack.getReason() << " face-is-multi-access");
+ return;
+ }
+
+ NFD_LOG_DEBUG("onOutgoingNack face=" << outFace.getId() <<
+ " nack=" << pitEntry->getInterest().getName() <<
+ "~" << nack.getReason() << " OK");
+
+ // create Nack packet with the Interest from in-record
+ lp::Nack nackPkt(inRecord->getInterest());
+ nackPkt.setHeader(nack);
+
+ // erase in-record
+ pitEntry->deleteInRecord(outFace);
+
+ // send Nack on face
+ const_cast<Face&>(outFace).sendNack(nackPkt);
+}
+
static inline bool
compare_InRecord_expiry(const pit::InRecord& a, const pit::InRecord& b)
{
diff --git a/daemon/fw/forwarder.hpp b/daemon/fw/forwarder.hpp
index 155ec4a..ebfaa05 100644
--- a/daemon/fw/forwarder.hpp
+++ b/daemon/fw/forwarder.hpp
@@ -90,6 +90,12 @@
void
startProcessData(Face& face, const Data& data);
+ /** \brief start incoming Nack processing
+ * \param nack the incoming Nack packet
+ */
+ void
+ startProcessNack(Face& face, const lp::Nack& nack);
+
NameTree&
getNameTree();
@@ -128,12 +134,12 @@
/** \brief Content Store miss pipeline
*/
- void
+ VIRTUAL_WITH_TESTS void
onContentStoreMiss(const Face& inFace, shared_ptr<pit::Entry> pitEntry, const Interest& interest);
/** \brief Content Store hit pipeline
*/
- void
+ VIRTUAL_WITH_TESTS void
onContentStoreHit(const Face& inFace, shared_ptr<pit::Entry> pitEntry,
const Interest& interest, const Data& data);
@@ -176,6 +182,16 @@
VIRTUAL_WITH_TESTS void
onOutgoingData(const Data& data, Face& outFace);
+ /** \brief incoming Nack pipeline
+ */
+ VIRTUAL_WITH_TESTS void
+ onIncomingNack(Face& inFace, const lp::Nack& nack);
+
+ /** \brief outgoing Nack pipeline
+ */
+ VIRTUAL_WITH_TESTS void
+ onOutgoingNack(shared_ptr<pit::Entry> pitEntry, const Face& outFace, const lp::NackHeader& nack);
+
PROTECTED_WITH_TESTS_ELSE_PRIVATE:
VIRTUAL_WITH_TESTS void
setUnsatisfyTimer(shared_ptr<pit::Entry> pitEntry);
diff --git a/daemon/fw/strategy.cpp b/daemon/fw/strategy.cpp
index 4f439db..7b0504e 100644
--- a/daemon/fw/strategy.cpp
+++ b/daemon/fw/strategy.cpp
@@ -60,23 +60,36 @@
NFD_LOG_DEBUG("beforeExpirePendingInterest pitEntry=" << pitEntry->getName());
}
-//void
-//Strategy::afterAddFibEntry(shared_ptr<fib::Entry> fibEntry)
-//{
-// NFD_LOG_DEBUG("afterAddFibEntry fibEntry=" << fibEntry->getPrefix());
-//}
-//
-//void
-//Strategy::afterUpdateFibEntry(shared_ptr<fib::Entry> fibEntry)
-//{
-// NFD_LOG_DEBUG("afterUpdateFibEntry fibEntry=" << fibEntry->getPrefix());
-//}
-//
-//void
-//Strategy::beforeRemoveFibEntry(shared_ptr<fib::Entry> fibEntry)
-//{
-// NFD_LOG_DEBUG("beforeRemoveFibEntry fibEntry=" << fibEntry->getPrefix());
-//}
+void
+Strategy::afterReceiveNack(const Face& inFace, const lp::Nack& nack,
+ shared_ptr<fib::Entry> fibEntry, shared_ptr<pit::Entry> pitEntry)
+{
+ NFD_LOG_DEBUG("afterReceiveNack inFace=" << inFace.getId() <<
+ " pitEntry=" << pitEntry->getName());
+}
+
+void
+Strategy::sendNacks(shared_ptr<pit::Entry> pitEntry, const lp::NackHeader& header,
+ std::initializer_list<const Face*> exceptFaces)
+{
+ // populate downstreams with all downstreams faces
+ std::unordered_set<const Face*> downstreams;
+ const pit::InRecordCollection& inRecords = pitEntry->getInRecords();
+ std::transform(inRecords.begin(), inRecords.end(), std::inserter(downstreams, downstreams.end()),
+ [] (const pit::InRecord& inR) { return inR.getFace().get(); });
+
+ // delete excluded faces
+ // .erase in a loop is more efficient than std::set_difference, which requires sorted ranges
+ for (const Face* exceptFace : exceptFaces) {
+ downstreams.erase(exceptFace);
+ }
+
+ // send Nacks
+ for (const Face* downstream : downstreams) {
+ this->sendNack(pitEntry, *downstream, header);
+ }
+ // warning: don't loop on pitEntry->getInRecords(), because InRecord is erased when sending Nack
+}
} // namespace fw
} // namespace nfd
diff --git a/daemon/fw/strategy.hpp b/daemon/fw/strategy.hpp
index 8eca1db..ebbb361 100644
--- a/daemon/fw/strategy.hpp
+++ b/daemon/fw/strategy.hpp
@@ -70,7 +70,7 @@
* invoke this->rejectPendingInterest so that PIT entry will be deleted shortly
*
* \note The strategy is permitted to store a weak reference to fibEntry.
- * Do not store a shared reference, because PIT entry may be deleted at any moment.
+ * Do not store a shared reference, because FIB entry may be deleted at any moment.
* fibEntry is passed by value to allow obtaining a weak reference from it.
* \note The strategy is permitted to store a shared reference to pitEntry.
* pitEntry is passed by value to reflect this fact.
@@ -110,11 +110,33 @@
virtual void
beforeExpirePendingInterest(shared_ptr<pit::Entry> pitEntry);
+ /** \brief trigger after Nack is received
+ *
+ * This trigger is invoked when an incoming Nack is received in response to
+ * a forwarded Interest.
+ * The Nack has been confirmed to be a response to the last Interest forwarded
+ * to that upstream, i.e. the PIT out-record exists and has a matching Nonce.
+ * The NackHeader has been recorded in the PIT out-record.
+ *
+ * In this base class this method does nothing.
+ *
+ * \note The strategy is permitted to store a weak reference to fibEntry.
+ * Do not store a shared reference, because FIB entry may be deleted at any moment.
+ * fibEntry is passed by value to allow obtaining a weak reference from it.
+ * \note The strategy is permitted to store a shared reference to pitEntry.
+ * pitEntry is passed by value to reflect this fact.
+ */
+ virtual void
+ afterReceiveNack(const Face& inFace, const lp::Nack& nack,
+ shared_ptr<fib::Entry> fibEntry, shared_ptr<pit::Entry> pitEntry);
+
protected: // actions
- /// send Interest to outFace
+ /** \brief send Interest to outFace
+ * \param wantNewNonce if true, a new Nonce will be generated,
+ * rather than reusing a Nonce from one of the PIT in-records
+ */
VIRTUAL_WITH_TESTS void
- sendInterest(shared_ptr<pit::Entry> pitEntry,
- shared_ptr<Face> outFace,
+ sendInterest(shared_ptr<pit::Entry> pitEntry, shared_ptr<Face> outFace,
bool wantNewNonce = false);
/** \brief decide that a pending Interest cannot be forwarded
@@ -125,6 +147,22 @@
VIRTUAL_WITH_TESTS void
rejectPendingInterest(shared_ptr<pit::Entry> pitEntry);
+ /** \brief send Nack to outFace
+ *
+ * The outFace must have a PIT in-record, otherwise this method has no effect.
+ */
+ VIRTUAL_WITH_TESTS void
+ sendNack(shared_ptr<pit::Entry> pitEntry, const Face& outFace,
+ const lp::NackHeader& header);
+
+ /** \brief send Nack to every face that has an in-record,
+ * except those in \p exceptFaces
+ * \note This is not an action, but a helper that invokes the sendNack action.
+ */
+ void
+ sendNacks(shared_ptr<pit::Entry> pitEntry, const lp::NackHeader& header,
+ std::initializer_list<const Face*> exceptFaces = std::initializer_list<const Face*>());
+
protected: // accessors
MeasurementsAccessor&
getMeasurements();
@@ -171,6 +209,13 @@
m_forwarder.onInterestReject(pitEntry);
}
+inline void
+Strategy::sendNack(shared_ptr<pit::Entry> pitEntry, const Face& outFace,
+ const lp::NackHeader& header)
+{
+ m_forwarder.onOutgoingNack(pitEntry, outFace, header);
+}
+
inline MeasurementsAccessor&
Strategy::getMeasurements()
{