util: add "in order" mode to SegmentFetcher

This commit adds an option for the fetcher to emit a signal for each segment
as it becomes available in segment order. Segments that arrive out of order
are buffered internally by the fetcher and are deleted once they have been
emitted.

Change-Id: I84aec680774af3e3c07bb07c881eab8908b5c9c2
diff --git a/ndn-cxx/util/segment-fetcher.hpp b/ndn-cxx/util/segment-fetcher.hpp
index bf349d0..30879d6 100644
--- a/ndn-cxx/util/segment-fetcher.hpp
+++ b/ndn-cxx/util/segment-fetcher.hpp
@@ -19,10 +19,6 @@
  * <http://www.gnu.org/licenses/>.
  *
  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
- *
- * @author Shuo Yang
- * @author Weiwei Liu
- * @author Chavoosh Ghasemi
  */
 
 #ifndef NDN_UTIL_SEGMENT_FETCHER_HPP
@@ -35,6 +31,7 @@
 #include "ndn-cxx/util/signal.hpp"
 
 #include <queue>
+#include <set>
 
 namespace ndn {
 namespace util {
@@ -53,20 +50,21 @@
  *
  * 1. Express an Interest to discover the latest version of the object:
  *
- *    Interest: `/<prefix>?ndn.CanBePrefix=true&ndn.MustBeFresh=true`
+ *    Interest: `/<prefix>?CanBePrefix&MustBeFresh`
  *
- * 2. Infer the latest version of the object: `<version> = Data.getName().get(-2)`
+ * 2. Infer the latest version of the object: `<version> = Data.getName().get(-2)`.
  *
  * 3. Keep sending Interests for future segments until an error occurs or the number of segments
  *    indicated by the FinalBlockId in a received Data packet is reached. This retrieval will start
  *    at segment 1 if segment 0 was received in response to the Interest expressed in step 2;
  *    otherwise, retrieval will start at segment 0. By default, congestion control will be used to
  *    manage the Interest window size. Interests expressed in this step will follow this Name
- *    format:
+ *    format: `/<prefix>/<version>/<segment=(N)>`.
  *
- *    Interest: `/<prefix>/<version>/<segment=(N)>`
- *
- * 4. Signal #onComplete passing a memory buffer that combines the content of all segments in the object.
+ * 4. If set to 'block' mode, signal #onComplete passing a memory buffer that combines the content
+ *    of all segments in the object. If set to 'in order' mode, signal #onInOrderData each time a
+ *    segment is validated in segment order; segments that arrive out of order are stored
+ *    internally until all earlier segments have arrived and have been validated.
  *
  * If an error occurs during the fetching process, #onError is signaled with one of the error codes
  * from SegmentFetcher::ErrorCode.
@@ -124,18 +122,20 @@
     validate();
 
   public:
-    bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
-    bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
-    time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
     time::milliseconds interestLifetime = 4_s; ///< lifetime of sent Interests - independent of Interest timeout
+    time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
+    bool inOrder = false; ///< true for 'in order' mode, false for 'block' mode
+    bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
+    bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
+    bool disableCwa = false; ///< disable Conservative Window Adaptation
+    bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
+    bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
     double initCwnd = 1.0; ///< initial congestion window size
     double initSsthresh = std::numeric_limits<double>::max(); ///< initial slow start threshold
     double aiStep = 1.0; ///< additive increase step (in segments)
     double mdCoef = 0.5; ///< multiplicative decrease coefficient
-    bool disableCwa = false; ///< disable Conservative Window Adaptation
-    bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
-    bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
     RttEstimator::Options rttOptions; ///< options for RTT estimator
+    size_t flowControlWindow = 25000; ///< maximum number of segments stored in the reorder buffer
   };
 
   /**
@@ -244,37 +244,50 @@
 
 public:
   /**
-   * @brief Emits upon successful retrieval of the complete data.
+   * @brief Emitted upon successful retrieval of the complete object (all segments).
+   * @note Emitted only if SegmentFetcher is operating in 'block' mode.
    */
   Signal<SegmentFetcher, ConstBufferPtr> onComplete;
 
   /**
-   * @brief Emits when the retrieval could not be completed due to an error.
+   * @brief Emitted when the retrieval could not be completed due to an error.
    *
    * Handlers are provided with an error code and a string error message.
    */
   Signal<SegmentFetcher, uint32_t, std::string> onError;
 
   /**
-   * @brief Emits whenever a data segment received.
+   * @brief Emitted whenever a data segment is received.
    */
   Signal<SegmentFetcher, Data> afterSegmentReceived;
 
   /**
-   * @brief Emits whenever a received data segment has been successfully validated.
+   * @brief Emitted whenever a received data segment has been successfully validated.
    */
   Signal<SegmentFetcher, Data> afterSegmentValidated;
 
   /**
-   * @brief Emits whenever an Interest for a data segment is nacked.
+   * @brief Emitted whenever an Interest for a data segment is nacked.
    */
   Signal<SegmentFetcher> afterSegmentNacked;
 
   /**
-   * @brief Emits whenever an Interest for a data segment times out.
+   * @brief Emitted whenever an Interest for a data segment times out.
    */
   Signal<SegmentFetcher> afterSegmentTimedOut;
 
+  /**
+   * @brief Emitted after each data segment in segment order has been validated.
+   * @note Emitted only if SegmentFetcher is operating in 'in order' mode.
+   */
+  Signal<SegmentFetcher, ConstBufferPtr> onInOrderData;
+
+  /**
+   * @brief Emitted on successful retrieval of all segments in 'in order' mode.
+   * @note Emitted only if SegmentFetcher is operating in 'in order' mode.
+   */
+  Signal<SegmentFetcher> onInOrderComplete;
+
 private:
   enum class SegmentState {
     FirstInterest, ///< the first Interest for this segment has been sent
@@ -305,19 +318,21 @@
   time::steady_clock::TimePoint m_timeLastSegmentReceived;
   std::queue<uint64_t> m_retxQueue;
   Name m_versionedDataName;
-  uint64_t m_nextSegmentNum;
+  uint64_t m_nextSegmentNum = 0;
   double m_cwnd;
   double m_ssthresh;
-  int64_t m_nSegmentsInFlight;
-  int64_t m_nSegments;
-  uint64_t m_highInterest;
-  uint64_t m_highData;
-  uint64_t m_recPoint;
-  int64_t m_nReceived;
-  int64_t m_nBytesReceived;
+  int64_t m_nSegmentsInFlight = 0;
+  int64_t m_nSegments = 0;
+  uint64_t m_highInterest = 0;
+  uint64_t m_highData = 0;
+  uint64_t m_recPoint = 0;
+  int64_t m_nReceived = 0;
+  int64_t m_nBytesReceived = 0;
+  uint64_t m_nextSegmentInOrder = 0;
 
-  std::map<uint64_t, Buffer> m_receivedSegments;
+  std::map<uint64_t, Buffer> m_segmentBuffer;
   std::map<uint64_t, PendingSegment> m_pendingSegments;
+  std::set<uint64_t> m_receivedSegments;
 };
 
 } // namespace util