util: augment SegmentFetcher with congestion control
refs #4364
Change-Id: Ibfebc1f173382d1faf88e0d3d707b8ab2dae5e43
diff --git a/src/util/segment-fetcher.hpp b/src/util/segment-fetcher.hpp
index 3b81cb4..e3df0c5 100644
--- a/src/util/segment-fetcher.hpp
+++ b/src/util/segment-fetcher.hpp
@@ -1,6 +1,8 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2013-2018 Regents of the University of California.
+ * Copyright (c) 2013-2018, Regents of the University of California,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -17,6 +19,10 @@
* <http://www.gnu.org/licenses/>.
*
* See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Shuo Yang
+ * @author Weiwei Liu
+ * @author Chavoosh Ghasemi
*/
#ifndef NDN_UTIL_SEGMENT_FETCHER_HPP
@@ -24,16 +30,18 @@
#include "../common.hpp"
#include "../face.hpp"
-#include "../encoding/buffer-stream.hpp"
#include "../security/v2/validator.hpp"
+#include "rtt-estimator.hpp"
#include "scheduler.hpp"
#include "signal.hpp"
+#include <queue>
+
namespace ndn {
namespace util {
/**
- * @brief Utility class to fetch latest version of the segmented data
+ * @brief Utility class to fetch the latest version of segmented data
*
* SegmentFetcher assumes that the data is named `/<prefix>/<version>/<segment>`,
* where:
@@ -61,25 +69,20 @@
*
* >> Interest: `/<prefix>/<version>/<segment=(N+1))>`
*
- * 6. Fire onCompletion callback with memory block that combines content part from all
- * segmented objects.
+ * 6. Signal `onComplete` with a memory block that combines the content of all segments of the
+ * object.
*
- * If an error occurs during the fetching process, an error callback is fired
- * with a proper error code. The following errors are possible:
+ * If an error occurs during the fetching process, `onError` is signaled with one of the error codes
+ * from `SegmentFetcher::ErrorCode`.
*
- * - `INTEREST_TIMEOUT`: if any of the Interests times out
- * - `DATA_HAS_NO_SEGMENT`: if any of the retrieved Data packets don't have segment
- * as a last component of the name (not counting implicit digest)
- * - `SEGMENT_VALIDATION_FAIL`: if any retrieved segment fails user-provided validation
- *
- * In order to validate individual segments, a Validator instance needs to be specified.
- * If the segment validation is successful, afterValidationSuccess callback is fired, otherwise
- * afterValidationFailure callback.
+ * A Validator instance must be specified to validate individual segments. Every time a segment has
+ * been successfully validated, `afterValidationSuccess` will be signaled. If a segment fails
+ * validation, `afterValidationFailure` will be signaled.
*
* Examples:
*
* void
- * afterFetchComplete(const ConstBufferPtr& data)
+ * afterFetchComplete(ConstBufferPtr data)
* {
* ...
* }
@@ -91,80 +94,113 @@
* }
*
* ...
- * SegmentFetcher::fetch(face, Interest("/data/prefix", 30_s),
- * validator,
- * bind(&afterFetchComplete, this, _1),
- * bind(&afterFetchError, this, _1, _2));
+ * auto fetcher = SegmentFetcher::start(face, Interest("/data/prefix"), validator);
+ * fetcher->onComplete.connect(bind(&afterFetchComplete, this, _1));
+ * fetcher->onError.connect(bind(&afterFetchError, this, _1, _2));
+ *
*
*/
class SegmentFetcher : noncopyable
{
public:
- /**
- * @brief Maximum number of times an interest will be reexpressed incase of NackCallback
- */
- static const uint32_t MAX_INTEREST_REEXPRESS;
-
+ // Deprecated: these callback typedefs will be removed together with the deprecated fetch() API - use start() and connect to the onComplete/onError signals instead
typedef function<void (ConstBufferPtr data)> CompleteCallback;
typedef function<void (uint32_t code, const std::string& msg)> ErrorCallback;
/**
- * @brief Error codes that can be passed to ErrorCallback
+ * @brief Error codes passed to `onError`
*/
enum ErrorCode {
+ /// retrieval timed out: the maximum time allowed between the successful receipt of segments (see `Options::maxTimeout`) was exceeded
INTEREST_TIMEOUT = 1,
+ /// one of the retrieved Data packets lacked a segment number in the last Name component (not counting the implicit digest)
DATA_HAS_NO_SEGMENT = 2,
+ /// one of the retrieved segments failed user-provided validation
SEGMENT_VALIDATION_FAIL = 3,
- NACK_ERROR = 4
+ /// an unrecoverable Nack was received during retrieval
+ NACK_ERROR = 4,
+ /// a received FinalBlockId did not contain a segment component
+ FINALBLOCKID_NOT_SEGMENT = 5
+ };
+
+ class Options // tunable parameters for one transfer; passed as the `options` argument of `SegmentFetcher::start()`
+ {
+ public:
+ Options() // NOTE(review): user-provided rather than `= default`, presumably so that `Options()` can be used as a default argument inside SegmentFetcher - confirm before changing
+ {
+ }
+
+ void
+ validate(); // NOTE(review): defined outside this header; presumably sanity-checks the fields below - confirm
+
+ public:
+ bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
+ bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
+ time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
+ time::milliseconds interestLifetime = 4_s; ///< lifetime of sent Interests - independent of Interest timeout
+ double initCwnd = 1.0; ///< initial congestion window size
+ double initSsthresh = std::numeric_limits<double>::max(); ///< initial slow start threshold
+ double aiStep = 1.0; ///< additive increase step (in segments)
+ double mdCoef = 0.5; ///< multiplicative decrease coefficient
+ bool disableCwa = false; ///< if true, disable Conservative Window Adaptation
+ bool resetCwndToInit = false; ///< if true, reduce cwnd to `initCwnd` when a loss event occurs
+ bool ignoreCongMarks = false; ///< if true, do not decrease the window after a congestion mark is received
+ RttEstimator::Options rttOptions; ///< options for RTT estimator
};
/**
* @brief Initiates segment fetching
*
* @param face Reference to the Face that should be used to fetch data
- * @param baseInterest An Interest for the initial segment of requested data.
+ * @param baseInterest Interest for the initial segment of requested data.
* This interest may include a custom InterestLifetime and selectors that will
* propagate to all subsequent Interests. The only exception is that the
* initial Interest will be forced to include the "ChildSelector=1" and
- * "MustBeFresh=true" selectors, which will be turned off in subsequent
+ * "MustBeFresh=true" selectors, which will not be included in subsequent
* Interests.
- * @param validator Reference to the Validator that should be used to validate data. Caller
- * must ensure validator is valid until either onComplete or onError has been
- * signaled.
+ * @param validator Reference to the Validator the fetcher will use to validate data.
+ * The caller must ensure the validator is valid until either `onComplete` or
+ * `onError` has been signaled.
+ * @param options Options controlling the transfer
*
- * @return A shared_ptr to the constructed SegmentFetcher
+ * @return A shared_ptr to the constructed SegmentFetcher.
+ * This shared_ptr is kept internally for the lifetime of the transfer.
+ * Therefore, it does not need to be saved and is provided here so that the
+ * SegmentFetcher's signals can be connected to.
*
* Transfer completion, failure, and progress are indicated via signals.
*/
- static
- shared_ptr<SegmentFetcher>
+ static shared_ptr<SegmentFetcher>
start(Face& face,
const Interest& baseInterest,
- security::v2::Validator& validator);
+ security::v2::Validator& validator,
+ const Options& options = Options());
/**
* @brief Initiates segment fetching
*
* @deprecated Use @c start
*
- * @param face Reference to the Face that should be used to fetch data
- * @param baseInterest An Interest for the initial segment of requested data.
- * This interest may include custom InterestLifetime and selectors that
- * will propagate to all subsequent Interests. The only exception is that
- * the initial Interest will be forced to include "ChildSelector=rightmost" and
- * "MustBeFresh=true" selectors, which will be turned off in subsequent
- * Interests.
- * @param validator Reference to the Validator that should be used to validate data. Caller
- * must ensure validator is valid until either completeCallback or errorCallback
- * is invoked.
+ * @param face Reference to the Face that should be used to fetch data
+ * @param baseInterest An Interest for the initial segment of requested data.
+ * This interest may include custom InterestLifetime and selectors that
+ * will propagate to all subsequent Interests. The only exception is that
+ * the initial Interest will be forced to include
+ * "ChildSelector=rightmost" and "MustBeFresh=true" selectors, which will
+ * be turned off in subsequent Interests.
+ * @param validator Reference to the Validator that should be used to validate data. Caller
+ * must ensure validator is valid until either completeCallback or
+ * errorCallback is invoked.
+ * @param completeCallback Callback to be fired when all segments are fetched
+ * @param errorCallback Callback to be fired when an error occurs (@see ErrorCode)
*
- * @param completeCallback Callback to be fired when all segments are fetched
- * @param errorCallback Callback to be fired when an error occurs (@see Errors)
- * @return A shared_ptr to the constructed SegmentFetcher
+ * @return A shared_ptr to the constructed SegmentFetcher.
+ * This shared_ptr is kept internally for the lifetime of the transfer.
+ * Therefore, it does not need to be saved and is provided here so that
+ * the SegmentFetcher's signals can be connected to.
*/
[[deprecated("use SegmentFetcher::start instead")]]
- static
- shared_ptr<SegmentFetcher>
+ static shared_ptr<SegmentFetcher>
fetch(Face& face,
const Interest& baseInterest,
security::v2::Validator& validator,
@@ -176,22 +212,25 @@
*
* @deprecated Use @c start
*
- * @param face Reference to the Face that should be used to fetch data
- * @param baseInterest An Interest for the initial segment of requested data.
- * This interest may include custom InterestLifetime and selectors that
- * will propagate to all subsequent Interests. The only exception is that
- * the initial Interest will be forced to include "ChildSelector=1" and
- * "MustBeFresh=true" selectors, which will be turned off in subsequent
- * Interests.
- * @param validator A shared_ptr to the Validator that should be used to validate data.
+ * @param face Reference to the Face that should be used to fetch data
+ * @param baseInterest An Interest for the initial segment of requested data.
+ * This interest may include custom InterestLifetime and selectors that
+ * will propagate to all subsequent Interests. The only exception is that
+ * the initial Interest will be forced to include "ChildSelector=1" and
+ * "MustBeFresh=true" selectors, which will be turned off in subsequent
+ * Interests.
+ * @param validator A shared_ptr to the Validator that should be used to validate data.
*
- * @param completeCallback Callback to be fired when all segments are fetched
- * @param errorCallback Callback to be fired when an error occurs (@see Errors)
- * @return A shared_ptr to the constructed SegmentFetcher
+ * @param completeCallback Callback to be fired when all segments are fetched
+ * @param errorCallback Callback to be fired when an error occurs (@see ErrorCode)
+ *
+ * @return A shared_ptr to the constructed SegmentFetcher.
+ * This shared_ptr is kept internally for the lifetime of the transfer.
+ * Therefore, it does not need to be saved and is provided here so that
+ * the SegmentFetcher's signals can be connected to.
*/
[[deprecated("use SegmentFetcher::start instead")]]
- static
- shared_ptr<SegmentFetcher>
+ static shared_ptr<SegmentFetcher>
fetch(Face& face,
const Interest& baseInterest,
shared_ptr<security::v2::Validator> validator,
@@ -199,35 +238,71 @@
const ErrorCallback& errorCallback);
private:
- SegmentFetcher(Face& face, security::v2::Validator& validator);
+ class PendingSegment;
+
+ SegmentFetcher(Face& face, security::v2::Validator& validator, const Options& options);
void
- fetchFirstSegment(const Interest& baseInterest, shared_ptr<SegmentFetcher> self);
+ fetchFirstSegment(const Interest& baseInterest,
+ bool isRetransmission,
+ shared_ptr<SegmentFetcher> self);
void
- fetchNextSegment(const Interest& origInterest, const Name& dataName, uint64_t segmentNo,
- shared_ptr<SegmentFetcher> self);
+ fetchSegmentsInWindow(const Interest& origInterest, shared_ptr<SegmentFetcher> self);
void
afterSegmentReceivedCb(const Interest& origInterest,
- const Data& data, bool isSegmentZeroExpected,
+ const Data& data,
shared_ptr<SegmentFetcher> self);
void
afterValidationSuccess(const Data& data,
- bool isSegmentZeroExpected,
const Interest& origInterest,
+ std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
shared_ptr<SegmentFetcher> self);
void
- afterValidationFailure(const Data& data, const security::v2::ValidationError& error);
+ afterValidationFailure(const Data& data,
+ const security::v2::ValidationError& error,
+ shared_ptr<SegmentFetcher> self);
void
- afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
- uint32_t reExpressCount, shared_ptr<SegmentFetcher> self);
+ afterNackReceivedCb(const Interest& origInterest,
+ const lp::Nack& nack,
+ shared_ptr<SegmentFetcher> self);
void
- reExpressInterest(Interest interest, uint32_t reExpressCount,
- shared_ptr<SegmentFetcher> self);
+ afterTimeoutCb(const Interest& origInterest,
+ shared_ptr<SegmentFetcher> self);
+
+ void
+ afterNackOrTimeout(const Interest& origInterest,
+ shared_ptr<SegmentFetcher> self);
+
+ void
+ finalizeFetch(shared_ptr<SegmentFetcher> self);
+
+ void
+ windowIncrease();
+
+ void
+ windowDecrease();
+
+ void
+ signalError(uint32_t code, const std::string& msg);
+
+ void
+ updateRetransmittedSegment(uint64_t segmentNum,
+ const PendingInterestId* pendingInterest,
+ scheduler::EventId timeoutEvent);
+
+ void
+ cancelExcessInFlightSegments();
+
+ bool
+ checkAllSegmentsReceived();
+
+ time::milliseconds
+ getEstimatedRto();
public:
/**
@@ -252,12 +327,58 @@
*/
Signal<SegmentFetcher, Data> afterSegmentValidated;
+ /**
+ * @brief Emits whenever an Interest for a data segment is nacked
+ */
+ Signal<SegmentFetcher> afterSegmentNacked;
+
+ /**
+ * @brief Emits whenever an Interest for a data segment times out
+ */
+ Signal<SegmentFetcher> afterSegmentTimedOut;
+
private:
+ enum class SegmentState { // per-segment state recorded in `PendingSegment::state`
+ FirstInterest, ///< the first Interest for this segment has been sent
+ InRetxQueue, ///< the segment is awaiting Interest retransmission (presumably queued in `m_retxQueue` - confirm)
+ Retransmitted, ///< one or more retransmitted Interests have been sent for this segment
+ };
+
+ class PendingSegment // bookkeeping record for one segment; used as the mapped type of `m_pendingSegments`
+ {
+ public: // NOTE(review): `state` and `id` carry no initializer here; creators are expected to set them - TODO confirm
+ SegmentState state; ///< current state of this segment (see SegmentState)
+ time::steady_clock::TimePoint sendTime; ///< NOTE(review): presumably when the latest Interest for this segment was sent - confirm
+ const PendingInterestId* id; ///< NOTE(review): presumably the Face's handle for the outstanding Interest - confirm
+ scheduler::EventId timeoutEvent; ///< NOTE(review): presumably the scheduled timeout event for that Interest - confirm
+ };
+
+NDN_CXX_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ static constexpr double MIN_SSTHRESH = 2.0;
+
+ Options m_options;
Face& m_face;
Scheduler m_scheduler;
security::v2::Validator& m_validator;
+ RttEstimator m_rttEstimator;
+ time::milliseconds m_timeout;
- OBufferStream m_buffer;
+ time::steady_clock::TimePoint m_timeLastSegmentReceived;
+ std::queue<uint64_t> m_retxQueue;
+ Name m_versionedDataName;
+ uint64_t m_nextSegmentNum;
+ double m_cwnd;
+ double m_ssthresh;
+ int64_t m_nSegmentsInFlight;
+ int64_t m_nSegments;
+ uint64_t m_highInterest;
+ uint64_t m_highData;
+ uint64_t m_recPoint;
+ int64_t m_nReceived;
+ int64_t m_nBytesReceived;
+
+ std::map<uint64_t, Buffer> m_receivedSegments;
+ std::map<uint64_t, PendingSegment> m_pendingSegments;
};
} // namespace util