fw: introduce afterReceiveData strategy trigger

Change-Id: Ia805f1bd48f481c9b903b6ce5c388dd66fbbb7d1
refs: #4290
diff --git a/daemon/fw/forwarder.cpp b/daemon/fw/forwarder.cpp
index 93be2e9..f967e0b 100644
--- a/daemon/fw/forwarder.cpp
+++ b/daemon/fw/forwarder.cpp
@@ -269,7 +269,7 @@
 
   // PIT match
   pit::DataMatchResult pitMatches = m_pit.findAllDataMatches(data);
-  if (pitMatches.begin() == pitMatches.end()) {
+  if (pitMatches.size() == 0) {
     // goto Data unsolicited pipeline
     this->onDataUnsolicited(inFace, data);
     return;
@@ -278,45 +278,73 @@
   // CS insert
   m_cs.insert(data);
 
-  std::set<Face*> pendingDownstreams;
-  // foreach PitEntry
-  auto now = time::steady_clock::now();
-  for (const shared_ptr<pit::Entry>& pitEntry : pitMatches) {
-    NFD_LOG_DEBUG("onIncomingData matching=" << pitEntry->getName());
+  // when only one PIT entry is matched, trigger strategy: after receive Data
+  if (pitMatches.size() == 1) {
+    auto& pitEntry = pitMatches.front();
 
-    // remember pending downstreams
-    for (const pit::InRecord& inRecord : pitEntry->getInRecords()) {
-      if (inRecord.getExpiry() > now) {
-        pendingDownstreams.insert(&inRecord.getFace());
-      }
-    }
+    NFD_LOG_DEBUG("onIncomingData matching=" << pitEntry->getName());
 
     // set PIT expiry timer to now
     this->setExpiryTimer(pitEntry, 0_ms);
 
-    // invoke PIT satisfy callback
+    // trigger strategy: after receive Data
     this->dispatchToStrategy(*pitEntry,
-      [&] (fw::Strategy& strategy) { strategy.beforeSatisfyInterest(pitEntry, inFace, data); });
+      [&] (fw::Strategy& strategy) { strategy.afterReceiveData(pitEntry, inFace, data); });
 
+    // mark PIT satisfied
     pitEntry->isSatisfied = true;
     pitEntry->dataFreshnessPeriod = data.getFreshnessPeriod();
 
     // Dead Nonce List insert if necessary (for out-record of inFace)
     this->insertDeadNonceList(*pitEntry, &inFace);
 
-    // mark PIT satisfied
-    pitEntry->clearInRecords();
+    // delete PIT entry's out-record
     pitEntry->deleteOutRecord(inFace);
   }
+  // when more than one PIT entry is matched, trigger strategy: before satisfy Interest,
+  // and send Data to all matched out faces
+  else {
+    std::set<Face*> pendingDownstreams;
+    auto now = time::steady_clock::now();
 
-  // foreach pending downstream
-  for (Face* pendingDownstream : pendingDownstreams) {
-    if (pendingDownstream->getId() == inFace.getId() &&
-        pendingDownstream->getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC) {
-      continue;
+    for (const shared_ptr<pit::Entry>& pitEntry : pitMatches) {
+      NFD_LOG_DEBUG("onIncomingData matching=" << pitEntry->getName());
+
+      // remember pending downstreams
+      for (const pit::InRecord& inRecord : pitEntry->getInRecords()) {
+        if (inRecord.getExpiry() > now) {
+          pendingDownstreams.insert(&inRecord.getFace());
+        }
+      }
+
+      // set PIT expiry timer to now
+      this->setExpiryTimer(pitEntry, 0_ms);
+
+      // invoke PIT satisfy callback
+      this->dispatchToStrategy(*pitEntry,
+        [&] (fw::Strategy& strategy) { strategy.beforeSatisfyInterest(pitEntry, inFace, data); });
+
+      // mark PIT satisfied
+      pitEntry->isSatisfied = true;
+      pitEntry->dataFreshnessPeriod = data.getFreshnessPeriod();
+
+      // Dead Nonce List insert if necessary (for out-record of inFace)
+      this->insertDeadNonceList(*pitEntry, &inFace);
+
+      // clear PIT entry's in and out records
+      pitEntry->clearInRecords();
+      pitEntry->deleteOutRecord(inFace);
     }
-    // goto outgoing Data pipeline
-    this->onOutgoingData(data, *pendingDownstream);
+
+    // foreach pending downstream
+    for (Face* pendingDownstream : pendingDownstreams) {
+      if (pendingDownstream->getId() == inFace.getId() &&
+          pendingDownstream->getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC) {
+        continue;
+      }
+      // goto outgoing Data pipeline
+      this->onOutgoingData(data, *pendingDownstream);
+    }
   }
 }
 
diff --git a/daemon/fw/strategy.cpp b/daemon/fw/strategy.cpp
index a05080f..ae1b4bd 100644
--- a/daemon/fw/strategy.cpp
+++ b/daemon/fw/strategy.cpp
@@ -167,6 +167,18 @@
 }
 
 void
+Strategy::afterReceiveData(const shared_ptr<pit::Entry>& pitEntry,
+                           const Face& inFace, const Data& data)
+{
+  NFD_LOG_DEBUG("afterReceiveData pitEntry=" << pitEntry->getName() <<
+                " inFace=" << inFace.getId() << " data=" << data.getName());
+
+  this->beforeSatisfyInterest(pitEntry, inFace, data);
+
+  this->sendDataToAll(pitEntry, inFace, data);
+}
+
+void
 Strategy::afterReceiveNack(const Face& inFace, const lp::Nack& nack,
                            const shared_ptr<pit::Entry>& pitEntry)
 {
@@ -181,6 +193,40 @@
 }
 
 void
+Strategy::sendData(const shared_ptr<pit::Entry>& pitEntry, const Data& data, const Face& outFace)
+{
+  BOOST_ASSERT(pitEntry->getInterest().matchesData(data));
+
+  // delete the PIT entry's in-record based on outFace,
+  // since Data is sent to outFace from which the Interest was received
+  pitEntry->deleteInRecord(outFace);
+
+  m_forwarder.onOutgoingData(data, *const_pointer_cast<Face>(outFace.shared_from_this()));
+}
+
+void
+Strategy::sendDataToAll(const shared_ptr<pit::Entry>& pitEntry, const Face& inFace, const Data& data)
+{
+  std::set<Face*> pendingDownstreams;
+  auto now = time::steady_clock::now();
+
+  // remember pending downstreams
+  for (const pit::InRecord& inRecord : pitEntry->getInRecords()) {
+    if (inRecord.getExpiry() > now) {
+      if (inRecord.getFace().getId() == inFace.getId() &&
+          inRecord.getFace().getLinkType() != ndn::nfd::LINK_TYPE_AD_HOC) {
+        continue;
+      }
+      pendingDownstreams.insert(&inRecord.getFace());
+    }
+  }
+
+  for (const Face* pendingDownstream : pendingDownstreams) {
+    this->sendData(pitEntry, data, *pendingDownstream);
+  }
+}
+
+void
 Strategy::sendNacks(const shared_ptr<pit::Entry>& pitEntry, const lp::NackHeader& header,
                     std::initializer_list<const Face*> exceptFaces)
 {
diff --git a/daemon/fw/strategy.hpp b/daemon/fw/strategy.hpp
index e770e27..b52c05b 100644
--- a/daemon/fw/strategy.hpp
+++ b/daemon/fw/strategy.hpp
@@ -143,12 +143,15 @@
 
   /** \brief trigger before PIT entry is satisfied
    *
-   *  This trigger is invoked when an incoming Data satisfies the PIT entry.
-   *  Normally, only the first incoming Data would satisfy the PIT entry and invoke this trigger,
-   *  after which the PIT entry is erased.
+   *  This trigger is invoked when an incoming Data satisfies more than one PIT entry.
+   *  The strategy can collect measurements information, but cannot manipulate Data forwarding.
+   *  When an incoming Data satisfies only one PIT entry, \c afterReceiveData is invoked instead
+   *  and given full control over Data forwarding. If a strategy does not override \c afterReceiveData,
+   *  the default implementation invokes \c beforeSatisfyInterest.
    *
+   *  Normally, PIT entries would be erased after receiving the first matching Data.
    *  If the strategy wishes to collect responses from additional upstream nodes,
-   *  it should invoke \c setExpiryTimer within this function to retain the PIT entry.
+   *  it should invoke \c setExpiryTimer within this function to prolong the PIT entry lifetime.
    *  If a Data arrives from another upstream during the extended PIT entry lifetime, this trigger will be invoked again.
    *  At that time, this function must invoke \c setExpiryTimer again to continue collecting more responses.
    *
@@ -169,6 +172,33 @@
   afterContentStoreHit(const shared_ptr<pit::Entry>& pitEntry,
                        const Face& inFace, const Data& data);
 
+  /** \brief trigger after Data is received
+   *
+   *  This trigger is invoked when an incoming Data satisfies exactly one PIT entry,
+   *  and gives the strategy full control over Data forwarding.
+   *
+   *  When this trigger is invoked:
+   *  - The Data has been verified to satisfy the PIT entry.
+   *  - The PIT entry expiry timer is set to now
+   *
+   *  Within this function:
+   *  - A strategy should return Data to downstream nodes via \c sendData or \c sendDataToAll.
+   *  - A strategy can modify the Data as long as it still satisfies the PIT entry, such as
+   *    adding or removing congestion marks.
+   *  - A strategy can delay Data forwarding by prolonging the PIT entry lifetime via \c setExpiryTimer,
+   *    and forward Data before the PIT entry is erased.
+   *  - A strategy can collect measurements about the upstream.
+   *  - A strategy can collect responses from additional upstream nodes by prolonging the PIT entry
+   *    lifetime via \c setExpiryTimer every time a Data is received. Note that only one Data should
+   *    be returned to each downstream node.
+   *
+   *  In the base class this method invokes \c beforeSatisfyInterest trigger and then returns
+   *  the Data to downstream faces via \c sendDataToAll.
+   */
+  virtual void
+  afterReceiveData(const shared_ptr<pit::Entry>& pitEntry,
+                   const Face& inFace, const Data& data);
+
   /** \brief trigger after Nack is received
    *
    *  This trigger is invoked when an incoming Nack is received in response to
@@ -220,12 +250,18 @@
    *  \param outFace face through which to send out the Data
    */
   VIRTUAL_WITH_TESTS void
-  sendData(const shared_ptr<pit::Entry>& pitEntry, const Data& data, const Face& outFace)
-  {
-    BOOST_ASSERT(pitEntry->getInterest().matchesData(data));
+  sendData(const shared_ptr<pit::Entry>& pitEntry, const Data& data, const Face& outFace);
 
-    m_forwarder.onOutgoingData(data, *const_pointer_cast<Face>(outFace.shared_from_this()));
-  }
+  /** \brief send \p data to all matched and qualified faces
+   *
+   *  A matched face is qualified if it is ad-hoc or it is NOT \p inFace
+   *
+   *  \param pitEntry PIT entry
+   *  \param inFace face through which the Data comes from
+   *  \param data the Data packet
+   */
+  VIRTUAL_WITH_TESTS void
+  sendDataToAll(const shared_ptr<pit::Entry>& pitEntry, const Face& inFace, const Data& data);
 
   /** \brief schedule the PIT entry for immediate deletion
    *
diff --git a/daemon/table/pit.hpp b/daemon/table/pit.hpp
index 91e958e..cfebb0a 100644
--- a/daemon/table/pit.hpp
+++ b/daemon/table/pit.hpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2016,  Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018,  Regents of the University of California,
  *                           Arizona Board of Regents,
  *                           Colorado State University,
  *                           University Pierre & Marie Curie, Sorbonne University,
@@ -36,8 +36,9 @@
  *  \brief an unordered iterable of all PIT entries matching Data
  *
  *  This type shall support:
- *    iterator<shared_ptr<Entry>> begin()
- *    iterator<shared_ptr<Entry>> end()
+ *  - `iterator<shared_ptr<Entry>> begin()`
+ *  - `iterator<shared_ptr<Entry>> end()`
+ *  - `size_t size() const`
  */
 typedef std::vector<shared_ptr<Entry>> DataMatchResult;
 
diff --git a/tests/daemon/fw/dummy-strategy.cpp b/tests/daemon/fw/dummy-strategy.cpp
index b37171f..1dd79dd 100644
--- a/tests/daemon/fw/dummy-strategy.cpp
+++ b/tests/daemon/fw/dummy-strategy.cpp
@@ -47,6 +47,7 @@
   , afterReceiveInterest_count(0)
   , beforeSatisfyInterest_count(0)
   , afterContentStoreHit_count(0)
+  , afterReceiveData_count(0)
   , afterReceiveNack_count(0)
 {
   this->setInstanceName(name);
@@ -79,7 +80,16 @@
 {
   ++afterContentStoreHit_count;
 
-  this->sendData(pitEntry, data, inFace);
+  Strategy::afterContentStoreHit(pitEntry, inFace, data);
+}
+
+void
+DummyStrategy::afterReceiveData(const shared_ptr<pit::Entry>& pitEntry,
+                                const Face& inFace, const Data& data)
+{
+  ++afterReceiveData_count;
+
+  Strategy::afterReceiveData(pitEntry, inFace, data);
 }
 
 void
diff --git a/tests/daemon/fw/dummy-strategy.hpp b/tests/daemon/fw/dummy-strategy.hpp
index 24a432c..697d5ae 100644
--- a/tests/daemon/fw/dummy-strategy.hpp
+++ b/tests/daemon/fw/dummy-strategy.hpp
@@ -73,6 +73,10 @@
                        const Face& inFace, const Data& data) override;
 
   void
+  afterReceiveData(const shared_ptr<pit::Entry>& pitEntry,
+                   const Face& inFace, const Data& data) override;
+
+  void
   afterReceiveNack(const Face& inFace, const lp::Nack& nack,
                    const shared_ptr<pit::Entry>& pitEntry) override;
 
@@ -93,6 +97,7 @@
   int afterReceiveInterest_count;
   int beforeSatisfyInterest_count;
   int afterContentStoreHit_count;
+  int afterReceiveData_count;
   int afterReceiveNack_count;
 
   shared_ptr<Face> interestOutFace;
diff --git a/tests/daemon/fw/pit-expiry.t.cpp b/tests/daemon/fw/pit-expiry.t.cpp
index 1977458..07c79c9 100644
--- a/tests/daemon/fw/pit-expiry.t.cpp
+++ b/tests/daemon/fw/pit-expiry.t.cpp
@@ -78,7 +78,7 @@
   {
     DummyStrategy::beforeSatisfyInterest(pitEntry, inFace, data);
 
-    if (beforeSatisfyInterest_count <= 1) {
+    if (beforeSatisfyInterest_count <= 2) {
       setExpiryTimer(pitEntry, 190_ms);
     }
   }
@@ -95,6 +95,19 @@
   }
 
   void
+  afterReceiveData(const shared_ptr<pit::Entry>& pitEntry,
+                   const Face& inFace, const Data& data) override
+  {
+    ++afterReceiveData_count;
+
+    if (afterReceiveData_count <= 2) {
+      setExpiryTimer(pitEntry, 290_ms);
+    }
+
+    this->sendDataToAll(pitEntry, inFace, data);
+  }
+
+  void
   afterReceiveNack(const Face& inFace, const lp::Nack& nack,
                    const shared_ptr<pit::Entry>& pitEntry) override
   {
@@ -259,12 +272,70 @@
 
   auto face1 = make_shared<DummyFace>();
   auto face2 = make_shared<DummyFace>();
+  auto face3 = make_shared<DummyFace>();
+  forwarder.addFace(face1);
+  forwarder.addFace(face2);
+  forwarder.addFace(face3);
+
+  Name strategyA("/strategyA/%FD%01");
+  Name strategyB("/strategyB/%FD%01");
+  PitExpiryTestStrategy::registerAs(strategyA);
+  PitExpiryTestStrategy::registerAs(strategyB);
+  auto& sA = choose<PitExpiryTestStrategy>(forwarder, "/A", strategyA);
+  auto& sB = choose<PitExpiryTestStrategy>(forwarder, "/A/0", strategyB);
+  Pit& pit = forwarder.getPit();
+
+  shared_ptr<Interest> interest1 = makeInterest("/A");
+  shared_ptr<Interest> interest2 = makeInterest("/A/0");
+  interest1->setInterestLifetime(90_ms);
+  interest2->setInterestLifetime(90_ms);
+  shared_ptr<Data> data = makeData("/A/0");
+
+  face1->receiveInterest(*interest1);
+  face2->receiveInterest(*interest2);
+  BOOST_CHECK_EQUAL(pit.size(), 2);
+
+  // beforeSatisfyInterest: the first Data prolongs PIT expiry timer by 190 ms
+  this->advanceClocks(30_ms);
+  face3->receiveData(*data);
+  this->advanceClocks(189_ms);
+  BOOST_CHECK_EQUAL(pit.size(), 2);
+  this->advanceClocks(2_ms);
+  BOOST_CHECK_EQUAL(pit.size(), 0);
+
+  face1->receiveInterest(*interest1);
+  face2->receiveInterest(*interest2);
+
+  // beforeSatisfyInterest: the second Data prolongs PIT expiry timer
+  // and the third one sets the timer to now
+  this->advanceClocks(30_ms);
+  face3->receiveData(*data);
+  this->advanceClocks(1_ms);
+  BOOST_CHECK_EQUAL(pit.size(), 2);
+
+  this->advanceClocks(30_ms);
+  face3->receiveData(*data);
+  this->advanceClocks(1_ms);
+  BOOST_CHECK_EQUAL(pit.size(), 0);
+
+  BOOST_CHECK_EQUAL(sA.beforeSatisfyInterest_count, 3);
+  BOOST_CHECK_EQUAL(sB.beforeSatisfyInterest_count, 3);
+  BOOST_CHECK_EQUAL(sA.afterReceiveData_count, 0);
+  BOOST_CHECK_EQUAL(sB.afterReceiveData_count, 0);
+}
+
+BOOST_AUTO_TEST_CASE(ResetTimerAfterReceiveData)
+{
+  Forwarder forwarder;
+
+  auto face1 = make_shared<DummyFace>();
+  auto face2 = make_shared<DummyFace>();
   forwarder.addFace(face1);
   forwarder.addFace(face2);
 
   Name strategyA("/strategyA/%FD%01");
   PitExpiryTestStrategy::registerAs(strategyA);
-  choose<PitExpiryTestStrategy>(forwarder, "/A", strategyA);
+  auto& sA = choose<PitExpiryTestStrategy>(forwarder, "/A", strategyA);
 
   Pit& pit = forwarder.getPit();
 
@@ -274,17 +345,30 @@
 
   face1->receiveInterest(*interest);
 
+  // afterReceiveData: the first Data prolongs PIT expiry timer by 290 ms
   this->advanceClocks(30_ms);
   face2->receiveData(*data);
+  this->advanceClocks(289_ms);
+  BOOST_CHECK_EQUAL(pit.size(), 1);
+  this->advanceClocks(2_ms);
+  BOOST_CHECK_EQUAL(pit.size(), 0);
 
+  face1->receiveInterest(*interest);
+
+  // afterReceiveData: the second Data prolongs PIT expiry timer
+  // and the third one sets the timer to now
+  this->advanceClocks(30_ms);
+  face2->receiveData(*data);
   this->advanceClocks(1_ms);
   BOOST_CHECK_EQUAL(pit.size(), 1);
 
   this->advanceClocks(30_ms);
   face2->receiveData(*data);
-
   this->advanceClocks(1_ms);
   BOOST_CHECK_EQUAL(pit.size(), 0);
+
+  BOOST_CHECK_EQUAL(sA.beforeSatisfyInterest_count, 0);
+  BOOST_CHECK_EQUAL(sA.afterReceiveData_count, 3);
 }
 
 BOOST_AUTO_TEST_CASE(ReceiveNackAfterResetTimer)