fw: introduce afterReceiveData strategy trigger
Change-Id: Ia805f1bd48f481c9b903b6ce5c388dd66fbbb7d1
refs: #4290
diff --git a/daemon/fw/forwarder.cpp b/daemon/fw/forwarder.cpp
index 93be2e9..f967e0b 100644
--- a/daemon/fw/forwarder.cpp
+++ b/daemon/fw/forwarder.cpp
@@ -269,7 +269,7 @@
// PIT match
pit::DataMatchResult pitMatches = m_pit.findAllDataMatches(data);
- if (pitMatches.begin() == pitMatches.end()) {
+ if (pitMatches.size() == 0) {
// goto Data unsolicited pipeline
this->onDataUnsolicited(inFace, data);
return;
@@ -278,45 +278,73 @@
// CS insert
m_cs.insert(data);
- std::set<Face*> pendingDownstreams;
- // foreach PitEntry
- auto now = time::steady_clock::now();
- for (const shared_ptr<pit::Entry>& pitEntry : pitMatches) {
- NFD_LOG_DEBUG("onIncomingData matching=" << pitEntry->getName());
+ // when only one PIT entry is matched, trigger strategy: after receive Data
+ if (pitMatches.size() == 1) {
+ auto& pitEntry = pitMatches.front();
- // remember pending downstreams
- for (const pit::InRecord& inRecord : pitEntry->getInRecords()) {
- if (inRecord.getExpiry() > now) {
- pendingDownstreams.insert(&inRecord.getFace());
- }
- }
+ NFD_LOG_DEBUG("onIncomingData matching=" << pitEntry->getName());
// set PIT expiry timer to now
this->setExpiryTimer(pitEntry, 0_ms);
- // invoke PIT satisfy callback
+ // trigger strategy: after receive Data
this->dispatchToStrategy(*pitEntry,
- [&] (fw::Strategy& strategy) { strategy.beforeSatisfyInterest(pitEntry, inFace, data); });
+ [&] (fw::Strategy& strategy) { strategy.afterReceiveData(pitEntry, inFace, data); });
+ // mark PIT satisfied
pitEntry->isSatisfied = true;
pitEntry->dataFreshnessPeriod = data.getFreshnessPeriod();
// Dead Nonce List insert if necessary (for out-record of inFace)
this->insertDeadNonceList(*pitEntry, &inFace);
- // mark PIT satisfied
- pitEntry->clearInRecords();
+ // delete PIT entry's out-record
pitEntry->deleteOutRecord(inFace);
}
+ // when more than one PIT entry is matched, trigger strategy: before satisfy Interest,
+ // and send Data to all matched out faces
+ else {
+ std::set<Face*> pendingDownstreams;
+ auto now = time::steady_clock::now();
- // foreach pending downstream
- for (Face* pendingDownstream : pendingDownstreams) {
- if (pendingDownstream->getId() == inFace.getId() &&
- pendingDownstream->getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC) {
- continue;
+ for (const shared_ptr<pit::Entry>& pitEntry : pitMatches) {
+ NFD_LOG_DEBUG("onIncomingData matching=" << pitEntry->getName());
+
+ // remember pending downstreams
+ for (const pit::InRecord& inRecord : pitEntry->getInRecords()) {
+ if (inRecord.getExpiry() > now) {
+ pendingDownstreams.insert(&inRecord.getFace());
+ }
+ }
+
+ // set PIT expiry timer to now
+ this->setExpiryTimer(pitEntry, 0_ms);
+
+ // invoke PIT satisfy callback
+ this->dispatchToStrategy(*pitEntry,
+ [&] (fw::Strategy& strategy) { strategy.beforeSatisfyInterest(pitEntry, inFace, data); });
+
+ // mark PIT satisfied
+ pitEntry->isSatisfied = true;
+ pitEntry->dataFreshnessPeriod = data.getFreshnessPeriod();
+
+ // Dead Nonce List insert if necessary (for out-record of inFace)
+ this->insertDeadNonceList(*pitEntry, &inFace);
+
+ // clear PIT entry's in and out records
+ pitEntry->clearInRecords();
+ pitEntry->deleteOutRecord(inFace);
}
- // goto outgoing Data pipeline
- this->onOutgoingData(data, *pendingDownstream);
+
+ // foreach pending downstream
+ for (Face* pendingDownstream : pendingDownstreams) {
+ if (pendingDownstream->getId() == inFace.getId() &&
+ pendingDownstream->getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC) {
+ continue;
+ }
+ // goto outgoing Data pipeline
+ this->onOutgoingData(data, *pendingDownstream);
+ }
}
}
diff --git a/daemon/fw/strategy.cpp b/daemon/fw/strategy.cpp
index a05080f..ae1b4bd 100644
--- a/daemon/fw/strategy.cpp
+++ b/daemon/fw/strategy.cpp
@@ -167,6 +167,18 @@
}
void
+Strategy::afterReceiveData(const shared_ptr<pit::Entry>& pitEntry,
+ const Face& inFace, const Data& data)
+{
+ NFD_LOG_DEBUG("afterReceiveData pitEntry=" << pitEntry->getName() <<
+ " inFace=" << inFace.getId() << " data=" << data.getName());
+
+ this->beforeSatisfyInterest(pitEntry, inFace, data);
+
+ this->sendDataToAll(pitEntry, inFace, data);
+}
+
+void
Strategy::afterReceiveNack(const Face& inFace, const lp::Nack& nack,
const shared_ptr<pit::Entry>& pitEntry)
{
@@ -181,6 +193,40 @@
}
void
+Strategy::sendData(const shared_ptr<pit::Entry>& pitEntry, const Data& data, const Face& outFace)
+{
+ BOOST_ASSERT(pitEntry->getInterest().matchesData(data));
+
+ // delete the PIT entry's in-record based on outFace,
+ // since Data is sent to outFace from which the Interest was received
+ pitEntry->deleteInRecord(outFace);
+
+ m_forwarder.onOutgoingData(data, *const_pointer_cast<Face>(outFace.shared_from_this()));
+}
+
+void
+Strategy::sendDataToAll(const shared_ptr<pit::Entry>& pitEntry, const Face& inFace, const Data& data)
+{
+ std::set<Face*> pendingDownstreams;
+ auto now = time::steady_clock::now();
+
+ // remember pending downstreams
+ for (const pit::InRecord& inRecord : pitEntry->getInRecords()) {
+ if (inRecord.getExpiry() > now) {
+ if (inRecord.getFace().getId() == inFace.getId() &&
+ inRecord.getFace().getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC) {
+ continue;
+ }
+ pendingDownstreams.insert(&inRecord.getFace());
+ }
+ }
+
+ for (const Face* pendingDownstream : pendingDownstreams) {
+ this->sendData(pitEntry, data, *pendingDownstream);
+ }
+}
+
+void
Strategy::sendNacks(const shared_ptr<pit::Entry>& pitEntry, const lp::NackHeader& header,
std::initializer_list<const Face*> exceptFaces)
{
diff --git a/daemon/fw/strategy.hpp b/daemon/fw/strategy.hpp
index e770e27..b52c05b 100644
--- a/daemon/fw/strategy.hpp
+++ b/daemon/fw/strategy.hpp
@@ -143,12 +143,15 @@
/** \brief trigger before PIT entry is satisfied
*
- * This trigger is invoked when an incoming Data satisfies the PIT entry.
- * Normally, only the first incoming Data would satisfy the PIT entry and invoke this trigger,
- * after which the PIT entry is erased.
+ * This trigger is invoked when an incoming Data satisfies more than one PIT entry.
+ * The strategy can collect measurements information, but cannot manipulate Data forwarding.
+ * When an incoming Data satisfies only one PIT entry, \c afterReceiveData is invoked instead
+ * and given full control over Data forwarding. If a strategy does not override \c afterReceiveData,
+ * the default implementation invokes \c beforeSatisfyInterest.
*
+ * Normally, PIT entries would be erased after receiving the first matching Data.
* If the strategy wishes to collect responses from additional upstream nodes,
- * it should invoke \c setExpiryTimer within this function to retain the PIT entry.
+ * it should invoke \c setExpiryTimer within this function to prolong the PIT entry lifetime.
* If a Data arrives from another upstream during the extended PIT entry lifetime, this trigger will be invoked again.
* At that time, this function must invoke \c setExpiryTimer again to continue collecting more responses.
*
@@ -169,6 +172,33 @@
afterContentStoreHit(const shared_ptr<pit::Entry>& pitEntry,
const Face& inFace, const Data& data);
+ /** \brief trigger after Data is received
+ *
+ * This trigger is invoked when an incoming Data satisfies exactly one PIT entry,
+ * and gives the strategy full control over Data forwarding.
+ *
+ * When this trigger is invoked:
+ * - The Data has been verified to satisfy the PIT entry.
+ * - The PIT entry expiry timer is set to now
+ *
+ * Within this function:
+ * - A strategy should return Data to downstream nodes via \c sendData or \c sendDataToAll.
+ * - A strategy can modify the Data as long as it still satisfies the PIT entry, such as
+ * adding or removing congestion marks.
+ * - A strategy can delay Data forwarding by prolonging the PIT entry lifetime via \c setExpiryTimer,
+ * and forward Data before the PIT entry is erased.
+ * - A strategy can collect measurements about the upstream.
+ * - A strategy can collect responses from additional upstream nodes by prolonging the PIT entry
+ * lifetime via \c setExpiryTimer every time a Data is received. Note that only one Data should
+ * be returned to each downstream node.
+ *
+ * In the base class this method invokes \c beforeSatisfyInterest trigger and then returns
+ * the Data to downstream faces via \c sendDataToAll.
+ */
+ virtual void
+ afterReceiveData(const shared_ptr<pit::Entry>& pitEntry,
+ const Face& inFace, const Data& data);
+
/** \brief trigger after Nack is received
*
* This trigger is invoked when an incoming Nack is received in response to
@@ -220,12 +250,18 @@
* \param outFace face through which to send out the Data
*/
VIRTUAL_WITH_TESTS void
- sendData(const shared_ptr<pit::Entry>& pitEntry, const Data& data, const Face& outFace)
- {
- BOOST_ASSERT(pitEntry->getInterest().matchesData(data));
+ sendData(const shared_ptr<pit::Entry>& pitEntry, const Data& data, const Face& outFace);
- m_forwarder.onOutgoingData(data, *const_pointer_cast<Face>(outFace.shared_from_this()));
- }
+ /** \brief send \p data to all matched and qualified faces
+ *
+ * A matched face is qualified if it is ad-hoc or it is NOT \p inFace
+ *
+ * \param pitEntry PIT entry
+ * \param inFace face through which the Data comes from
+ * \param data the Data packet
+ */
+ VIRTUAL_WITH_TESTS void
+ sendDataToAll(const shared_ptr<pit::Entry>& pitEntry, const Face& inFace, const Data& data);
/** \brief schedule the PIT entry for immediate deletion
*
diff --git a/daemon/table/pit.hpp b/daemon/table/pit.hpp
index 91e958e..cfebb0a 100644
--- a/daemon/table/pit.hpp
+++ b/daemon/table/pit.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2016, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -36,8 +36,9 @@
* \brief an unordered iterable of all PIT entries matching Data
*
* This type shall support:
- * iterator<shared_ptr<Entry>> begin()
- * iterator<shared_ptr<Entry>> end()
+ * - `iterator<shared_ptr<Entry>> begin()`
+ * - `iterator<shared_ptr<Entry>> end()`
+ * - `size_t size() const`
*/
typedef std::vector<shared_ptr<Entry>> DataMatchResult;
diff --git a/tests/daemon/fw/dummy-strategy.cpp b/tests/daemon/fw/dummy-strategy.cpp
index b37171f..1dd79dd 100644
--- a/tests/daemon/fw/dummy-strategy.cpp
+++ b/tests/daemon/fw/dummy-strategy.cpp
@@ -47,6 +47,7 @@
, afterReceiveInterest_count(0)
, beforeSatisfyInterest_count(0)
, afterContentStoreHit_count(0)
+ , afterReceiveData_count(0)
, afterReceiveNack_count(0)
{
this->setInstanceName(name);
@@ -79,7 +80,16 @@
{
++afterContentStoreHit_count;
- this->sendData(pitEntry, data, inFace);
+ Strategy::afterContentStoreHit(pitEntry, inFace, data);
+}
+
+void
+DummyStrategy::afterReceiveData(const shared_ptr<pit::Entry>& pitEntry,
+ const Face& inFace, const Data& data)
+{
+ ++afterReceiveData_count;
+
+ Strategy::afterReceiveData(pitEntry, inFace, data);
}
void
diff --git a/tests/daemon/fw/dummy-strategy.hpp b/tests/daemon/fw/dummy-strategy.hpp
index 24a432c..697d5ae 100644
--- a/tests/daemon/fw/dummy-strategy.hpp
+++ b/tests/daemon/fw/dummy-strategy.hpp
@@ -73,6 +73,10 @@
const Face& inFace, const Data& data) override;
void
+ afterReceiveData(const shared_ptr<pit::Entry>& pitEntry,
+ const Face& inFace, const Data& data) override;
+
+ void
afterReceiveNack(const Face& inFace, const lp::Nack& nack,
const shared_ptr<pit::Entry>& pitEntry) override;
@@ -93,6 +97,7 @@
int afterReceiveInterest_count;
int beforeSatisfyInterest_count;
int afterContentStoreHit_count;
+ int afterReceiveData_count;
int afterReceiveNack_count;
shared_ptr<Face> interestOutFace;
diff --git a/tests/daemon/fw/pit-expiry.t.cpp b/tests/daemon/fw/pit-expiry.t.cpp
index 1977458..07c79c9 100644
--- a/tests/daemon/fw/pit-expiry.t.cpp
+++ b/tests/daemon/fw/pit-expiry.t.cpp
@@ -78,7 +78,7 @@
{
DummyStrategy::beforeSatisfyInterest(pitEntry, inFace, data);
- if (beforeSatisfyInterest_count <= 1) {
+ if (beforeSatisfyInterest_count <= 2) {
setExpiryTimer(pitEntry, 190_ms);
}
}
@@ -95,6 +95,19 @@
}
void
+ afterReceiveData(const shared_ptr<pit::Entry>& pitEntry,
+ const Face& inFace, const Data& data) override
+ {
+ ++afterReceiveData_count;
+
+ if (afterReceiveData_count <= 2) {
+ setExpiryTimer(pitEntry, 290_ms);
+ }
+
+ this->sendDataToAll(pitEntry, inFace, data);
+ }
+
+ void
afterReceiveNack(const Face& inFace, const lp::Nack& nack,
const shared_ptr<pit::Entry>& pitEntry) override
{
@@ -259,12 +272,70 @@
auto face1 = make_shared<DummyFace>();
auto face2 = make_shared<DummyFace>();
+ auto face3 = make_shared<DummyFace>();
+ forwarder.addFace(face1);
+ forwarder.addFace(face2);
+ forwarder.addFace(face3);
+
+ Name strategyA("/strategyA/%FD%01");
+ Name strategyB("/strategyB/%FD%01");
+ PitExpiryTestStrategy::registerAs(strategyA);
+ PitExpiryTestStrategy::registerAs(strategyB);
+ auto& sA = choose<PitExpiryTestStrategy>(forwarder, "/A", strategyA);
+ auto& sB = choose<PitExpiryTestStrategy>(forwarder, "/A/0", strategyB);
+ Pit& pit = forwarder.getPit();
+
+ shared_ptr<Interest> interest1 = makeInterest("/A");
+ shared_ptr<Interest> interest2 = makeInterest("/A/0");
+ interest1->setInterestLifetime(90_ms);
+ interest2->setInterestLifetime(90_ms);
+ shared_ptr<Data> data = makeData("/A/0");
+
+ face1->receiveInterest(*interest1);
+ face2->receiveInterest(*interest2);
+ BOOST_CHECK_EQUAL(pit.size(), 2);
+
+ // beforeSatisfyInterest: the first Data prolongs PIT expiry timer by 190 ms
+ this->advanceClocks(30_ms);
+ face3->receiveData(*data);
+ this->advanceClocks(189_ms);
+ BOOST_CHECK_EQUAL(pit.size(), 2);
+ this->advanceClocks(2_ms);
+ BOOST_CHECK_EQUAL(pit.size(), 0);
+
+ face1->receiveInterest(*interest1);
+ face2->receiveInterest(*interest2);
+
+ // beforeSatisfyInterest: the second Data prolongs PIT expiry timer
+ // and the third one sets the timer to now
+ this->advanceClocks(30_ms);
+ face3->receiveData(*data);
+ this->advanceClocks(1_ms);
+ BOOST_CHECK_EQUAL(pit.size(), 2);
+
+ this->advanceClocks(30_ms);
+ face3->receiveData(*data);
+ this->advanceClocks(1_ms);
+ BOOST_CHECK_EQUAL(pit.size(), 0);
+
+ BOOST_CHECK_EQUAL(sA.beforeSatisfyInterest_count, 3);
+ BOOST_CHECK_EQUAL(sB.beforeSatisfyInterest_count, 3);
+ BOOST_CHECK_EQUAL(sA.afterReceiveData_count, 0);
+ BOOST_CHECK_EQUAL(sB.afterReceiveData_count, 0);
+}
+
+BOOST_AUTO_TEST_CASE(ResetTimerAfterReceiveData)
+{
+ Forwarder forwarder;
+
+ auto face1 = make_shared<DummyFace>();
+ auto face2 = make_shared<DummyFace>();
forwarder.addFace(face1);
forwarder.addFace(face2);
Name strategyA("/strategyA/%FD%01");
PitExpiryTestStrategy::registerAs(strategyA);
- choose<PitExpiryTestStrategy>(forwarder, "/A", strategyA);
+ auto& sA = choose<PitExpiryTestStrategy>(forwarder, "/A", strategyA);
Pit& pit = forwarder.getPit();
@@ -274,17 +345,30 @@
face1->receiveInterest(*interest);
+ // afterReceiveData: the first Data prolongs PIT expiry timer by 290 ms
this->advanceClocks(30_ms);
face2->receiveData(*data);
+ this->advanceClocks(289_ms);
+ BOOST_CHECK_EQUAL(pit.size(), 1);
+ this->advanceClocks(2_ms);
+ BOOST_CHECK_EQUAL(pit.size(), 0);
+ face1->receiveInterest(*interest);
+
+ // afterReceiveData: the second Data prolongs PIT expiry timer
+ // and the third one sets the timer to now
+ this->advanceClocks(30_ms);
+ face2->receiveData(*data);
this->advanceClocks(1_ms);
BOOST_CHECK_EQUAL(pit.size(), 1);
this->advanceClocks(30_ms);
face2->receiveData(*data);
-
this->advanceClocks(1_ms);
BOOST_CHECK_EQUAL(pit.size(), 0);
+
+ BOOST_CHECK_EQUAL(sA.beforeSatisfyInterest_count, 0);
+ BOOST_CHECK_EQUAL(sA.afterReceiveData_count, 3);
}
BOOST_AUTO_TEST_CASE(ReceiveNackAfterResetTimer)