face: send Nack after every InterestCallback has responded
refs #4228
Change-Id: I3b1ef58c70d34c2099249216a3600efe01b79f71
diff --git a/src/detail/face-impl.hpp b/src/detail/face-impl.hpp
index 53eb4c0..57ee911 100644
--- a/src/detail/face-impl.hpp
+++ b/src/detail/face-impl.hpp
@@ -91,9 +91,10 @@
this->ensureConnected(true);
const Interest& interest2 = *interest;
- auto entry = m_pendingInterestTable.insert(make_shared<PendingInterest>(
+ auto i = m_pendingInterestTable.insert(make_shared<PendingInterest>(
std::move(interest), afterSatisfied, afterNacked, afterTimeout, ref(m_scheduler))).first;
- (*entry)->setDeleter([this, entry] { m_pendingInterestTable.erase(entry); });
+ PendingInterest& entry = **i;
+ entry.setDeleter([this, i] { m_pendingInterestTable.erase(i); });
lp::Packet lpPacket;
addFieldFromTag<lp::NextHopFaceIdField, lp::NextHopFaceIdTag>(lpPacket, interest2);
@@ -101,6 +102,7 @@
m_face.m_transport->send(finishEncoding(std::move(lpPacket), interest2.wireEncode(),
'I', interest2.getName()));
+ entry.recordForwarding();
}
void
@@ -121,20 +123,19 @@
satisfyPendingInterests(const Data& data)
{
bool hasAppMatch = false, hasForwarderMatch = false;
- for (auto entry = m_pendingInterestTable.begin(); entry != m_pendingInterestTable.end(); ) {
- if (!(*entry)->getInterest()->matchesData(data)) {
- ++entry;
+ for (auto i = m_pendingInterestTable.begin(); i != m_pendingInterestTable.end(); ) {
+ shared_ptr<PendingInterest> entry = *i;
+ if (!entry->getInterest()->matchesData(data)) {
+ ++i;
continue;
}
- shared_ptr<PendingInterest> matchedEntry = *entry;
- NDN_LOG_DEBUG(" satisfying " << *matchedEntry->getInterest() <<
- " from " << matchedEntry->getOrigin());
- entry = m_pendingInterestTable.erase(entry);
+ NDN_LOG_DEBUG(" satisfying " << *entry->getInterest() << " from " << entry->getOrigin());
+ i = m_pendingInterestTable.erase(i);
- if (matchedEntry->getOrigin() == PendingInterestOrigin::APP) {
+ if (entry->getOrigin() == PendingInterestOrigin::APP) {
hasAppMatch = true;
- matchedEntry->invokeDataCallback(data);
+ entry->invokeDataCallback(data);
}
else {
hasForwarderMatch = true;
@@ -144,33 +145,39 @@
return hasForwarderMatch || !hasAppMatch;
}
- /** @return whether the Nack should be sent to the forwarder, if it does not come from the forwarder
+ /** @return a Nack to be sent to the forwarder, or nullopt if no Nack should be sent
*/
- bool
+ optional<lp::Nack>
nackPendingInterests(const lp::Nack& nack)
{
- bool shouldSendToForwarder = false;
- for (auto entry = m_pendingInterestTable.begin(); entry != m_pendingInterestTable.end(); ) {
- if (!nack.getInterest().matchesInterest(*(*entry)->getInterest())) {
- ++entry;
+ optional<lp::Nack> outNack; // set only when a record whose origin is not APP yields a Nack
+ for (auto i = m_pendingInterestTable.begin(); i != m_pendingInterestTable.end(); ) {
+ shared_ptr<PendingInterest> entry = *i;
+ if (!nack.getInterest().matchesInterest(*entry->getInterest())) {
+ ++i;
continue;
}
- shared_ptr<PendingInterest> matchedEntry = *entry;
- NDN_LOG_DEBUG(" nacking " << *matchedEntry->getInterest() <<
- " from " << matchedEntry->getOrigin());
- entry = m_pendingInterestTable.erase(entry);
+ NDN_LOG_DEBUG(" nacking " << *entry->getInterest() << " from " << entry->getOrigin());
- // TODO #4228 record Nack on PendingInterest record, and send Nack only if all InterestFilters have Nacked
+ optional<lp::Nack> outNack1 = entry->recordNack(nack); // nullopt while other Nacks are awaited
+ if (!outNack1) {
+ ++i; // keep the record in the table until its remaining destinations have Nacked
+ continue;
+ }
- if (matchedEntry->getOrigin() == PendingInterestOrigin::APP) {
- matchedEntry->invokeNackCallback(nack);
+ if (entry->getOrigin() == PendingInterestOrigin::APP) {
+ entry->invokeNackCallback(*outNack1);
}
else {
- shouldSendToForwarder = true;
+ outNack = outNack1; // overwrites any Nack taken from an earlier record in this loop
}
+ i = m_pendingInterestTable.erase(i); // record is finished; continue from erase()'s result
}
- return shouldSendToForwarder;
+ // outNack holds the least severe Nack of the last fully-Nacked PendingInterest record that did
+ // not originate from the app; picking among several such records is deliberately not attempted,
+ // as the forwarder sending multiple matching Interests to an app in a short while is unlikely
+ return outNack;
}
public: // producer
@@ -197,15 +204,16 @@
processIncomingInterest(shared_ptr<const Interest> interest)
{
const Interest& interest2 = *interest;
- auto entry = m_pendingInterestTable.insert(make_shared<PendingInterest>(
+ auto i = m_pendingInterestTable.insert(make_shared<PendingInterest>(
std::move(interest), ref(m_scheduler))).first;
- (*entry)->setDeleter([this, entry] { m_pendingInterestTable.erase(entry); });
+ PendingInterest& entry = **i;
+ entry.setDeleter([this, i] { m_pendingInterestTable.erase(i); });
for (const auto& filter : m_interestFilterTable) {
if (filter->doesMatch(interest2.getName())) {
NDN_LOG_DEBUG(" matches " << filter->getFilter());
filter->invokeInterestCallback(interest2);
- // TODO #4228 record number of matched InterestFilters on PendingInterest record
+ entry.recordForwarding();
}
}
}
@@ -231,18 +239,18 @@
void
asyncPutNack(const lp::Nack& nack)
{
- bool shouldSendToForwarder = nackPendingInterests(nack);
- if (!shouldSendToForwarder) {
+ optional<lp::Nack> outNack = nackPendingInterests(nack); // Nack to forward, if any
+ if (!outNack) { // nothing is to be sent to the forwarder
return;
}
this->ensureConnected(true);
lp::Packet lpPacket;
- lpPacket.add<lp::NackField>(nack.getHeader());
- addFieldFromTag<lp::CongestionMarkField, lp::CongestionMarkTag>(lpPacket, nack);
+ lpPacket.add<lp::NackField>(outNack->getHeader()); // selected Nack may differ from the argument
+ addFieldFromTag<lp::CongestionMarkField, lp::CongestionMarkTag>(lpPacket, *outNack);
- const Interest& interest = nack.getInterest();
+ const Interest& interest = outNack->getInterest(); // Interest carried by the selected Nack
m_face.m_transport->send(finishEncoding(std::move(lpPacket), interest.wireEncode(),
'N', interest.getName()));
}
diff --git a/src/detail/pending-interest.hpp b/src/detail/pending-interest.hpp
index 38d7f70..2a4da87 100644
--- a/src/detail/pending-interest.hpp
+++ b/src/detail/pending-interest.hpp
@@ -80,6 +80,7 @@
, m_nackCallback(nackCallback)
, m_timeoutCallback(timeoutCallback)
, m_timeoutEvent(scheduler)
+ , m_nNotNacked(0)
{
scheduleTimeoutEvent(scheduler);
}
@@ -94,6 +95,7 @@
: m_interest(std::move(interest))
, m_origin(PendingInterestOrigin::FORWARDER)
, m_timeoutEvent(scheduler)
+ , m_nNotNacked(0)
{
scheduleTimeoutEvent(scheduler);
}
@@ -114,6 +116,35 @@
}
/**
+ * @brief Record that the Interest has been forwarded to one destination
+ *
+ * A "destination" could be either a local InterestFilter or the forwarder.
+ */
+ void
+ recordForwarding()
+ {
+ ++m_nNotNacked; // one more destination from which a Nack is awaited
+ }
+
+ /**
+ * @brief Record an incoming Nack against a forwarded Interest
+ * @return least severe Nack if all destinations where the Interest was forwarded have Nacked;
+ * otherwise, nullopt
+ */
+ optional<lp::Nack>
+ recordNack(const lp::Nack& nack)
+ {
+ --m_nNotNacked; // one fewer destination is still awaited
+ BOOST_ASSERT(m_nNotNacked >= 0); // fails if Nacks outnumber recordForwarding() calls
+
+ if (!m_leastSevereNack || lp::isLessSevere(nack.getReason(), m_leastSevereNack->getReason())) {
+ m_leastSevereNack = nack; // first Nack seen, or one less severe than the stored Nack
+ }
+
+ return m_nNotNacked > 0 ? nullopt : m_leastSevereNack;
+ }
+
+ /**
* @brief Invoke the Data callback
* @note This method does nothing if the Data callback is empty
*/
@@ -175,6 +206,8 @@
NackCallback m_nackCallback;
TimeoutCallback m_timeoutCallback;
util::scheduler::ScopedEventId m_timeoutEvent;
+ int m_nNotNacked; ///< number of Interest destinations that have not Nacked
+ optional<lp::Nack> m_leastSevereNack;
std::function<void()> m_deleter;
};
diff --git a/src/lp/nack-header.cpp b/src/lp/nack-header.cpp
index 3bbc985..f289c86 100644
--- a/src/lp/nack-header.cpp
+++ b/src/lp/nack-header.cpp
@@ -46,6 +46,19 @@
return os;
}
+bool
+isLessSevere(lp::NackReason x, lp::NackReason y)
+{
+ if (x == lp::NackReason::NONE) { // NONE is never less severe than anything, itself included
+ return false;
+ }
+ if (y == lp::NackReason::NONE) { // any reason other than NONE is less severe than NONE
+ return true;
+ }
+
+ return static_cast<int>(x) < static_cast<int>(y); // otherwise compare the numeric enum values
+}
+
NackHeader::NackHeader()
: m_reason(NackReason::NONE)
{
diff --git a/src/lp/nack-header.hpp b/src/lp/nack-header.hpp
index 0c034f4..e4b8c14 100644
--- a/src/lp/nack-header.hpp
+++ b/src/lp/nack-header.hpp
@@ -46,6 +46,13 @@
std::ostream&
operator<<(std::ostream& os, NackReason reason);
+/** \brief check whether NackReason \p x is strictly less severe than \p y
+ *
+ * lp::NackReason::NONE is treated as most severe; other reasons compare by numeric value
+ */
+bool
+isLessSevere(lp::NackReason x, lp::NackReason y);
+
/**
* \brief represents a Network NACK header
*/