util: add "in order" mode to SegmentFetcher

This commit adds an option for the fetcher to emit a signal when a segment
has arrived in order. Out-of-order segments are maintained internally by
the fetcher and are deleted when the signal is emitted.

Change-Id: I84aec680774af3e3c07bb07c881eab8908b5c9c2
diff --git a/AUTHORS.md b/AUTHORS.md
index 261d9b6..13919c0 100644
--- a/AUTHORS.md
+++ b/AUTHORS.md
@@ -10,6 +10,7 @@
 * Hila Ben Abraham <https://sites.wustl.edu/hbabraham>
 * Shuo Chen <https://github.com/chenatu>
 * Muktadir R. Chowdhury <https://github.com/alvyC>
+* Jeremy Clark <https://github.com/jrandallclark>
 * Damian Coomes <https://github.com/dmcoomes>
 * Stephanie DiBenedetto
 * Qiuhan Ding <https://irl.cs.ucla.edu/~qiuhanding>
diff --git a/ndn-cxx/util/segment-fetcher.cpp b/ndn-cxx/util/segment-fetcher.cpp
index 6e5d492..0da0860 100644
--- a/ndn-cxx/util/segment-fetcher.cpp
+++ b/ndn-cxx/util/segment-fetcher.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /*
- * Copyright (c) 2013-2019 Regents of the University of California,
+ * Copyright (c) 2013-2020 Regents of the University of California,
  *                         Colorado State University,
  *                         University Pierre & Marie Curie, Sorbonne University.
  *
@@ -19,10 +19,6 @@
  * <http://www.gnu.org/licenses/>.
  *
  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
- *
- * @author Shuo Yang
- * @author Weiwei Liu
- * @author Chavoosh Ghasemi
  */
 
 #include "ndn-cxx/util/segment-fetcher.hpp"
@@ -71,16 +67,8 @@
   , m_validator(validator)
   , m_rttEstimator(make_shared<RttEstimator::Options>(options.rttOptions))
   , m_timeLastSegmentReceived(time::steady_clock::now())
-  , m_nextSegmentNum(0)
   , m_cwnd(options.initCwnd)
   , m_ssthresh(options.initSsthresh)
-  , m_nSegmentsInFlight(0)
-  , m_nSegments(0)
-  , m_highInterest(0)
-  , m_highData(0)
-  , m_recPoint(0)
-  , m_nReceived(0)
-  , m_nBytesReceived(0)
 {
   m_options.validate();
 }
@@ -137,7 +125,15 @@
     return finalizeFetch();
   }
 
-  int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
+  int64_t availableWindowSize;
+  if (m_options.inOrder) {
+    availableWindowSize = std::min<int64_t>(m_cwnd, m_options.flowControlWindow - m_segmentBuffer.size());
+  }
+  else {
+    availableWindowSize = static_cast<int64_t>(m_cwnd);
+  }
+  availableWindowSize -= m_nSegmentsInFlight;
+
   std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
 
   while (availableWindowSize > 0) {
@@ -152,7 +148,7 @@
       segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
     }
     else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
-      if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
+      if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
         // Don't request a segment a second time if received in response to first "discovery" Interest
         m_nextSegmentNum++;
         continue;
@@ -264,6 +260,8 @@
 
   // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
   uint64_t currentSegment = data.getName().get(-1).toSegment();
+  m_receivedSegments.insert(currentSegment);
+
   // Add measurement to RTO estimator (if not retransmission)
   if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
     BOOST_ASSERT(m_nSegmentsInFlight >= 0);
@@ -275,9 +273,9 @@
   m_pendingSegments.erase(pendingSegmentIt);
 
   // Copy data in segment to temporary buffer
-  auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
-                                                      std::forward_as_tuple(currentSegment),
-                                                      std::forward_as_tuple(data.getContent().value_size()));
+  auto receivedSegmentIt = m_segmentBuffer.emplace(std::piecewise_construct,
+                                                   std::forward_as_tuple(currentSegment),
+                                                   std::forward_as_tuple(data.getContent().value_size()));
   std::copy(data.getContent().value_begin(), data.getContent().value_end(),
             receivedSegmentIt.first->second.begin());
   m_nBytesReceived += data.getContent().value_size();
@@ -295,6 +293,13 @@
     }
   }
 
+  if (m_options.inOrder && m_nextSegmentInOrder == currentSegment) {
+    do {
+      onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
+      m_segmentBuffer.erase(m_nextSegmentInOrder++);
+    } while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
+  }
+
   if (m_receivedSegments.size() == 1) {
     m_versionedDataName = data.getName().getPrefix(-1);
     if (currentSegment == 0) {
@@ -405,16 +410,20 @@
 void
 SegmentFetcher::finalizeFetch()
 {
-  // Combine segments into final buffer
-  OBufferStream buf;
-  // We may have received more segments than exist in the object.
-  BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
-
-  for (int64_t i = 0; i < m_nSegments; i++) {
-    buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
+  if (m_options.inOrder) {
+    onInOrderComplete();
   }
+  else {
+    // Combine segments into final buffer
+    OBufferStream buf;
+    // We may have received more segments than exist in the object.
+    BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
 
-  onComplete(buf.buf());
+    for (int64_t i = 0; i < m_nSegments; i++) {
+      buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
+    }
+    onComplete(buf.buf());
+  }
   stop();
 }
 
diff --git a/ndn-cxx/util/segment-fetcher.hpp b/ndn-cxx/util/segment-fetcher.hpp
index bf349d0..30879d6 100644
--- a/ndn-cxx/util/segment-fetcher.hpp
+++ b/ndn-cxx/util/segment-fetcher.hpp
@@ -19,10 +19,6 @@
  * <http://www.gnu.org/licenses/>.
  *
  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
- *
- * @author Shuo Yang
- * @author Weiwei Liu
- * @author Chavoosh Ghasemi
  */
 
 #ifndef NDN_UTIL_SEGMENT_FETCHER_HPP
@@ -35,6 +31,7 @@
 #include "ndn-cxx/util/signal.hpp"
 
 #include <queue>
+#include <set>
 
 namespace ndn {
 namespace util {
@@ -53,20 +50,21 @@
  *
  * 1. Express an Interest to discover the latest version of the object:
  *
- *    Interest: `/<prefix>?ndn.CanBePrefix=true&ndn.MustBeFresh=true`
+ *    Interest: `/<prefix>?CanBePrefix&MustBeFresh`
  *
- * 2. Infer the latest version of the object: `<version> = Data.getName().get(-2)`
+ * 2. Infer the latest version of the object: `<version> = Data.getName().get(-2)`.
  *
  * 3. Keep sending Interests for future segments until an error occurs or the number of segments
  *    indicated by the FinalBlockId in a received Data packet is reached. This retrieval will start
  *    at segment 1 if segment 0 was received in response to the Interest expressed in step 2;
  *    otherwise, retrieval will start at segment 0. By default, congestion control will be used to
  *    manage the Interest window size. Interests expressed in this step will follow this Name
- *    format:
+ *    format: `/<prefix>/<version>/<segment=(N)>`.
  *
- *    Interest: `/<prefix>/<version>/<segment=(N)>`
- *
- * 4. Signal #onComplete passing a memory buffer that combines the content of all segments in the object.
+ * 4. If set to 'block' mode, signal #onComplete passing a memory buffer that combines the content
+ *    of all segments in the object. If set to 'in order' mode, signal #onInOrderData is triggered
+ *    upon validation of each segment in segment order, storing later segments that arrived out of
+ *    order internally until all earlier segments have arrived and have been validated.
  *
  * If an error occurs during the fetching process, #onError is signaled with one of the error codes
  * from SegmentFetcher::ErrorCode.
@@ -124,18 +122,20 @@
     validate();
 
   public:
-    bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
-    bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
-    time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
     time::milliseconds interestLifetime = 4_s; ///< lifetime of sent Interests - independent of Interest timeout
+    time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
+    bool inOrder = false; ///< true for 'in order' mode, false for 'block' mode
+    bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
+    bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
+    bool disableCwa = false; ///< disable Conservative Window Adaptation
+    bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
+    bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
     double initCwnd = 1.0; ///< initial congestion window size
     double initSsthresh = std::numeric_limits<double>::max(); ///< initial slow start threshold
     double aiStep = 1.0; ///< additive increase step (in segments)
     double mdCoef = 0.5; ///< multiplicative decrease coefficient
-    bool disableCwa = false; ///< disable Conservative Window Adaptation
-    bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
-    bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
     RttEstimator::Options rttOptions; ///< options for RTT estimator
+    size_t flowControlWindow = 25000; ///< maximum number of segments stored in the reorder buffer
   };
 
   /**
@@ -244,37 +244,50 @@
 
 public:
   /**
-   * @brief Emits upon successful retrieval of the complete data.
+   * @brief Emitted upon successful retrieval of the complete object (all segments).
+   * @note Emitted only if SegmentFetcher is operating in 'block' mode.
    */
   Signal<SegmentFetcher, ConstBufferPtr> onComplete;
 
   /**
-   * @brief Emits when the retrieval could not be completed due to an error.
+   * @brief Emitted when the retrieval could not be completed due to an error.
    *
    * Handlers are provided with an error code and a string error message.
    */
   Signal<SegmentFetcher, uint32_t, std::string> onError;
 
   /**
-   * @brief Emits whenever a data segment received.
+   * @brief Emitted whenever a data segment received.
    */
   Signal<SegmentFetcher, Data> afterSegmentReceived;
 
   /**
-   * @brief Emits whenever a received data segment has been successfully validated.
+   * @brief Emitted whenever a received data segment has been successfully validated.
    */
   Signal<SegmentFetcher, Data> afterSegmentValidated;
 
   /**
-   * @brief Emits whenever an Interest for a data segment is nacked.
+   * @brief Emitted whenever an Interest for a data segment is nacked.
    */
   Signal<SegmentFetcher> afterSegmentNacked;
 
   /**
-   * @brief Emits whenever an Interest for a data segment times out.
+   * @brief Emitted whenever an Interest for a data segment times out.
    */
   Signal<SegmentFetcher> afterSegmentTimedOut;
 
+  /**
+   * @brief Emitted after each data segment in segment order has been validated.
+   * @note Emitted only if SegmentFetcher is operating in 'in order' mode.
+   */
+  Signal<SegmentFetcher, ConstBufferPtr> onInOrderData;
+
+  /**
+   * @brief Emitted on successful retrieval of all segments in 'in order' mode.
+   * @note Emitted only if SegmentFetcher is operating in 'in order' mode.
+   */
+  Signal<SegmentFetcher> onInOrderComplete;
+
 private:
   enum class SegmentState {
     FirstInterest, ///< the first Interest for this segment has been sent
@@ -305,19 +318,21 @@
   time::steady_clock::TimePoint m_timeLastSegmentReceived;
   std::queue<uint64_t> m_retxQueue;
   Name m_versionedDataName;
-  uint64_t m_nextSegmentNum;
+  uint64_t m_nextSegmentNum = 0;
   double m_cwnd;
   double m_ssthresh;
-  int64_t m_nSegmentsInFlight;
-  int64_t m_nSegments;
-  uint64_t m_highInterest;
-  uint64_t m_highData;
-  uint64_t m_recPoint;
-  int64_t m_nReceived;
-  int64_t m_nBytesReceived;
+  int64_t m_nSegmentsInFlight = 0;
+  int64_t m_nSegments = 0;
+  uint64_t m_highInterest = 0;
+  uint64_t m_highData = 0;
+  uint64_t m_recPoint = 0;
+  int64_t m_nReceived = 0;
+  int64_t m_nBytesReceived = 0;
+  uint64_t m_nextSegmentInOrder = 0;
 
-  std::map<uint64_t, Buffer> m_receivedSegments;
+  std::map<uint64_t, Buffer> m_segmentBuffer;
   std::map<uint64_t, PendingSegment> m_pendingSegments;
+  std::set<uint64_t> m_receivedSegments;
 };
 
 } // namespace util
diff --git a/tests/unit/util/segment-fetcher.t.cpp b/tests/unit/util/segment-fetcher.t.cpp
index 9f65f49..7f63db8 100644
--- a/tests/unit/util/segment-fetcher.t.cpp
+++ b/tests/unit/util/segment-fetcher.t.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /*
- * Copyright (c) 2013-2019 Regents of the University of California.
+ * Copyright (c) 2013-2020 Regents of the University of California.
  *
  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
  *
@@ -73,7 +73,20 @@
   {
     ++nCompletions;
     dataSize = data->size();
-    dataBuf = data;
+  }
+
+  void
+  onInOrderComplete()
+  {
+    ++nCompletions;
+    ++nOnInOrderComplete;
+  }
+
+  void
+  onInOrderData(ConstBufferPtr data)
+  {
+    ++nOnInOrderData;
+    dataSize += data->size();
   }
 
   void
@@ -88,8 +101,15 @@
   void
   connectSignals(const shared_ptr<SegmentFetcher>& fetcher)
   {
+    fetcher->onInOrderData.connect(bind(&Fixture::onInOrderData, this, _1));
+    fetcher->onInOrderComplete.connect(bind(&Fixture::onInOrderComplete, this));
     fetcher->onComplete.connect(bind(&Fixture::onComplete, this, _1));
     fetcher->onError.connect(bind(&Fixture::onError, this, _1));
+
+    fetcher->afterSegmentReceived.connect([this] (const auto&) { ++this->nAfterSegmentReceived; });
+    fetcher->afterSegmentValidated.connect([this] (const auto &) { ++this->nAfterSegmentValidated; });
+    fetcher->afterSegmentNacked.connect([this] { ++nAfterSegmentNacked; });
+    fetcher->afterSegmentTimedOut.connect([this] { ++nAfterSegmentTimedOut; });
   }
 
   void
@@ -141,7 +161,12 @@
   uint32_t lastError = 0;
   int nCompletions = 0;
   size_t dataSize = 0;
-  ConstBufferPtr dataBuf;
+  size_t nAfterSegmentReceived = 0;
+  size_t nAfterSegmentValidated = 0;
+  size_t nAfterSegmentNacked = 0;
+  size_t nAfterSegmentTimedOut = 0;
+  size_t nOnInOrderData = 0;
+  size_t nOnInOrderComplete = 0;
 
   // number of segments in fetched object
   uint64_t nSegments = 0;
@@ -200,17 +225,9 @@
 BOOST_AUTO_TEST_CASE(BasicSingleSegment)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   advanceClocks(10_ms);
 
@@ -231,17 +248,9 @@
   SegmentFetcher::Options options;
   options.useConstantCwnd = true;
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator, options);
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   advanceClocks(10_ms);
 
@@ -304,21 +313,13 @@
 BOOST_AUTO_TEST_CASE(BasicMultipleSegments)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   nSegments = 401;
   sendNackInsteadOfDropping = false;
-  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
 
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   face.processEvents(1_s);
 
@@ -331,25 +332,42 @@
   BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
 }
 
+BOOST_AUTO_TEST_CASE(BasicInOrder)
+{
+  DummyValidator acceptValidator;
+  SegmentFetcher::Options options;
+  options.inOrder = true;
+  nSegments = 401;
+  sendNackInsteadOfDropping = false;
+
+  auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator, options);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
+  connectSignals(fetcher);
+
+  face.processEvents(1_s);
+
+  BOOST_CHECK_EQUAL(nErrors, 0);
+  BOOST_CHECK_EQUAL(nCompletions, 1);
+  BOOST_CHECK_EQUAL(dataSize, 14 * 401);
+  BOOST_CHECK_EQUAL(nAfterSegmentReceived, 401);
+  BOOST_CHECK_EQUAL(nAfterSegmentValidated, 401);
+  BOOST_CHECK_EQUAL(nOnInOrderData, 401);
+  BOOST_CHECK_EQUAL(nOnInOrderComplete, 1);
+  BOOST_CHECK_EQUAL(nAfterSegmentNacked, 0);
+  BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
+}
+
 BOOST_AUTO_TEST_CASE(FirstSegmentNotZero)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   nSegments = 401;
   sendNackInsteadOfDropping = false;
   defaultSegmentToSend = 47;
-  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
 
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   face.processEvents(1_s);
 
@@ -362,16 +380,38 @@
   BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
 }
 
+BOOST_AUTO_TEST_CASE(FirstSegmentNotZeroInOrder)
+{
+  DummyValidator acceptValidator;
+  SegmentFetcher::Options options;
+  options.inOrder = true;
+  nSegments = 401;
+  sendNackInsteadOfDropping = false;
+  defaultSegmentToSend = 47;
+
+  auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator, options);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
+  connectSignals(fetcher);
+
+  face.processEvents(1_s);
+
+  BOOST_CHECK_EQUAL(nErrors, 0);
+  BOOST_CHECK_EQUAL(nCompletions, 1);
+  BOOST_CHECK_EQUAL(dataSize, 14 * 401);
+  BOOST_CHECK_EQUAL(nAfterSegmentReceived, 401);
+  BOOST_CHECK_EQUAL(nAfterSegmentValidated, 401);
+  BOOST_CHECK_EQUAL(nOnInOrderData, 401);
+  BOOST_CHECK_EQUAL(nOnInOrderComplete, 1);
+  BOOST_CHECK_EQUAL(nAfterSegmentNacked, 0);
+  BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
+}
+
 BOOST_AUTO_TEST_CASE(WindowSize)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
 
   advanceClocks(10_ms); // T+10ms
 
@@ -540,17 +580,9 @@
 BOOST_AUTO_TEST_CASE(MoreSegmentsThanNSegments)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   advanceClocks(10_ms);
 
@@ -595,24 +627,16 @@
 BOOST_AUTO_TEST_CASE(DuplicateNack)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   nSegments = 401;
   segmentsToDropOrNack.push(0);
   segmentsToDropOrNack.push(200);
   sendNackInsteadOfDropping = true;
   nackReason = lp::NackReason::DUPLICATE;
-  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
 
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   face.processEvents(1_s);
 
@@ -628,24 +652,16 @@
 BOOST_AUTO_TEST_CASE(CongestionNack)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   nSegments = 401;
   segmentsToDropOrNack.push(0);
   segmentsToDropOrNack.push(200);
   sendNackInsteadOfDropping = true;
   nackReason = lp::NackReason::CONGESTION;
-  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
 
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   face.processEvents(1_s);
 
@@ -661,10 +677,6 @@
 BOOST_AUTO_TEST_CASE(OtherNackReason)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   segmentsToDropOrNack.push(0);
   sendNackInsteadOfDropping = true;
   nackReason = lp::NackReason::NO_ROUTE;
@@ -673,10 +685,6 @@
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   face.processEvents(1_s);
 
@@ -693,8 +701,8 @@
 {
   DummyValidator validator;
   validator.getPolicy().setResultCallback([] (const Name& name) {
-      return name.at(-1).toSegment() % 2 == 0;
-    });
+    return name.at(-1).toSegment() % 2 == 0;
+  });
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              validator);
   connectSignals(fetcher);
@@ -702,16 +710,6 @@
   auto data1 = makeDataSegment("/hello/world", 0, false);
   auto data2 = makeDataSegment("/hello/world", 1, true);
 
-  size_t nRecvSegments = 0;
-  fetcher->afterSegmentReceived.connect([&nRecvSegments] (const Data& receivedSegment) {
-      ++nRecvSegments;
-    });
-
-  size_t nValidatedSegments = 0;
-  fetcher->afterSegmentValidated.connect([&nValidatedSegments] (const Data& validatedSegment) {
-      ++nValidatedSegments;
-    });
-
   advanceClocks(10_ms, 10);
 
   BOOST_CHECK_EQUAL(fetcher->m_timeLastSegmentReceived, time::steady_clock::now() - 100_ms);
@@ -727,8 +725,8 @@
   advanceClocks(10_ms, 10);
 
   BOOST_CHECK_EQUAL(fetcher->m_timeLastSegmentReceived, time::steady_clock::now() - 200_ms);
-  BOOST_CHECK_EQUAL(nRecvSegments, 2);
-  BOOST_CHECK_EQUAL(nValidatedSegments, 1);
+  BOOST_CHECK_EQUAL(nAfterSegmentReceived, 2);
+  BOOST_CHECK_EQUAL(nAfterSegmentValidated, 1);
   BOOST_CHECK_EQUAL(nErrors, 1);
 }
 
@@ -770,7 +768,7 @@
   bool fetcherStopped = false;
 
   fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator);
-  fetcher->afterSegmentReceived.connect([&fetcher, &fetcherStopped] (const Data& data) {
+  fetcher->afterSegmentReceived.connect([&fetcher, &fetcherStopped] (const Data&) {
                                           fetcherStopped = true;
                                           fetcher->stop();
                                         });
@@ -790,21 +788,12 @@
   // BasicSingleSegment, but with scoped fetcher
 
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
 
   weak_ptr<SegmentFetcher> weakFetcher;
   {
     auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator);
     weakFetcher = fetcher;
     connectSignals(fetcher);
-
-    fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-    fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-    fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-    fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
   }
 
   advanceClocks(10_ms);
@@ -829,21 +818,12 @@
   SegmentFetcher::Options options;
   options.maxTimeout = 3000_ms;
 
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
-
   weak_ptr<SegmentFetcher> weakFetcher;
   {
     auto fetcher = SegmentFetcher::start(face, Interest("/localhost/nfd/faces/list"),
                                          acceptValidator, options);
     weakFetcher = fetcher;
     connectSignals(fetcher);
-    fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-    fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-    fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-    fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
   }
 
   advanceClocks(500_ms, 7);