util: augment SegmentFetcher with congestion control

refs #4364

Change-Id: Ibfebc1f173382d1faf88e0d3d707b8ab2dae5e43
diff --git a/src/util/segment-fetcher.hpp b/src/util/segment-fetcher.hpp
index 3b81cb4..e3df0c5 100644
--- a/src/util/segment-fetcher.hpp
+++ b/src/util/segment-fetcher.hpp
@@ -1,6 +1,8 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /*
- * Copyright (c) 2013-2018 Regents of the University of California.
+ * Copyright (c) 2013-2018, Regents of the University of California,
+ *                          Colorado State University,
+ *                          University Pierre & Marie Curie, Sorbonne University.
  *
  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
  *
@@ -17,6 +19,10 @@
  * <http://www.gnu.org/licenses/>.
  *
  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Shuo Yang
+ * @author Weiwei Liu
+ * @author Chavoosh Ghasemi
  */
 
 #ifndef NDN_UTIL_SEGMENT_FETCHER_HPP
@@ -24,16 +30,18 @@
 
 #include "../common.hpp"
 #include "../face.hpp"
-#include "../encoding/buffer-stream.hpp"
 #include "../security/v2/validator.hpp"
+#include "rtt-estimator.hpp"
 #include "scheduler.hpp"
 #include "signal.hpp"
 
+#include <queue>
+
 namespace ndn {
 namespace util {
 
 /**
- * @brief Utility class to fetch latest version of the segmented data
+ * @brief Utility class to fetch the latest version of segmented data
  *
  * SegmentFetcher assumes that the data is named `/<prefix>/<version>/<segment>`,
  * where:
@@ -61,25 +69,20 @@
  *
  *    >> Interest: `/<prefix>/<version>/<segment=(N+1))>`
  *
- * 6. Fire onCompletion callback with memory block that combines content part from all
- *    segmented objects.
+ * 6. Signal `onComplete` with a memory block that combines the content of all segments of the
+ *    object.
  *
- * If an error occurs during the fetching process, an error callback is fired
- * with a proper error code.  The following errors are possible:
+ * If an error occurs during the fetching process, `onError` is signaled with one of the error codes
+ * from `SegmentFetcher::ErrorCode`.
  *
- * - `INTEREST_TIMEOUT`: if any of the Interests times out
- * - `DATA_HAS_NO_SEGMENT`: if any of the retrieved Data packets don't have segment
- *   as a last component of the name (not counting implicit digest)
- * - `SEGMENT_VALIDATION_FAIL`: if any retrieved segment fails user-provided validation
- *
- * In order to validate individual segments, a Validator instance needs to be specified.
- * If the segment validation is successful, afterValidationSuccess callback is fired, otherwise
- * afterValidationFailure callback.
+ * A Validator instance must be specified to validate individual segments. Every time a segment has
+ * been successfully validated, `afterValidationSuccess` will be signaled. If a segment fails
+ * validation, `afterValidationFailure` will be signaled.
  *
  * Examples:
  *
  *     void
- *     afterFetchComplete(const ConstBufferPtr& data)
+ *     afterFetchComplete(ConstBufferPtr data)
  *     {
  *       ...
  *     }
@@ -91,80 +94,113 @@
  *     }
  *
  *     ...
- *     SegmentFetcher::fetch(face, Interest("/data/prefix", 30_s),
- *                           validator,
- *                           bind(&afterFetchComplete, this, _1),
- *                           bind(&afterFetchError, this, _1, _2));
+ *     auto fetcher = SegmentFetcher::start(face, Interest("/data/prefix"), validator);
+ *     fetcher->onComplete.connect(bind(&afterFetchComplete, this, _1));
+ *     fetcher->onError.connect(bind(&afterFetchError, this, _1, _2));
+ *
  *
  */
 class SegmentFetcher : noncopyable
 {
 public:
-  /**
-   * @brief Maximum number of times an interest will be reexpressed incase of NackCallback
-   */
-  static const uint32_t MAX_INTEREST_REEXPRESS;
-
+  // Deprecated: will be removed when deprecated fetch() API is removed - use start() instead
   typedef function<void (ConstBufferPtr data)> CompleteCallback;
   typedef function<void (uint32_t code, const std::string& msg)> ErrorCallback;
 
   /**
-   * @brief Error codes that can be passed to ErrorCallback
+   * @brief Error codes passed to `onError`
    */
   enum ErrorCode {
+    /// retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded
     INTEREST_TIMEOUT = 1,
+    /// one of the retrieved Data packets lacked a segment number in the last Name component (excl. implicit digest)
     DATA_HAS_NO_SEGMENT = 2,
+    /// one of the retrieved segments failed user-provided validation
     SEGMENT_VALIDATION_FAIL = 3,
-    NACK_ERROR = 4
+    /// an unrecoverable Nack was received during retrieval
+    NACK_ERROR = 4,
+    /// a received FinalBlockId did not contain a segment component
+    FINALBLOCKID_NOT_SEGMENT = 5
+  };
+
+  class Options
+  {
+  public:
+    Options()
+    {
+    }
+
+    void
+    validate();
+
+  public:
+    bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
+    bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
+    time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
+    time::milliseconds interestLifetime = 4_s; ///< lifetime of sent Interests - independent of Interest timeout
+    double initCwnd = 1.0; ///< initial congestion window size
+    double initSsthresh = std::numeric_limits<double>::max(); ///< initial slow start threshold
+    double aiStep = 1.0; ///< additive increase step (in segments)
+    double mdCoef = 0.5; ///< multiplicative decrease coefficient
+    bool disableCwa = false; ///< disable Conservative Window Adaptation
+    bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
+    bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
+    RttEstimator::Options rttOptions; ///< options for RTT estimator
   };
 
   /**
    * @brief Initiates segment fetching
    *
    * @param face         Reference to the Face that should be used to fetch data
-   * @param baseInterest An Interest for the initial segment of requested data.
+   * @param baseInterest Interest for the initial segment of requested data.
    *                     This interest may include a custom InterestLifetime and selectors that will
    *                     propagate to all subsequent Interests. The only exception is that the
    *                     initial Interest will be forced to include the "ChildSelector=1" and
-   *                     "MustBeFresh=true" selectors, which will be turned off in subsequent
+   *                     "MustBeFresh=true" selectors, which will not be included in subsequent
    *                     Interests.
-   * @param validator    Reference to the Validator that should be used to validate data. Caller
-   *                     must ensure validator is valid until either onComplete or onError has been
-   *                     signaled.
+   * @param validator    Reference to the Validator the fetcher will use to validate data.
+   *                     The caller must ensure the validator is valid until either `onComplete` or
+   *                     `onError` has been signaled.
+   * @param options      Options controlling the transfer
    *
-   * @return A shared_ptr to the constructed SegmentFetcher
+   * @return             A shared_ptr to the constructed SegmentFetcher.
+   *                     This shared_ptr is kept internally for the lifetime of the transfer.
+   *                     Therefore, it does not need to be saved and is provided here so that the
+   *                     SegmentFetcher's signals can be connected to.
    *
    * Transfer completion, failure, and progress are indicated via signals.
    */
-  static
-  shared_ptr<SegmentFetcher>
+  static shared_ptr<SegmentFetcher>
   start(Face& face,
         const Interest& baseInterest,
-        security::v2::Validator& validator);
+        security::v2::Validator& validator,
+        const Options& options = Options());
 
   /**
    * @brief Initiates segment fetching
    *
    * @deprecated Use @c start
    *
-   * @param face          Reference to the Face that should be used to fetch data
-   * @param baseInterest  An Interest for the initial segment of requested data.
-   *                      This interest may include custom InterestLifetime and selectors that
-   *                      will propagate to all subsequent Interests.  The only exception is that
-   *                      the initial Interest will be forced to include "ChildSelector=rightmost" and
-   *                      "MustBeFresh=true" selectors, which will be turned off in subsequent
-   *                      Interests.
-   * @param validator     Reference to the Validator that should be used to validate data. Caller
-   *                      must ensure validator is valid until either completeCallback or errorCallback
-   *                      is invoked.
+   * @param face             Reference to the Face that should be used to fetch data
+   * @param baseInterest     An Interest for the initial segment of requested data.
+   *                         This interest may include custom InterestLifetime and selectors that
+   *                         will propagate to all subsequent Interests.  The only exception is that
+   *                         the initial Interest will be forced to include
+   *                         "ChildSelector=rightmost" and "MustBeFresh=true" selectors, which will
+   *                         be turned off in subsequent Interests.
+   * @param validator        Reference to the Validator that should be used to validate data. Caller
+   *                         must ensure validator is valid until either completeCallback or
+   *                         errorCallback is invoked.
+   * @param completeCallback Callback to be fired when all segments are fetched
+   * @param errorCallback    Callback to be fired when an error occurs (@see Errors)
    *
-   * @param completeCallback    Callback to be fired when all segments are fetched
-   * @param errorCallback       Callback to be fired when an error occurs (@see Errors)
-   * @return A shared_ptr to the constructed SegmentFetcher
+   * @return                 A shared_ptr to the constructed SegmentFetcher.
+   *                         This shared_ptr is kept internally for the lifetime of the transfer.
+   *                         Therefore, it does not need to be saved and is provided here so that
+   *                         the SegmentFetcher's signals can be connected to.
    */
   [[deprecated("use SegmentFetcher::start instead")]]
-  static
-  shared_ptr<SegmentFetcher>
+  static shared_ptr<SegmentFetcher>
   fetch(Face& face,
         const Interest& baseInterest,
         security::v2::Validator& validator,
@@ -176,22 +212,25 @@
    *
    * @deprecated Use @c start
    *
-   * @param face          Reference to the Face that should be used to fetch data
-   * @param baseInterest  An Interest for the initial segment of requested data.
-   *                      This interest may include custom InterestLifetime and selectors that
-   *                      will propagate to all subsequent Interests.  The only exception is that
-   *                      the initial Interest will be forced to include "ChildSelector=1" and
-   *                      "MustBeFresh=true" selectors, which will be turned off in subsequent
-   *                      Interests.
-   * @param validator     A shared_ptr to the Validator that should be used to validate data.
+   * @param face             Reference to the Face that should be used to fetch data
+   * @param baseInterest     An Interest for the initial segment of requested data.
+   *                         This interest may include custom InterestLifetime and selectors that
+   *                         will propagate to all subsequent Interests.  The only exception is that
+   *                         the initial Interest will be forced to include "ChildSelector=1" and
+   *                         "MustBeFresh=true" selectors, which will be turned off in subsequent
+   *                         Interests.
+   * @param validator        A shared_ptr to the Validator that should be used to validate data.
    *
-   * @param completeCallback    Callback to be fired when all segments are fetched
-   * @param errorCallback       Callback to be fired when an error occurs (@see Errors)
-   * @return A shared_ptr to the constructed SegmentFetcher
+   * @param completeCallback Callback to be fired when all segments are fetched
+   * @param errorCallback    Callback to be fired when an error occurs (@see Errors)
+   *
+   * @return                 A shared_ptr to the constructed SegmentFetcher.
+   *                         This shared_ptr is kept internally for the lifetime of the transfer.
+   *                         Therefore, it does not need to be saved and is provided here so that
+   *                         the SegmentFetcher's signals can be connected to.
    */
   [[deprecated("use SegmentFetcher::start instead")]]
-  static
-  shared_ptr<SegmentFetcher>
+  static shared_ptr<SegmentFetcher>
   fetch(Face& face,
         const Interest& baseInterest,
         shared_ptr<security::v2::Validator> validator,
@@ -199,35 +238,71 @@
         const ErrorCallback& errorCallback);
 
 private:
-  SegmentFetcher(Face& face, security::v2::Validator& validator);
+  class PendingSegment;
+
+  SegmentFetcher(Face& face, security::v2::Validator& validator, const Options& options);
 
   void
-  fetchFirstSegment(const Interest& baseInterest, shared_ptr<SegmentFetcher> self);
+  fetchFirstSegment(const Interest& baseInterest,
+                    bool isRetransmission,
+                    shared_ptr<SegmentFetcher> self);
 
   void
-  fetchNextSegment(const Interest& origInterest, const Name& dataName, uint64_t segmentNo,
-                   shared_ptr<SegmentFetcher> self);
+  fetchSegmentsInWindow(const Interest& origInterest, shared_ptr<SegmentFetcher> self);
 
   void
   afterSegmentReceivedCb(const Interest& origInterest,
-                         const Data& data, bool isSegmentZeroExpected,
+                         const Data& data,
                          shared_ptr<SegmentFetcher> self);
   void
   afterValidationSuccess(const Data& data,
-                         bool isSegmentZeroExpected,
                          const Interest& origInterest,
+                         std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
                          shared_ptr<SegmentFetcher> self);
 
   void
-  afterValidationFailure(const Data& data, const security::v2::ValidationError& error);
+  afterValidationFailure(const Data& data,
+                         const security::v2::ValidationError& error,
+                         shared_ptr<SegmentFetcher> self);
 
   void
-  afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
-                      uint32_t reExpressCount, shared_ptr<SegmentFetcher> self);
+  afterNackReceivedCb(const Interest& origInterest,
+                      const lp::Nack& nack,
+                      shared_ptr<SegmentFetcher> self);
 
   void
-  reExpressInterest(Interest interest, uint32_t reExpressCount,
-                    shared_ptr<SegmentFetcher> self);
+  afterTimeoutCb(const Interest& origInterest,
+                 shared_ptr<SegmentFetcher> self);
+
+  void
+  afterNackOrTimeout(const Interest& origInterest,
+                     shared_ptr<SegmentFetcher> self);
+
+  void
+  finalizeFetch(shared_ptr<SegmentFetcher> self);
+
+  void
+  windowIncrease();
+
+  void
+  windowDecrease();
+
+  void
+  signalError(uint32_t code, const std::string& msg);
+
+  void
+  updateRetransmittedSegment(uint64_t segmentNum,
+                             const PendingInterestId* pendingInterest,
+                             scheduler::EventId timeoutEvent);
+
+  void
+  cancelExcessInFlightSegments();
+
+  bool
+  checkAllSegmentsReceived();
+
+  time::milliseconds
+  getEstimatedRto();
 
 public:
   /**
@@ -252,12 +327,58 @@
    */
   Signal<SegmentFetcher, Data> afterSegmentValidated;
 
+  /**
+   * @brief Emits whenever an Interest for a data segment is nacked
+   */
+  Signal<SegmentFetcher> afterSegmentNacked;
+
+  /**
+   * @brief Emits whenever an Interest for a data segment times out
+   */
+  Signal<SegmentFetcher> afterSegmentTimedOut;
+
 private:
+  enum class SegmentState {
+    FirstInterest, ///< the first Interest for this segment has been sent
+    InRetxQueue,   ///< the segment is awaiting Interest retransmission
+    Retransmitted, ///< one or more retransmitted Interests have been sent for this segment
+  };
+
+  class PendingSegment
+  {
+  public:
+    SegmentState state;
+    time::steady_clock::TimePoint sendTime;
+    const PendingInterestId* id;
+    scheduler::EventId timeoutEvent;
+  };
+
+NDN_CXX_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+  static constexpr double MIN_SSTHRESH = 2.0;
+
+  Options m_options;
   Face& m_face;
   Scheduler m_scheduler;
   security::v2::Validator& m_validator;
+  RttEstimator m_rttEstimator;
+  time::milliseconds m_timeout;
 
-  OBufferStream m_buffer;
+  time::steady_clock::TimePoint m_timeLastSegmentReceived;
+  std::queue<uint64_t> m_retxQueue;
+  Name m_versionedDataName;
+  uint64_t m_nextSegmentNum;
+  double m_cwnd;
+  double m_ssthresh;
+  int64_t m_nSegmentsInFlight;
+  int64_t m_nSegments;
+  uint64_t m_highInterest;
+  uint64_t m_highData;
+  uint64_t m_recPoint;
+  int64_t m_nReceived;
+  int64_t m_nBytesReceived;
+
+  std::map<uint64_t, Buffer> m_receivedSegments;
+  std::map<uint64_t, PendingSegment> m_pendingSegments;
 };
 
 } // namespace util