util: add "in order" mode to SegmentFetcher

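This commit adds an "in order" mode in which the fetcher emits a signal
(onInOrderData) for each segment as soon as it and all earlier segments
have arrived and been validated. Segments that arrive out of order are
buffered internally and removed from the buffer when their signal is
emitted; the buffer size is bounded by the new flowControlWindow option.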

Change-Id: I84aec680774af3e3c07bb07c881eab8908b5c9c2
diff --git a/ndn-cxx/util/segment-fetcher.cpp b/ndn-cxx/util/segment-fetcher.cpp
index 6e5d492..0da0860 100644
--- a/ndn-cxx/util/segment-fetcher.cpp
+++ b/ndn-cxx/util/segment-fetcher.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /*
- * Copyright (c) 2013-2019 Regents of the University of California,
+ * Copyright (c) 2013-2020 Regents of the University of California,
  *                         Colorado State University,
  *                         University Pierre & Marie Curie, Sorbonne University.
  *
@@ -19,10 +19,6 @@
  * <http://www.gnu.org/licenses/>.
  *
  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
- *
- * @author Shuo Yang
- * @author Weiwei Liu
- * @author Chavoosh Ghasemi
  */
 
 #include "ndn-cxx/util/segment-fetcher.hpp"
@@ -71,16 +67,8 @@
   , m_validator(validator)
   , m_rttEstimator(make_shared<RttEstimator::Options>(options.rttOptions))
   , m_timeLastSegmentReceived(time::steady_clock::now())
-  , m_nextSegmentNum(0)
   , m_cwnd(options.initCwnd)
   , m_ssthresh(options.initSsthresh)
-  , m_nSegmentsInFlight(0)
-  , m_nSegments(0)
-  , m_highInterest(0)
-  , m_highData(0)
-  , m_recPoint(0)
-  , m_nReceived(0)
-  , m_nBytesReceived(0)
 {
   m_options.validate();
 }
@@ -137,7 +125,15 @@
     return finalizeFetch();
   }
 
-  int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
+  int64_t availableWindowSize = static_cast<int64_t>(m_cwnd);
+  if (m_options.inOrder) {
+    // Also bound by free reorder-buffer space; signed math so an over-full buffer yields < 0
+    int64_t bufferSpace = static_cast<int64_t>(m_options.flowControlWindow) -
+                          static_cast<int64_t>(m_segmentBuffer.size());
+    availableWindowSize = std::min(availableWindowSize, bufferSpace);
+  }
+  availableWindowSize -= m_nSegmentsInFlight;
+
   std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
 
   while (availableWindowSize > 0) {
@@ -152,7 +148,7 @@
       segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
     }
     else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
-      if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
+      if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
         // Don't request a segment a second time if received in response to first "discovery" Interest
         m_nextSegmentNum++;
         continue;
@@ -264,6 +260,8 @@
 
   // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
   uint64_t currentSegment = data.getName().get(-1).toSegment();
+  m_receivedSegments.insert(currentSegment);
+
   // Add measurement to RTO estimator (if not retransmission)
   if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
     BOOST_ASSERT(m_nSegmentsInFlight >= 0);
@@ -275,9 +273,9 @@
   m_pendingSegments.erase(pendingSegmentIt);
 
   // Copy data in segment to temporary buffer
-  auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
-                                                      std::forward_as_tuple(currentSegment),
-                                                      std::forward_as_tuple(data.getContent().value_size()));
+  auto receivedSegmentIt = m_segmentBuffer.emplace(std::piecewise_construct,
+                                                   std::forward_as_tuple(currentSegment),
+                                                   std::forward_as_tuple(data.getContent().value_size()));
   std::copy(data.getContent().value_begin(), data.getContent().value_end(),
             receivedSegmentIt.first->second.begin());
   m_nBytesReceived += data.getContent().value_size();
@@ -295,6 +293,13 @@
     }
   }
 
+  if (m_options.inOrder && m_nextSegmentInOrder == currentSegment) {
+    do {
+      // Move the payload out: it is erased from the reorder buffer right afterwards.
+      onInOrderData(std::make_shared<const Buffer>(std::move(m_segmentBuffer[m_nextSegmentInOrder])));
+      m_segmentBuffer.erase(m_nextSegmentInOrder++);
+    } while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
+  }
   if (m_receivedSegments.size() == 1) {
     m_versionedDataName = data.getName().getPrefix(-1);
     if (currentSegment == 0) {
@@ -405,16 +410,20 @@
 void
 SegmentFetcher::finalizeFetch()
 {
-  // Combine segments into final buffer
-  OBufferStream buf;
-  // We may have received more segments than exist in the object.
-  BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
-
-  for (int64_t i = 0; i < m_nSegments; i++) {
-    buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
+  if (m_options.inOrder) {
+    onInOrderComplete(); // payloads were delivered incrementally via onInOrderData
   }
+  else {
+    // Block mode: concatenate the payloads of segments 0 .. m_nSegments-1 into one buffer
+    OBufferStream buf;
+    // We may have received more segments than exist in the object.
+    BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
 
-  onComplete(buf.buf());
+    for (int64_t i = 0; i < m_nSegments; i++) {
+      buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
+    }
+    onComplete(buf.buf());
+  }
   stop();
 }
 
diff --git a/ndn-cxx/util/segment-fetcher.hpp b/ndn-cxx/util/segment-fetcher.hpp
index bf349d0..30879d6 100644
--- a/ndn-cxx/util/segment-fetcher.hpp
+++ b/ndn-cxx/util/segment-fetcher.hpp
@@ -19,10 +19,6 @@
  * <http://www.gnu.org/licenses/>.
  *
  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
- *
- * @author Shuo Yang
- * @author Weiwei Liu
- * @author Chavoosh Ghasemi
  */
 
 #ifndef NDN_UTIL_SEGMENT_FETCHER_HPP
@@ -35,6 +31,7 @@
 #include "ndn-cxx/util/signal.hpp"
 
 #include <queue>
+#include <set>
 
 namespace ndn {
 namespace util {
@@ -53,20 +50,21 @@
  *
  * 1. Express an Interest to discover the latest version of the object:
  *
- *    Interest: `/<prefix>?ndn.CanBePrefix=true&ndn.MustBeFresh=true`
+ *    Interest: `/<prefix>?CanBePrefix&MustBeFresh`
  *
- * 2. Infer the latest version of the object: `<version> = Data.getName().get(-2)`
+ * 2. Infer the latest version of the object: `<version> = Data.getName().get(-2)`.
  *
  * 3. Keep sending Interests for future segments until an error occurs or the number of segments
  *    indicated by the FinalBlockId in a received Data packet is reached. This retrieval will start
- *    at segment 1 if segment 0 was received in response to the Interest expressed in step 2;
+ *    at segment 1 if segment 0 was received in response to the Interest expressed in step 1;
  *    otherwise, retrieval will start at segment 0. By default, congestion control will be used to
  *    manage the Interest window size. Interests expressed in this step will follow this Name
- *    format:
+ *    format: `/<prefix>/<version>/<segment=(N)>`.
  *
- *    Interest: `/<prefix>/<version>/<segment=(N)>`
- *
- * 4. Signal #onComplete passing a memory buffer that combines the content of all segments in the object.
+ * 4. In 'block' mode (the default), signal #onComplete passing a memory buffer that combines the
+ *    content of all segments in the object. In 'in order' mode (Options::inOrder), signal
+ *    #onInOrderData for each segment, in segment order, once it and all earlier segments have been
+ *    validated; segments that arrive out of order are buffered internally until then.
  *
  * If an error occurs during the fetching process, #onError is signaled with one of the error codes
  * from SegmentFetcher::ErrorCode.
@@ -124,18 +122,20 @@
     validate();
 
   public:
-    bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
-    bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
-    time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
     time::milliseconds interestLifetime = 4_s; ///< lifetime of sent Interests - independent of Interest timeout
+    time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
+    bool inOrder = false; ///< true for 'in order' mode, false for 'block' mode (the default)
+    bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
+    bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
+    bool disableCwa = false; ///< disable Conservative Window Adaptation
+    bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
+    bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
     double initCwnd = 1.0; ///< initial congestion window size
     double initSsthresh = std::numeric_limits<double>::max(); ///< initial slow start threshold
     double aiStep = 1.0; ///< additive increase step (in segments)
     double mdCoef = 0.5; ///< multiplicative decrease coefficient
-    bool disableCwa = false; ///< disable Conservative Window Adaptation
-    bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
-    bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
     RttEstimator::Options rttOptions; ///< options for RTT estimator
+    size_t flowControlWindow = 25000; ///< maximum number of segments stored in the reorder buffer (used only in 'in order' mode)
   };
 
   /**
@@ -244,37 +244,50 @@
 
 public:
   /**
-   * @brief Emits upon successful retrieval of the complete data.
+   * @brief Emitted upon successful retrieval of the complete object (all segments).
+   * @note Emitted only if SegmentFetcher is operating in 'block' mode.
    */
   Signal<SegmentFetcher, ConstBufferPtr> onComplete;
 
   /**
-   * @brief Emits when the retrieval could not be completed due to an error.
+   * @brief Emitted when the retrieval could not be completed due to an error.
    *
    * Handlers are provided with an error code and a string error message.
    */
   Signal<SegmentFetcher, uint32_t, std::string> onError;
 
   /**
-   * @brief Emits whenever a data segment received.
+   * @brief Emitted whenever a data segment is received.
    */
   Signal<SegmentFetcher, Data> afterSegmentReceived;
 
   /**
-   * @brief Emits whenever a received data segment has been successfully validated.
+   * @brief Emitted whenever a received data segment has been successfully validated.
    */
   Signal<SegmentFetcher, Data> afterSegmentValidated;
 
   /**
-   * @brief Emits whenever an Interest for a data segment is nacked.
+   * @brief Emitted whenever an Interest for a data segment is nacked.
    */
   Signal<SegmentFetcher> afterSegmentNacked;
 
   /**
-   * @brief Emits whenever an Interest for a data segment times out.
+   * @brief Emitted whenever an Interest for a data segment times out.
    */
   Signal<SegmentFetcher> afterSegmentTimedOut;
 
+  /**
+   * @brief Emitted, in segment order, with each segment's content once it and all earlier segments are validated.
+   * @note Emitted only if SegmentFetcher is operating in 'in order' mode.
+   */
+  Signal<SegmentFetcher, ConstBufferPtr> onInOrderData;
+
+  /**
+   * @brief Emitted on successful retrieval of all segments in 'in order' mode.
+   * @note Emitted only if SegmentFetcher is operating in 'in order' mode.
+   */
+  Signal<SegmentFetcher> onInOrderComplete;
+
 private:
   enum class SegmentState {
     FirstInterest, ///< the first Interest for this segment has been sent
@@ -305,19 +318,21 @@
   time::steady_clock::TimePoint m_timeLastSegmentReceived;
   std::queue<uint64_t> m_retxQueue;
   Name m_versionedDataName;
-  uint64_t m_nextSegmentNum;
+  uint64_t m_nextSegmentNum = 0;
   double m_cwnd;
   double m_ssthresh;
-  int64_t m_nSegmentsInFlight;
-  int64_t m_nSegments;
-  uint64_t m_highInterest;
-  uint64_t m_highData;
-  uint64_t m_recPoint;
-  int64_t m_nReceived;
-  int64_t m_nBytesReceived;
+  int64_t m_nSegmentsInFlight = 0;
+  int64_t m_nSegments = 0;
+  uint64_t m_highInterest = 0;
+  uint64_t m_highData = 0;
+  uint64_t m_recPoint = 0;
+  int64_t m_nReceived = 0;
+  int64_t m_nBytesReceived = 0;
+  uint64_t m_nextSegmentInOrder = 0; ///< next segment number to emit via onInOrderData ('in order' mode)
 
-  std::map<uint64_t, Buffer> m_receivedSegments;
+  std::map<uint64_t, Buffer> m_segmentBuffer; ///< payloads of received segments not yet emitted or combined
   std::map<uint64_t, PendingSegment> m_pendingSegments;
+  std::set<uint64_t> m_receivedSegments; ///< numbers of all segments received so far
 };
 
 } // namespace util