util: add "in order" mode to SegmentFetcher

This commit adds an option for the fetcher to emit a signal when a segment
has arrived in order. Out-of-order segments are maintained internally by
the fetcher and are deleted when the signal is emitted.

Change-Id: I84aec680774af3e3c07bb07c881eab8908b5c9c2
diff --git a/tests/unit/util/segment-fetcher.t.cpp b/tests/unit/util/segment-fetcher.t.cpp
index 9f65f49..7f63db8 100644
--- a/tests/unit/util/segment-fetcher.t.cpp
+++ b/tests/unit/util/segment-fetcher.t.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /*
- * Copyright (c) 2013-2019 Regents of the University of California.
+ * Copyright (c) 2013-2020 Regents of the University of California.
  *
  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
  *
@@ -73,7 +73,20 @@
   {
     ++nCompletions;
     dataSize = data->size();
-    dataBuf = data;
+  }
+
+  void
+  onInOrderComplete()
+  {
+    ++nCompletions;
+    ++nOnInOrderComplete;
+  }
+
+  void
+  onInOrderData(ConstBufferPtr data)
+  {
+    ++nOnInOrderData;
+    dataSize += data->size();
   }
 
   void
@@ -88,8 +101,15 @@
   void
   connectSignals(const shared_ptr<SegmentFetcher>& fetcher)
   {
+    fetcher->onInOrderData.connect(bind(&Fixture::onInOrderData, this, _1));
+    fetcher->onInOrderComplete.connect(bind(&Fixture::onInOrderComplete, this));
     fetcher->onComplete.connect(bind(&Fixture::onComplete, this, _1));
     fetcher->onError.connect(bind(&Fixture::onError, this, _1));
+
+    fetcher->afterSegmentReceived.connect([this] (const auto&) { ++this->nAfterSegmentReceived; });
+    fetcher->afterSegmentValidated.connect([this] (const auto &) { ++this->nAfterSegmentValidated; });
+    fetcher->afterSegmentNacked.connect([this] { ++nAfterSegmentNacked; });
+    fetcher->afterSegmentTimedOut.connect([this] { ++nAfterSegmentTimedOut; });
   }
 
   void
@@ -141,7 +161,12 @@
   uint32_t lastError = 0;
   int nCompletions = 0;
   size_t dataSize = 0;
-  ConstBufferPtr dataBuf;
+  size_t nAfterSegmentReceived = 0;
+  size_t nAfterSegmentValidated = 0;
+  size_t nAfterSegmentNacked = 0;
+  size_t nAfterSegmentTimedOut = 0;
+  size_t nOnInOrderData = 0;
+  size_t nOnInOrderComplete = 0;
 
   // number of segments in fetched object
   uint64_t nSegments = 0;
@@ -200,17 +225,9 @@
 BOOST_AUTO_TEST_CASE(BasicSingleSegment)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   advanceClocks(10_ms);
 
@@ -231,17 +248,9 @@
   SegmentFetcher::Options options;
   options.useConstantCwnd = true;
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator, options);
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   advanceClocks(10_ms);
 
@@ -304,21 +313,13 @@
 BOOST_AUTO_TEST_CASE(BasicMultipleSegments)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   nSegments = 401;
   sendNackInsteadOfDropping = false;
-  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
 
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   face.processEvents(1_s);
 
@@ -331,25 +332,42 @@
   BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
 }
 
+BOOST_AUTO_TEST_CASE(BasicInOrder)
+{
+  DummyValidator acceptValidator;
+  SegmentFetcher::Options options;
+  options.inOrder = true;
+  nSegments = 401;
+  sendNackInsteadOfDropping = false;
+
+  auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator, options);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
+  connectSignals(fetcher);
+
+  face.processEvents(1_s);
+
+  BOOST_CHECK_EQUAL(nErrors, 0);
+  BOOST_CHECK_EQUAL(nCompletions, 1);
+  BOOST_CHECK_EQUAL(dataSize, 14 * 401);
+  BOOST_CHECK_EQUAL(nAfterSegmentReceived, 401);
+  BOOST_CHECK_EQUAL(nAfterSegmentValidated, 401);
+  BOOST_CHECK_EQUAL(nOnInOrderData, 401);
+  BOOST_CHECK_EQUAL(nOnInOrderComplete, 1);
+  BOOST_CHECK_EQUAL(nAfterSegmentNacked, 0);
+  BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
+}
+
 BOOST_AUTO_TEST_CASE(FirstSegmentNotZero)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   nSegments = 401;
   sendNackInsteadOfDropping = false;
   defaultSegmentToSend = 47;
-  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
 
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   face.processEvents(1_s);
 
@@ -362,16 +380,38 @@
   BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
 }
 
+BOOST_AUTO_TEST_CASE(FirstSegmentNotZeroInOrder)
+{
+  DummyValidator acceptValidator;
+  SegmentFetcher::Options options;
+  options.inOrder = true;
+  nSegments = 401;
+  sendNackInsteadOfDropping = false;
+  defaultSegmentToSend = 47;
+
+  auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator, options);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
+  connectSignals(fetcher);
+
+  face.processEvents(1_s);
+
+  BOOST_CHECK_EQUAL(nErrors, 0);
+  BOOST_CHECK_EQUAL(nCompletions, 1);
+  BOOST_CHECK_EQUAL(dataSize, 14 * 401);
+  BOOST_CHECK_EQUAL(nAfterSegmentReceived, 401);
+  BOOST_CHECK_EQUAL(nAfterSegmentValidated, 401);
+  BOOST_CHECK_EQUAL(nOnInOrderData, 401);
+  BOOST_CHECK_EQUAL(nOnInOrderComplete, 1);
+  BOOST_CHECK_EQUAL(nAfterSegmentNacked, 0);
+  BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
+}
+
 BOOST_AUTO_TEST_CASE(WindowSize)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
 
   advanceClocks(10_ms); // T+10ms
 
@@ -540,17 +580,9 @@
 BOOST_AUTO_TEST_CASE(MoreSegmentsThanNSegments)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   advanceClocks(10_ms);
 
@@ -595,24 +627,16 @@
 BOOST_AUTO_TEST_CASE(DuplicateNack)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   nSegments = 401;
   segmentsToDropOrNack.push(0);
   segmentsToDropOrNack.push(200);
   sendNackInsteadOfDropping = true;
   nackReason = lp::NackReason::DUPLICATE;
-  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
 
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   face.processEvents(1_s);
 
@@ -628,24 +652,16 @@
 BOOST_AUTO_TEST_CASE(CongestionNack)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   nSegments = 401;
   segmentsToDropOrNack.push(0);
   segmentsToDropOrNack.push(200);
   sendNackInsteadOfDropping = true;
   nackReason = lp::NackReason::CONGESTION;
-  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
 
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
+  face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   face.processEvents(1_s);
 
@@ -661,10 +677,6 @@
 BOOST_AUTO_TEST_CASE(OtherNackReason)
 {
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
   segmentsToDropOrNack.push(0);
   sendNackInsteadOfDropping = true;
   nackReason = lp::NackReason::NO_ROUTE;
@@ -673,10 +685,6 @@
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              acceptValidator);
   connectSignals(fetcher);
-  fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-  fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-  fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-  fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
 
   face.processEvents(1_s);
 
@@ -693,8 +701,8 @@
 {
   DummyValidator validator;
   validator.getPolicy().setResultCallback([] (const Name& name) {
-      return name.at(-1).toSegment() % 2 == 0;
-    });
+    return name.at(-1).toSegment() % 2 == 0;
+  });
   shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
                                                              validator);
   connectSignals(fetcher);
@@ -702,16 +710,6 @@
   auto data1 = makeDataSegment("/hello/world", 0, false);
   auto data2 = makeDataSegment("/hello/world", 1, true);
 
-  size_t nRecvSegments = 0;
-  fetcher->afterSegmentReceived.connect([&nRecvSegments] (const Data& receivedSegment) {
-      ++nRecvSegments;
-    });
-
-  size_t nValidatedSegments = 0;
-  fetcher->afterSegmentValidated.connect([&nValidatedSegments] (const Data& validatedSegment) {
-      ++nValidatedSegments;
-    });
-
   advanceClocks(10_ms, 10);
 
   BOOST_CHECK_EQUAL(fetcher->m_timeLastSegmentReceived, time::steady_clock::now() - 100_ms);
@@ -727,8 +725,8 @@
   advanceClocks(10_ms, 10);
 
   BOOST_CHECK_EQUAL(fetcher->m_timeLastSegmentReceived, time::steady_clock::now() - 200_ms);
-  BOOST_CHECK_EQUAL(nRecvSegments, 2);
-  BOOST_CHECK_EQUAL(nValidatedSegments, 1);
+  BOOST_CHECK_EQUAL(nAfterSegmentReceived, 2);
+  BOOST_CHECK_EQUAL(nAfterSegmentValidated, 1);
   BOOST_CHECK_EQUAL(nErrors, 1);
 }
 
@@ -770,7 +768,7 @@
   bool fetcherStopped = false;
 
   fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator);
-  fetcher->afterSegmentReceived.connect([&fetcher, &fetcherStopped] (const Data& data) {
+  fetcher->afterSegmentReceived.connect([&fetcher, &fetcherStopped] (const Data&) {
                                           fetcherStopped = true;
                                           fetcher->stop();
                                         });
@@ -790,21 +788,12 @@
   // BasicSingleSegment, but with scoped fetcher
 
   DummyValidator acceptValidator;
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
 
   weak_ptr<SegmentFetcher> weakFetcher;
   {
     auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator);
     weakFetcher = fetcher;
     connectSignals(fetcher);
-
-    fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-    fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-    fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-    fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
   }
 
   advanceClocks(10_ms);
@@ -829,21 +818,12 @@
   SegmentFetcher::Options options;
   options.maxTimeout = 3000_ms;
 
-  size_t nAfterSegmentReceived = 0;
-  size_t nAfterSegmentValidated = 0;
-  size_t nAfterSegmentNacked = 0;
-  size_t nAfterSegmentTimedOut = 0;
-
   weak_ptr<SegmentFetcher> weakFetcher;
   {
     auto fetcher = SegmentFetcher::start(face, Interest("/localhost/nfd/faces/list"),
                                          acceptValidator, options);
     weakFetcher = fetcher;
     connectSignals(fetcher);
-    fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
-    fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
-    fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
-    fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
   }
 
   advanceClocks(500_ms, 7);