chunks: consolidate accounting of received data into onData()

Change-Id: Ib47381d3f69b482d60af8b07d002135944b17604
diff --git a/tests/chunks/pipeline-interests-aimd.t.cpp b/tests/chunks/pipeline-interests-aimd.t.cpp
index ba729ab..e7856c4 100644
--- a/tests/chunks/pipeline-interests-aimd.t.cpp
+++ b/tests/chunks/pipeline-interests-aimd.t.cpp
@@ -1,5 +1,5 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
+/*
  * Copyright (c) 2016-2017, Regents of the University of California,
  *                          Colorado State University,
  *                          University Pierre & Marie Curie, Sorbonne University.
@@ -92,14 +92,16 @@
   double preCwnd = aimdPipeline->m_cwnd;
   runWithData(*makeDataWithSegment(0));
   advanceClocks(io, time::nanoseconds(1));
-  BOOST_REQUIRE_EQUAL(face.sentInterests.size(), 1);
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), 1);
 
   for (uint64_t i = 1; i < nDataSegments - 1; ++i) {
     face.receive(*makeDataWithSegment(i));
     advanceClocks(io, time::nanoseconds(1));
-    BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd - preCwnd, 1, 0.1);
+    BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd - preCwnd, 1, 0.1);
     preCwnd = aimdPipeline->m_cwnd;
   }
+
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, nDataSegments - 1);
 }
 
 BOOST_AUTO_TEST_CASE(CongestionAvoidance)
@@ -111,7 +113,7 @@
   double preCwnd = aimdPipeline->m_cwnd;
   runWithData(*makeDataWithSegment(0));
   advanceClocks(io, time::nanoseconds(1));
-  BOOST_REQUIRE_EQUAL(face.sentInterests.size(), 1);
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), 1);
 
   for (uint64_t i = 1; i < aimdPipeline->m_ssthresh; ++i) { // slow start
     face.receive(*makeDataWithSegment(i));
@@ -119,14 +121,16 @@
     preCwnd = aimdPipeline->m_cwnd;
   }
 
-  BOOST_REQUIRE_CLOSE(preCwnd, aimdPipeline->m_ssthresh, 0.1);
+  BOOST_CHECK_CLOSE(preCwnd, aimdPipeline->m_ssthresh, 0.1);
 
   for (uint64_t i = aimdPipeline->m_ssthresh; i < nDataSegments - 1; ++i) { // congestion avoidance
     face.receive(*makeDataWithSegment(i));
     advanceClocks(io, time::nanoseconds(1));
-    BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd - preCwnd, opt.aiStep / floor(aimdPipeline->m_cwnd), 0.1);
+    BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd - preCwnd, opt.aiStep / floor(aimdPipeline->m_cwnd), 0.1);
     preCwnd = aimdPipeline->m_cwnd;
   }
+
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, nDataSegments - 1);
 }
 
 BOOST_AUTO_TEST_CASE(Timeout)
@@ -137,7 +141,7 @@
 
   runWithData(*makeDataWithSegment(0));
   advanceClocks(io, time::nanoseconds(1));
-  BOOST_REQUIRE_EQUAL(face.sentInterests.size(), 1);
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), 1);
 
   // receive segment 1 and segment 2
   for (uint64_t i = 1; i < 3; ++i) {
@@ -145,8 +149,9 @@
     advanceClocks(io, time::nanoseconds(1));
   }
 
-  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 3, 0.1);
-  BOOST_REQUIRE_EQUAL(face.sentInterests.size(), 5); // request for segment 5 has been sent
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 3);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 3, 0.1);
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), 5); // request for segment 5 has been sent
 
   advanceClocks(io, time::milliseconds(100));
 
@@ -158,20 +163,23 @@
   face.receive(*makeDataWithSegment(5));
   advanceClocks(io, time::nanoseconds(1));
 
-  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 4.25, 0.1);
-  BOOST_REQUIRE_EQUAL(face.sentInterests.size(), 7); // all the segment requests have been sent
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 5);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 4.25, 0.1);
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), 7); // all the segment requests have been sent
 
   // timeout segment 3
   advanceClocks(io, time::milliseconds(150));
 
-  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 2.125, 0.1); // window size drop to 1/2 of previous size
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 5);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 2.125, 0.1); // window size drop to 1/2 of previous size
   BOOST_CHECK_EQUAL(aimdPipeline->m_retxQueue.size(), 1);
 
   // receive segment 6, retransmit 3
   face.receive(*makeDataWithSegment(6));
   advanceClocks(io, time::nanoseconds(1));
 
-  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 2.625, 0.1); // congestion avoidance
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 6);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 2.625, 0.1); // congestion avoidance
   BOOST_CHECK_EQUAL(aimdPipeline->m_retxQueue.size(), 0);
   BOOST_CHECK_EQUAL(aimdPipeline->m_retxCount[3], 1);
 }
@@ -185,6 +193,8 @@
 
   face.receive(*makeDataWithSegment(1));
   advanceClocks(io, time::nanoseconds(1));
+
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 2);
   BOOST_REQUIRE_EQUAL(face.sentInterests.size(), 4);
 
   // receive a nack with NackReason::DUPLICATE for segment 2
@@ -194,6 +204,7 @@
 
   // nack1 is ignored
   BOOST_CHECK_EQUAL(hasFailed, false);
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 2);
   BOOST_CHECK_EQUAL(aimdPipeline->m_retxQueue.size(), 0);
 
   // receive a nack with NackReason::CONGESTION for segment 3
@@ -211,6 +222,7 @@
 
   // Other types of Nack will trigger a failure
   BOOST_CHECK_EQUAL(hasFailed, true);
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 2);
 }
 
 BOOST_AUTO_TEST_CASE(FinalBlockIdNotSetAtBeginning)
@@ -226,6 +238,7 @@
 
   // interests for segment 1 - 6 have been sent
   BOOST_CHECK_EQUAL(face.sentInterests.size(), 6);
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 2);
   BOOST_CHECK_EQUAL(aimdPipeline->m_hasFinalBlockId, false);
   // pending interests: segment 2, 3, 4, 5, 6
   BOOST_CHECK_EQUAL(face.getNPendingInterests(), 5);
@@ -233,6 +246,7 @@
   // receive segment 2 with FinalBlockId
   face.receive(*makeDataWithSegment(2));
   advanceClocks(io, time::nanoseconds(1));
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 3);
   BOOST_CHECK_EQUAL(aimdPipeline->m_hasFinalBlockId, true);
 
   // pending interests for segment 2, 4, 5, 6 haven been removed
diff --git a/tests/chunks/pipeline-interests-fixed-window.t.cpp b/tests/chunks/pipeline-interests-fixed-window.t.cpp
index e9012f7..ef3cfc5 100644
--- a/tests/chunks/pipeline-interests-fixed-window.t.cpp
+++ b/tests/chunks/pipeline-interests-fixed-window.t.cpp
@@ -1,5 +1,5 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
+/*
  * Copyright (c) 2016-2017, Regents of the University of California,
  *                          Colorado State University,
  *                          University Pierre & Marie Curie, Sorbonne University.
@@ -44,6 +44,7 @@
   {
     setPipeline(make_unique<PipelineInterestsFixedWindow>(face, PipelineInterestsFixedWindow::Options(opt)));
   }
+
 protected:
   Options opt;
 
@@ -61,9 +62,9 @@
 };
 
 BOOST_AUTO_TEST_SUITE(Chunks)
-BOOST_AUTO_TEST_SUITE(TestPipelineInterestsFixedWindow)
+BOOST_FIXTURE_TEST_SUITE(TestPipelineInterestsFixedWindow, PipelineInterestFixedWindowFixture)
 
-BOOST_FIXTURE_TEST_CASE(FewerSegmentsThanPipelineCapacity, PipelineInterestFixedWindowFixture)
+BOOST_AUTO_TEST_CASE(FewerSegmentsThanPipelineCapacity)
 {
   nDataSegments = 3;
   BOOST_ASSERT(nDataSegments <= opt.maxPipelineSize);
@@ -72,11 +73,11 @@
   advanceClocks(io, time::nanoseconds(1), 1);
   BOOST_REQUIRE_EQUAL(face.sentInterests.size(), nDataSegments - 1);
 
-  for (uint64_t i = 0; i < nDataSegments - 1; ++i) {
+  for (uint64_t i = 1; i < nDataSegments - 1; ++i) {
     face.receive(*makeDataWithSegment(i));
     advanceClocks(io, time::nanoseconds(1), 1);
 
-    BOOST_CHECK_EQUAL(nReceivedSegments, i);
+    BOOST_CHECK_EQUAL(pipeline->m_nReceived, i + 1);
     BOOST_REQUIRE_EQUAL(face.sentInterests.size(), nDataSegments - 1);
     // check if the interest for the segment i+1 is well formed
     const auto& sentInterest = face.sentInterests[i];
@@ -93,7 +94,7 @@
   BOOST_CHECK_EQUAL(hasFailed, true);
 }
 
-BOOST_FIXTURE_TEST_CASE(FullPipeline, PipelineInterestFixedWindowFixture)
+BOOST_AUTO_TEST_CASE(FullPipeline)
 {
   nDataSegments = 13;
   BOOST_ASSERT(nDataSegments > opt.maxPipelineSize);
@@ -105,7 +106,7 @@
   for (uint64_t i = 0; i < nDataSegments - 1; ++i) {
     face.receive(*makeDataWithSegment(i));
     advanceClocks(io, time::nanoseconds(1), 1);
-    BOOST_CHECK_EQUAL(nReceivedSegments, i + 1);
+    BOOST_CHECK_EQUAL(pipeline->m_nReceived, i + 2);
 
     if (i < nDataSegments - opt.maxPipelineSize - 1) {
       BOOST_REQUIRE_EQUAL(face.sentInterests.size(), opt.maxPipelineSize + i + 1);
@@ -126,7 +127,7 @@
   BOOST_CHECK_EQUAL(hasFailed, false);
 }
 
-BOOST_FIXTURE_TEST_CASE(TimeoutAllSegments, PipelineInterestFixedWindowFixture)
+BOOST_AUTO_TEST_CASE(TimeoutAllSegments)
 {
   nDataSegments = 13;
   BOOST_ASSERT(nDataSegments > opt.maxPipelineSize);
@@ -138,7 +139,7 @@
   for (int i = 0; i < opt.maxRetriesOnTimeoutOrNack; ++i) {
     advanceClocks(io, opt.interestLifetime, 1);
     BOOST_REQUIRE_EQUAL(face.sentInterests.size(), opt.maxPipelineSize * (i + 2));
-    BOOST_CHECK_EQUAL(nReceivedSegments, 0);
+    BOOST_CHECK_EQUAL(pipeline->m_nReceived, 1);
 
     // A single retry for every pipeline element
     for (size_t j = 0; j < opt.maxPipelineSize; ++j) {
@@ -151,7 +152,7 @@
   BOOST_CHECK_EQUAL(hasFailed, true);
 }
 
-BOOST_FIXTURE_TEST_CASE(TimeoutAfterFinalBlockIdReceived, PipelineInterestFixedWindowFixture)
+BOOST_AUTO_TEST_CASE(TimeoutAfterFinalBlockIdReceived)
 {
   // the FinalBlockId is sent with the first segment, after the first segment failure the pipeline
   // should fail
@@ -196,7 +197,7 @@
   BOOST_CHECK_EQUAL(face.getNPendingInterests(), 0);
 }
 
-BOOST_FIXTURE_TEST_CASE(TimeoutBeforeFinalBlockIdReceived, PipelineInterestFixedWindowFixture)
+BOOST_AUTO_TEST_CASE(TimeoutBeforeFinalBlockIdReceived)
 {
   // the FinalBlockId is sent only with the last segment, all segments are sent except for the
   // second one (segment #1); all segments are received correctly until the FinalBlockId is received
@@ -245,11 +246,11 @@
   // timeout for the second pipeline element (segment #1), this should trigger an error
   advanceClocks(io, opt.interestLifetime, 1);
 
-  BOOST_CHECK_EQUAL(nReceivedSegments, nDataSegments - 2);
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, nDataSegments - 1);
   BOOST_CHECK_EQUAL(hasFailed, true);
 }
 
-BOOST_FIXTURE_TEST_CASE(SegmentReceivedAfterTimeout, PipelineInterestFixedWindowFixture)
+BOOST_AUTO_TEST_CASE(SegmentReceivedAfterTimeout)
 {
   // the FinalBlockId is never sent, all the pipeline elements with a segment number greater than
   // segment #0 will fail, after this failure also segment #0 fail and this should trigger an error
@@ -278,11 +279,11 @@
   face.receive(*makeDataWithSegment(0, false));
   advanceClocks(io, time::nanoseconds(1), 1);
 
-  BOOST_CHECK_EQUAL(nReceivedSegments, 1);
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 2);
   BOOST_CHECK_EQUAL(hasFailed, true);
 }
 
-BOOST_FIXTURE_TEST_CASE(CongestionAllSegments, PipelineInterestFixedWindowFixture)
+BOOST_AUTO_TEST_CASE(CongestionAllSegments)
 {
   nDataSegments = 13;
   BOOST_ASSERT(nDataSegments > opt.maxPipelineSize);
diff --git a/tests/chunks/pipeline-interests-fixture.hpp b/tests/chunks/pipeline-interests-fixture.hpp
index 8da4dc1..e35aec1 100644
--- a/tests/chunks/pipeline-interests-fixture.hpp
+++ b/tests/chunks/pipeline-interests-fixture.hpp
@@ -1,8 +1,8 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2016,  Regents of the University of California,
- *                      Colorado State University,
- *                      University Pierre & Marie Curie, Sorbonne University.
+/*
+ * Copyright (c) 2016-2017, Regents of the University of California,
+ *                          Colorado State University,
+ *                          University Pierre & Marie Curie, Sorbonne University.
  *
  * This file is part of ndn-tools (Named Data Networking Essential Tools).
  * See AUTHORS.md for complete list of ndn-tools authors and contributors.
@@ -21,6 +21,7 @@
  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
  *
  * @author Andrea Tosatto
+ * @author Davide Pesavento
  * @author Weiwei Liu
  */
 
@@ -30,6 +31,7 @@
 #include "tools/chunks/catchunks/pipeline-interests.hpp"
 
 #include "tests/test-common.hpp"
+
 #include <ndn-cxx/util/dummy-client-face.hpp>
 
 namespace ndn {
@@ -45,7 +47,6 @@
     : face(io)
     , name("/ndn/chunks/test")
     , nDataSegments(0)
-    , nReceivedSegments(0)
     , hasFailed(false)
   {
   }
@@ -70,21 +71,8 @@
   runWithData(const Data& data)
   {
     pipeline->run(data,
-                  bind(&PipelineInterestsFixture::onData, this, _1, _2),
-                  bind(&PipelineInterestsFixture::onFailure, this, _1));
-  }
-
-private:
-  void
-  onData(const Interest& interest, const Data& data)
-  {
-    nReceivedSegments++;
-  }
-
-  void
-  onFailure(const std::string& reason)
-  {
-    hasFailed = true;
+                  [this] (const Data&) {},
+                  [this] (const std::string&) { hasFailed = true; });
   }
 
 protected:
@@ -93,7 +81,6 @@
   Name name;
   unique_ptr<PipelineInterests> pipeline;
   uint64_t nDataSegments;
-  uint64_t nReceivedSegments;
   bool hasFailed;
 };
 
diff --git a/tools/chunks/catchunks/consumer.cpp b/tools/chunks/catchunks/consumer.cpp
index 6132d82..ce01583 100644
--- a/tools/chunks/catchunks/consumer.cpp
+++ b/tools/chunks/catchunks/consumer.cpp
@@ -46,7 +46,11 @@
   m_nextToPrint = 0;
   m_bufferedData.clear();
 
-  m_discover->onDiscoverySuccess.connect(bind(&Consumer::startPipeline, this, _1));
+  m_discover->onDiscoverySuccess.connect([this] (const Data& data) {
+    m_pipeline->run(data,
+      [this] (const Data& data) { handleData(data); },
+      [] (const std::string& msg) { BOOST_THROW_EXCEPTION(std::runtime_error(msg)); });
+  });
   m_discover->onDiscoveryFailure.connect([] (const std::string& msg) {
     BOOST_THROW_EXCEPTION(std::runtime_error(msg));
   });
@@ -54,16 +58,6 @@
 }
 
 void
-Consumer::startPipeline(const Data& data)
-{
-  this->handleData(data);
-
-  m_pipeline->run(data,
-    [this] (const Interest&, const Data& data) { this->handleData(data); },
-    [] (const std::string& msg) { BOOST_THROW_EXCEPTION(std::runtime_error(msg)); });
-}
-
-void
 Consumer::handleData(const Data& data)
 {
   auto dataPtr = data.shared_from_this();
@@ -82,7 +76,7 @@
       m_bufferedData[getSegmentFromPacket(data)] = dataPtr;
       writeInOrderData();
     },
-    [] (const Data& data, const security::v2::ValidationError& error) {
+    [] (const Data&, const security::v2::ValidationError& error) {
       BOOST_THROW_EXCEPTION(DataValidationError(error));
     });
 }
diff --git a/tools/chunks/catchunks/consumer.hpp b/tools/chunks/catchunks/consumer.hpp
index 3dcefe6..633d05b 100644
--- a/tools/chunks/catchunks/consumer.hpp
+++ b/tools/chunks/catchunks/consumer.hpp
@@ -1,5 +1,5 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
+/*
  * Copyright (c) 2016-2017, Regents of the University of California,
  *                          Colorado State University,
  *                          University Pierre & Marie Curie, Sorbonne University.
@@ -80,9 +80,6 @@
 
 private:
   void
-  startPipeline(const Data& data);
-
-  void
   handleData(const Data& data);
 
 PUBLIC_WITH_TESTS_ELSE_PRIVATE:
diff --git a/tools/chunks/catchunks/pipeline-interests-aimd.cpp b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
index 3b35001..b979a51 100644
--- a/tools/chunks/catchunks/pipeline-interests-aimd.cpp
+++ b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
@@ -154,9 +154,7 @@
   auto interestId = m_face.expressInterest(interest,
                                            bind(&PipelineInterestsAimd::handleData, this, _1, _2),
                                            bind(&PipelineInterestsAimd::handleNack, this, _1, _2),
-                                           bind(&PipelineInterestsAimd::handleLifetimeExpiration,
-                                                this, _1));
-
+                                           bind(&PipelineInterestsAimd::handleLifetimeExpiration, this, _1));
   m_nInFlight++;
 
   if (isRetransmission) {
@@ -246,11 +244,8 @@
     m_nInFlight--;
   }
 
-  m_nReceived++;
-  m_receivedSize += data.getContent().value_size();
-
   increaseWindow();
-  onData(interest, data);
+  onData(data);
 
   if (segInfo.state == SegmentState::FirstTimeSent ||
       segInfo.state == SegmentState::InRetxQueue) { // do not sample RTT for retransmitted segments
diff --git a/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp b/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp
index b81ec97..40a4417 100644
--- a/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp
+++ b/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp
@@ -119,10 +119,7 @@
   if (m_options.isVerbose)
     std::cerr << "Received segment #" << getSegmentFromPacket(data) << std::endl;
 
-  m_nReceived++;
-  m_receivedSize += data.getContent().value_size();
-
-  onData(interest, data);
+  onData(data);
 
   if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
     m_lastSegmentNo = data.getFinalBlockId().toSegment();
diff --git a/tools/chunks/catchunks/pipeline-interests.cpp b/tools/chunks/catchunks/pipeline-interests.cpp
index c0512cc..e520bfe 100644
--- a/tools/chunks/catchunks/pipeline-interests.cpp
+++ b/tools/chunks/catchunks/pipeline-interests.cpp
@@ -35,10 +35,10 @@
 
 PipelineInterests::PipelineInterests(Face& face)
   : m_face(face)
+  , m_hasFinalBlockId(false)
   , m_lastSegmentNo(0)
   , m_nReceived(0)
   , m_receivedSize(0)
-  , m_hasFinalBlockId(false)
   , m_nextSegmentNo(0)
   , m_excludedSegmentNo(0)
   , m_isStopping(false)
@@ -48,11 +48,11 @@
 PipelineInterests::~PipelineInterests() = default;
 
 void
-PipelineInterests::run(const Data& data, DataCallback onData, FailureCallback onFailure)
+PipelineInterests::run(const Data& data, DataCallback dataCb, FailureCallback failureCb)
 {
-  BOOST_ASSERT(onData != nullptr);
-  m_onData = std::move(onData);
-  m_onFailure = std::move(onFailure);
+  BOOST_ASSERT(dataCb != nullptr);
+  m_onData = std::move(dataCb);
+  m_onFailure = std::move(failureCb);
   m_prefix = data.getName().getPrefix(-1);
   m_excludedSegmentNo = getSegmentFromPacket(data);
 
@@ -61,11 +61,11 @@
     m_hasFinalBlockId = true;
   }
 
+  onData(data);
+
   // record the start time of the pipeline
   m_startTime = time::steady_clock::now();
 
-  m_nReceived++;
-  m_receivedSize += data.getContent().value_size();
   doRun();
 }
 
@@ -90,6 +90,15 @@
 }
 
 void
+PipelineInterests::onData(const Data& data)
+{
+  m_nReceived++;
+  m_receivedSize += data.getContent().value_size();
+
+  m_onData(data);
+}
+
+void
 PipelineInterests::onFailure(const std::string& reason)
 {
   if (m_isStopping)
diff --git a/tools/chunks/catchunks/pipeline-interests.hpp b/tools/chunks/catchunks/pipeline-interests.hpp
index c8e14fe..90fd76a 100644
--- a/tools/chunks/catchunks/pipeline-interests.hpp
+++ b/tools/chunks/catchunks/pipeline-interests.hpp
@@ -49,14 +49,14 @@
 class PipelineInterests
 {
 public:
-  typedef function<void(const std::string& reason)> FailureCallback;
+  using DataCallback = function<void(const Data&)>;
+  using FailureCallback = function<void(const std::string& reason)>;
 
-public:
   /**
    * @brief create a PipelineInterests service
    *
-   * Configures the pipelining service without specifying the retrieval namespace. After this
-   * configuration the method run must be called to start the Pipeline.
+   * Configures the pipelining service without specifying the retrieval namespace.
+   * After construction, the method run() must be called in order to start the pipeline.
    */
   explicit
   PipelineInterests(Face& face);
@@ -100,11 +100,11 @@
   uint64_t
   getNextSegmentNo();
 
+  /**
+   * @brief subclasses must call this method to notify successful retrieval of a segment
+   */
   void
-  onData(const Interest& interest, const Data& data) const
-  {
-    m_onData(interest, data);
-  }
+  onData(const Data& data);
 
   /**
    * @brief subclasses can call this method to signal an unrecoverable failure
@@ -131,7 +131,7 @@
    * @brief perform subclass-specific operations to fetch all the segments
    *
    * When overriding this function, at a minimum, the subclass should implement the retrieving
-   * of all the segments. Subclass must guarantee that onData is called at least once for every
+   * of all the segments. Subclass must guarantee that `onData` is called once for every
    * segment that is fetched successfully.
    *
    * @note m_lastSegmentNo contains a valid value only if m_hasFinalBlockId is true.
@@ -145,12 +145,12 @@
 protected:
   Face& m_face;
   Name m_prefix;
-  uint64_t m_lastSegmentNo; ///< valid only if m_hasFinalBlockId == true
-  int64_t m_nReceived; ///< number of segments received
-  size_t m_receivedSize; ///< size of received data in bytes
 
 PUBLIC_WITH_TESTS_ELSE_PROTECTED:
-  bool m_hasFinalBlockId; ///< true if the last segment number is known
+  bool m_hasFinalBlockId;   ///< true if the last segment number is known
+  uint64_t m_lastSegmentNo; ///< valid only if m_hasFinalBlockId == true
+  int64_t m_nReceived;      ///< number of segments received
+  size_t m_receivedSize;    ///< size of received data in bytes
 
 private:
   DataCallback m_onData;