util: augment SegmentFetcher with congestion control
refs #4364
Change-Id: Ibfebc1f173382d1faf88e0d3d707b8ab2dae5e43
diff --git a/src/mgmt/nfd/controller.cpp b/src/mgmt/nfd/controller.cpp
index 945c16d..7ed8005 100644
--- a/src/mgmt/nfd/controller.cpp
+++ b/src/mgmt/nfd/controller.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2013-2017 Regents of the University of California.
+ * Copyright (c) 2013-2018 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -133,15 +133,19 @@
void
Controller::fetchDataset(const Name& prefix,
- const std::function<void(const ConstBufferPtr&)>& processResponse,
+ const std::function<void(ConstBufferPtr)>& processResponse,
const DatasetFailCallback& onFailure,
const CommandOptions& options)
{
Interest baseInterest(prefix);
- baseInterest.setInterestLifetime(options.getTimeout());
- SegmentFetcher::fetch(m_face, baseInterest, m_validator, processResponse,
- bind(&Controller::processDatasetFetchError, this, onFailure, _1, _2));
+ SegmentFetcher::Options fetcherOptions; // all other fetcher options keep their defaults
+ fetcherOptions.maxTimeout = options.getTimeout(); // command timeout bounds the gap between received segments
+ auto fetcher = SegmentFetcher::start(m_face, baseInterest, m_validator, fetcherOptions);
+ fetcher->onComplete.connect(processResponse); // receives the reassembled dataset buffer
+ fetcher->onError.connect([=] (uint32_t code, const std::string& msg) { // copies onFailure, captures this
+ processDatasetFetchError(onFailure, code, msg); // maps SegmentFetcher error codes to controller errors
+ });
}
void
@@ -156,6 +160,7 @@
onFailure(ERROR_TIMEOUT, msg);
break;
case SegmentFetcher::ErrorCode::DATA_HAS_NO_SEGMENT:
+ case SegmentFetcher::ErrorCode::FINALBLOCKID_NOT_SEGMENT:
onFailure(ERROR_SERVER, msg);
break;
case SegmentFetcher::ErrorCode::SEGMENT_VALIDATION_FAIL:
diff --git a/src/mgmt/nfd/controller.hpp b/src/mgmt/nfd/controller.hpp
index e98b606..8797b39 100644
--- a/src/mgmt/nfd/controller.hpp
+++ b/src/mgmt/nfd/controller.hpp
@@ -132,7 +132,7 @@
void
fetchDataset(const Name& prefix,
- const std::function<void(const ConstBufferPtr&)>& processResponse,
+ const std::function<void(ConstBufferPtr)>& processResponse,
const DatasetFailCallback& onFailure,
const CommandOptions& options);
diff --git a/src/util/rtt-estimator.cpp b/src/util/rtt-estimator.cpp
new file mode 100644
index 0000000..f79b6e4
--- /dev/null
+++ b/src/util/rtt-estimator.cpp
@@ -0,0 +1,80 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (C) 2016-2018, Arizona Board of Regents.
+ *
+ * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
+ *
+ * ndn-cxx library is free software: you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ *
+ * You should have received copies of the GNU General Public License and GNU Lesser
+ * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Shuo Yang
+ * @author Weiwei Liu
+ * @author Chavoosh Ghasemi
+ */
+
+#include "rtt-estimator.hpp"
+
+#include <cmath>
+#include <limits>
+
+namespace ndn {
+namespace util {
+
+RttEstimator::RttEstimator(const Options& options)
+ : m_options(options)
+ , m_sRtt(std::numeric_limits<double>::quiet_NaN()) // no smoothed RTT until the first sample
+ , m_rttVar(std::numeric_limits<double>::quiet_NaN()) // no RTT variation until the first sample
+ , m_rto(m_options.initialRto.count()) // RTO starts at the configured initial value
+ , m_rttMin(std::numeric_limits<double>::max()) // sentinel: any sample is smaller
+ , m_rttMax(std::numeric_limits<double>::lowest()) // sentinel: any sample is larger (min() is the smallest POSITIVE double)
+ , m_rttAvg(0.0)
+ , m_nRttSamples(0)
+{
+}
+
+void
+RttEstimator::addMeasurement(MillisecondsDouble rtt, size_t nExpectedSamples)
+{
+ BOOST_ASSERT(nExpectedSamples > 0); // used as a divisor for the weights below
+
+ if (m_nRttSamples == 0) { // first measurement: seed the estimator directly from the sample
+ m_sRtt = rtt;
+ m_rttVar = m_sRtt / 2;
+ m_rto = m_sRtt + m_options.k * m_rttVar;
+ }
+ else {
+ double alpha = m_options.alpha / nExpectedSamples; // scale the weights down by the expected sample count
+ double beta = m_options.beta / nExpectedSamples;
+ m_rttVar = (1 - beta) * m_rttVar + beta * time::abs(m_sRtt - rtt); // uses the previous m_sRtt, so update it first
+ m_sRtt = (1 - alpha) * m_sRtt + alpha * rtt;
+ m_rto = m_sRtt + m_options.k * m_rttVar;
+ }
+
+ m_rto = clamp(m_rto, m_options.minRto, m_options.maxRto); // keep RTO within the configured bounds
+
+ m_rttAvg = MillisecondsDouble((m_nRttSamples * m_rttAvg.count() + rtt.count()) / (m_nRttSamples + 1)); // running mean
+ m_rttMax = std::max<MillisecondsDouble>(rtt, m_rttMax);
+ m_rttMin = std::min<MillisecondsDouble>(rtt, m_rttMin);
+ m_nRttSamples++;
+}
+
+void
+RttEstimator::backoffRto()
+{
+ m_rto = clamp(m_rto * m_options.rtoBackoffMultiplier, // multiply the current RTO by the backoff factor...
+ m_options.minRto, m_options.maxRto); // ...while keeping it within the configured bounds
+}
+
+} // namespace util
+} // namespace ndn
diff --git a/src/util/rtt-estimator.hpp b/src/util/rtt-estimator.hpp
new file mode 100644
index 0000000..4cf5586
--- /dev/null
+++ b/src/util/rtt-estimator.hpp
@@ -0,0 +1,143 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (C) 2016-2018, Arizona Board of Regents.
+ *
+ * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
+ *
+ * ndn-cxx library is free software: you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ *
+ * You should have received copies of the GNU General Public License and GNU Lesser
+ * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Shuo Yang
+ * @author Weiwei Liu
+ * @author Chavoosh Ghasemi
+ */
+
+#ifndef NDN_CXX_UTIL_RTT_ESTIMATOR_HPP
+#define NDN_CXX_UTIL_RTT_ESTIMATOR_HPP
+
+#include "../common.hpp"
+#include "signal.hpp"
+#include "time.hpp"
+
+namespace ndn {
+namespace util {
+
+/**
+ * @brief RTT Estimator.
+ *
+ * This class implements the "Mean-Deviation" RTT estimator, as discussed in RFC 6298,
+ * with the modifications to RTO calculation described in RFC 7323 Appendix G.
+ */
+class RttEstimator
+{
+public:
+ using MillisecondsDouble = time::duration<double, time::milliseconds::period>;
+
+public:
+ class Options
+ {
+ public:
+ constexpr
+ Options() noexcept
+ {
+ }
+
+ public:
+ double alpha = 0.125; ///< weight of exponential moving average for meanRtt
+ double beta = 0.25; ///< weight of exponential moving average for varRtt
+ int k = 4; ///< factor of RTT variation when calculating RTO
+ MillisecondsDouble initialRto = MillisecondsDouble(1000.0); ///< initial RTO value
+ MillisecondsDouble minRto = MillisecondsDouble(200.0); ///< lower bound of RTO
+ MillisecondsDouble maxRto = MillisecondsDouble(20000.0); ///< upper bound of RTO
+ int rtoBackoffMultiplier = 2; ///< factor by which backoffRto() multiplies the RTO
+ };
+
+ /**
+ * @brief Create a RTT Estimator
+ *
+ * Configures the RTT Estimator with the default parameters if an instance of Options
+ * is not passed to the constructor.
+ */
+ explicit
+ RttEstimator(const Options& options = Options());
+
+ /**
+ * @brief Add a new RTT measurement to the estimator.
+ *
+ * @param rtt the sampled rtt
+ * @param nExpectedSamples number of expected samples, must be greater than 0.
+ * It should be set to current number of in-flight Interests. Please
+ * refer to Appendix G of RFC 7323 for details.
+ * @note Don't add RTT measurements for retransmissions
+ */
+ void
+ addMeasurement(MillisecondsDouble rtt, size_t nExpectedSamples);
+
+ /**
+ * @brief Returns the estimated RTO value (Options::initialRto until a measurement is added)
+ */
+ MillisecondsDouble
+ getEstimatedRto() const
+ {
+ return m_rto;
+ }
+
+ /**
+ * @brief Returns the minimum RTT observed (an initial sentinel value if no measurement was added)
+ */
+ MillisecondsDouble
+ getMinRtt() const
+ {
+ return m_rttMin;
+ }
+
+ /**
+ * @brief Returns the maximum RTT observed (an initial sentinel value if no measurement was added)
+ */
+ MillisecondsDouble
+ getMaxRtt() const
+ {
+ return m_rttMax;
+ }
+
+ /**
+ * @brief Returns the average RTT over all measurements (zero if no measurement was added)
+ */
+ MillisecondsDouble
+ getAvgRtt() const
+ {
+ return m_rttAvg;
+ }
+
+ /**
+ * @brief Backoff RTO by a factor of Options::rtoBackoffMultiplier
+ */
+ void
+ backoffRto();
+
+private:
+ const Options m_options;
+ MillisecondsDouble m_sRtt; ///< smoothed round-trip time
+ MillisecondsDouble m_rttVar; ///< round-trip time variation
+ MillisecondsDouble m_rto; ///< retransmission timeout
+ MillisecondsDouble m_rttMin; ///< minimum RTT observed
+ MillisecondsDouble m_rttMax; ///< maximum RTT observed
+ MillisecondsDouble m_rttAvg; ///< arithmetic mean of all RTT samples
+ int64_t m_nRttSamples; ///< number of RTT samples
+};
+
+} // namespace util
+} // namespace ndn
+
+#endif // NDN_CXX_UTIL_RTT_ESTIMATOR_HPP
diff --git a/src/util/segment-fetcher.cpp b/src/util/segment-fetcher.cpp
index 0d53aaf..de177fd 100644
--- a/src/util/segment-fetcher.cpp
+++ b/src/util/segment-fetcher.cpp
@@ -1,6 +1,8 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2013-2018 Regents of the University of California.
+ * Copyright (c) 2013-2018, Regents of the University of California,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -17,10 +19,15 @@
* <http://www.gnu.org/licenses/>.
*
* See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Shuo Yang
+ * @author Weiwei Liu
+ * @author Chavoosh Ghasemi
*/
#include "segment-fetcher.hpp"
#include "../name-component.hpp"
+#include "../encoding/buffer-stream.hpp"
#include "../lp/nack.hpp"
#include "../lp/nack-header.hpp"
@@ -30,22 +37,59 @@
namespace ndn {
namespace util {
-const uint32_t SegmentFetcher::MAX_INTEREST_REEXPRESS = 3;
+constexpr double SegmentFetcher::MIN_SSTHRESH;
-SegmentFetcher::SegmentFetcher(Face& face, security::v2::Validator& validator)
- : m_face(face)
+void
+SegmentFetcher::Options::validate()
+{ // Throws std::invalid_argument if any checked option is outside its allowed range
+ if (maxTimeout < 1_ms) { // timeouts below one millisecond are rejected
+ BOOST_THROW_EXCEPTION(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
+ }
+
+ if (initCwnd < 1.0) { // the window must allow at least one Interest
+ BOOST_THROW_EXCEPTION(std::invalid_argument("initCwnd must be greater than or equal to 1"));
+ }
+
+ if (aiStep < 0.0) { // a negative additive-increase step is rejected
+ BOOST_THROW_EXCEPTION(std::invalid_argument("aiStep must be greater than or equal to 0"));
+ }
+
+ if (mdCoef < 0.0 || mdCoef > 1.0) { // the multiplicative-decrease coefficient is a fraction of cwnd
+ BOOST_THROW_EXCEPTION(std::invalid_argument("mdCoef must be in range [0, 1]"));
+ }
+}
+
+SegmentFetcher::SegmentFetcher(Face& face,
+ security::v2::Validator& validator,
+ const SegmentFetcher::Options& options)
+ : m_options(options)
+ , m_face(face)
, m_scheduler(m_face.getIoService())
, m_validator(validator)
+ , m_rttEstimator(options.rttOptions)
+ , m_timeLastSegmentReceived(time::steady_clock::now()) // the maxTimeout check is measured from this point
+ , m_nextSegmentNum(0)
+ , m_cwnd(options.initCwnd)
+ , m_ssthresh(options.initSsthresh)
+ , m_nSegmentsInFlight(0)
+ , m_nSegments(0) // 0 is treated as "segment count not yet known" by the fetch logic
+ , m_highInterest(0)
+ , m_highData(0)
+ , m_recPoint(0)
+ , m_nReceived(0)
+ , m_nBytesReceived(0)
{
+ m_options.validate(); // throws std::invalid_argument if the supplied options are out of range
}
shared_ptr<SegmentFetcher>
SegmentFetcher::start(Face& face,
const Interest& baseInterest,
- security::v2::Validator& validator)
+ security::v2::Validator& validator,
+ const SegmentFetcher::Options& options)
{
- shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator));
- fetcher->fetchFirstSegment(baseInterest, fetcher);
+ shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options)); // constructor validates options
+ fetcher->fetchFirstSegment(baseInterest, false, fetcher); // false: initial request, not a retransmission
return fetcher;
}
@@ -56,7 +100,12 @@
const CompleteCallback& completeCallback,
const ErrorCallback& errorCallback)
{
- shared_ptr<SegmentFetcher> fetcher = start(face, baseInterest, validator);
+ Options options;
+ options.useConstantCwnd = true;
+ options.useConstantInterestTimeout = true;
+ options.maxTimeout = baseInterest.getInterestLifetime();
+ options.interestLifetime = baseInterest.getInterestLifetime();
+ shared_ptr<SegmentFetcher> fetcher = start(face, baseInterest, validator, options);
fetcher->onComplete.connect(completeCallback);
fetcher->onError.connect(errorCallback);
return fetcher;
@@ -70,135 +119,412 @@
const ErrorCallback& errorCallback)
{
auto fetcher = fetch(face, baseInterest, *validator, completeCallback, errorCallback);
+ // Ensure lifetime of validator shared_ptr
fetcher->onComplete.connect([validator] (ConstBufferPtr) {});
return fetcher;
}
void
SegmentFetcher::fetchFirstSegment(const Interest& baseInterest,
+ bool isRetransmission,
shared_ptr<SegmentFetcher> self)
{
Interest interest(baseInterest);
interest.setChildSelector(1);
interest.setMustBeFresh(true);
+ interest.setInterestLifetime(m_options.interestLifetime); // separate from the timeout event scheduled below
+ if (isRetransmission) {
+ interest.refreshNonce(); // re-sent Interests get a fresh nonce
+ }
- m_face.expressInterest(interest,
- bind(&SegmentFetcher::afterSegmentReceivedCb, this, _1, _2, true, self),
- bind(&SegmentFetcher::afterNackReceivedCb, this, _1, _2, 0, self),
- bind([this] { onError(INTEREST_TIMEOUT, "Timeout"); }));
+ m_nSegmentsInFlight++;
+ auto pendingInterest = m_face.expressInterest(interest,
+ bind(&SegmentFetcher::afterSegmentReceivedCb,
+ this, _1, _2, self),
+ bind(&SegmentFetcher::afterNackReceivedCb,
+ this, _1, _2, self),
+ nullptr); // no face-level timeout callback; timeouts come from the scheduler event
+ auto timeoutEvent =
+ m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
+ bind(&SegmentFetcher::afterTimeoutCb, this, interest, self));
+ if (isRetransmission) {
+ updateRetransmittedSegment(0, pendingInterest, timeoutEvent); // the first Interest is tracked under key 0
+ }
+ else {
+ BOOST_ASSERT(m_pendingSegments.count(0) == 0);
+ m_pendingSegments.emplace(0, PendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
+ pendingInterest, timeoutEvent});
+ }
}
void
-SegmentFetcher::fetchNextSegment(const Interest& origInterest, const Name& dataName,
- uint64_t segmentNo,
- shared_ptr<SegmentFetcher> self)
+SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest, shared_ptr<SegmentFetcher> self)
{
- Interest interest(origInterest); // to preserve any selectors
- interest.refreshNonce();
- interest.setChildSelector(0);
- interest.setMustBeFresh(false);
- interest.setName(dataName.getPrefix(-1).appendSegment(segmentNo));
- m_face.expressInterest(interest,
- bind(&SegmentFetcher::afterSegmentReceivedCb, this, _1, _2, false, self),
- bind(&SegmentFetcher::afterNackReceivedCb, this, _1, _2, 0, self),
- bind([this] { onError(INTEREST_TIMEOUT, "Timeout"); }));
+ if (checkAllSegmentsReceived()) {
+ // All segments have been retrieved: signal completion and stop, nothing more to request
+ return finalizeFetch(self);
+ }
+
+ int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight; // free slots in the window
+
+ std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // (segment number, is retransmission)
+
+ while (availableWindowSize > 0) {
+ if (!m_retxQueue.empty()) { // retransmissions take priority over new segments
+ auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
+ m_retxQueue.pop();
+ if (pendingSegmentIt == m_pendingSegments.end()) {
+ // Skip re-requesting this segment, since it was received after RTO timeout
+ continue;
+ }
+ BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
+ segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
+ }
+ else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
+ if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
+ // Don't request a segment a second time if received in response to first "discovery" Interest
+ m_nextSegmentNum++;
+ continue;
+ }
+ segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
+ }
+ else {
+ break; // nothing left to (re)request
+ }
+ availableWindowSize--;
+ }
+
+ for (const auto& segment : segmentsToRequest) {
+ Interest interest(origInterest); // to preserve Interest elements
+ interest.refreshNonce();
+ interest.setChildSelector(0);
+ interest.setMustBeFresh(false);
+
+ Name interestName(m_versionedDataName);
+ interestName.appendSegment(segment.first);
+ interest.setName(interestName);
+ interest.setInterestLifetime(m_options.interestLifetime);
+ m_nSegmentsInFlight++;
+ auto pendingInterest = m_face.expressInterest(interest,
+ bind(&SegmentFetcher::afterSegmentReceivedCb,
+ this, _1, _2, self),
+ bind(&SegmentFetcher::afterNackReceivedCb,
+ this, _1, _2, self),
+ nullptr);
+ auto timeoutEvent =
+ m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
+ bind(&SegmentFetcher::afterTimeoutCb, this, interest, self));
+ if (segment.second) { // Retransmission
+ updateRetransmittedSegment(segment.first, pendingInterest, timeoutEvent);
+ }
+ else { // First request for segment
+ BOOST_ASSERT(m_pendingSegments.count(segment.first) == 0);
+ m_pendingSegments.emplace(segment.first, PendingSegment{SegmentState::FirstInterest,
+ time::steady_clock::now(),
+ pendingInterest, timeoutEvent});
+ m_highInterest = segment.first; // highest segment number requested so far
+ }
+ }
}
void
SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest,
- const Data& data, bool isSegmentZeroExpected,
+ const Data& data,
shared_ptr<SegmentFetcher> self)
{
afterSegmentReceived(data);
- m_validator.validate(data,
- bind(&SegmentFetcher::afterValidationSuccess, this, _1,
- isSegmentZeroExpected, origInterest, self),
- bind(&SegmentFetcher::afterValidationFailure, this, _1, _2));
+ BOOST_ASSERT(m_nSegmentsInFlight > 0);
+ m_nSegmentsInFlight--;
+ name::Component currentSegmentComponent = data.getName().get(-1);
+ if (!currentSegmentComponent.isSegment()) {
+ return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
+ }
+
+ uint64_t currentSegment = currentSegmentComponent.toSegment();
+
+ // The first received Data could have any segment ID, so until something has been received the
+ // first pending entry is used; afterwards the entry is looked up by segment number.
+ auto pendingSegmentIt = m_receivedSegments.empty() ? m_pendingSegments.begin()
+ : m_pendingSegments.find(currentSegment);
+ if (pendingSegmentIt == m_pendingSegments.end()) {
+ // Segment is no longer pending (e.g. duplicate Data after a retransmission); never dereference end()
+ return;
+ }
+
+ // Cancel timeout event
+ m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
+ pendingSegmentIt->second.timeoutEvent = nullptr;
+
+ m_validator.validate(data,
+ bind(&SegmentFetcher::afterValidationSuccess, this, _1, origInterest,
+ pendingSegmentIt, self),
+ bind(&SegmentFetcher::afterValidationFailure, this, _1, _2, self));
}
void
SegmentFetcher::afterValidationSuccess(const Data& data,
- bool isSegmentZeroExpected,
const Interest& origInterest,
+ std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
shared_ptr<SegmentFetcher> self)
{
- name::Component currentSegment = data.getName().get(-1);
+ // We update the last receive time here instead of in the segment received callback so that the
+ // transfer will not fail to terminate if we only received invalid Data packets.
+ m_timeLastSegmentReceived = time::steady_clock::now();
- if (currentSegment.isSegment()) {
- if (isSegmentZeroExpected && currentSegment.toSegment() != 0) {
- fetchNextSegment(origInterest, data.getName(), 0, self);
+ m_nReceived++;
+
+ // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
+ uint64_t currentSegment = data.getName().get(-1).toSegment();
+ // Add measurement to RTO estimator (if not retransmission)
+ if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
+ m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
+ std::max<int64_t>(m_nSegmentsInFlight + 1, 1)); // +1 counts this segment; never below 1
+ }
+
+ // Remove from pending segments map (pendingSegmentIt must not be used after this point)
+ m_pendingSegments.erase(pendingSegmentIt);
+
+ // Copy data in segment to temporary buffer
+ auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
+ std::forward_as_tuple(currentSegment),
+ std::forward_as_tuple(data.getContent().value_size()));
+ std::copy(data.getContent().value_begin(), data.getContent().value_end(),
+ receivedSegmentIt.first->second.begin());
+ m_nBytesReceived += data.getContent().value_size();
+ afterSegmentValidated(data);
+
+ if (data.getFinalBlock()) {
+ if (!data.getFinalBlock()->isSegment()) {
+ return signalError(FINALBLOCKID_NOT_SEGMENT,
+ "Received FinalBlockId did not contain a segment component");
+ }
+
+ if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
+ m_nSegments = data.getFinalBlock()->toSegment() + 1; // segment numbers start at 0, so count = last + 1
+ cancelExcessInFlightSegments(); // drop pending requests at or beyond the new segment count
+ }
+ }
+
+ if (m_receivedSegments.size() == 1) { // this was the first validated segment
+ m_versionedDataName = data.getName().getPrefix(-1); // name without the segment component
+ if (currentSegment == 0) {
+ // We received the first segment in response, so we can increment the next segment number
+ m_nextSegmentNum++;
+ }
+ }
+
+ if (m_highData < currentSegment) {
+ m_highData = currentSegment; // highest segment number received so far
+ }
+
+ if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
+ windowDecrease(); // a congestion mark is treated as a congestion signal unless ignored
+ }
+ else {
+ windowIncrease();
+ }
+
+ fetchSegmentsInWindow(origInterest, self); // finalize if complete, otherwise fill the window
+}
+
+void
+SegmentFetcher::afterValidationFailure(const Data& data,
+ const security::v2::ValidationError& error,
+ shared_ptr<SegmentFetcher> self)
+{ // Any validation failure aborts the whole fetch; data and self are not used in the body
+ signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " +
+ boost::lexical_cast<std::string>(error)); // error is rendered via its stream output
+}
+
+
+void
+SegmentFetcher::afterNackReceivedCb(const Interest& origInterest,
+ const lp::Nack& nack,
+ shared_ptr<SegmentFetcher> self)
+{
+ afterSegmentNacked();
+ BOOST_ASSERT(m_nSegmentsInFlight > 0);
+ m_nSegmentsInFlight--; // the nacked Interest no longer counts as in flight
+
+ switch (nack.getReason()) {
+ case lp::NackReason::DUPLICATE:
+ case lp::NackReason::CONGESTION:
+ afterNackOrTimeout(origInterest, self); // recoverable: handled the same way as a timeout
+ break;
+ default:
+ signalError(NACK_ERROR, "Nack Error"); // any other reason aborts the fetch
+ break;
+ }
+}
+
+void
+SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
+ shared_ptr<SegmentFetcher> self)
+{ // Invoked by the scheduler event created when the Interest was expressed
+ afterSegmentTimedOut();
+ BOOST_ASSERT(m_nSegmentsInFlight > 0);
+ m_nSegmentsInFlight--; // the timed-out Interest no longer counts as in flight
+ afterNackOrTimeout(origInterest, self);
+}
+
+void
+SegmentFetcher::afterNackOrTimeout(const Interest& origInterest, shared_ptr<SegmentFetcher> self)
+{
+ if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
+ // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
+ return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
+ }
+
+ name::Component lastNameComponent = origInterest.getName().get(-1);
+ std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
+ BOOST_ASSERT(m_pendingSegments.size() > 0);
+ if (lastNameComponent.isSegment()) {
+ BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
+ pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
+ }
+ else { // First Interest: its name carries no segment component, so take the first pending entry
+ BOOST_ASSERT(m_pendingSegments.size() > 0);
+ pendingSegmentIt = m_pendingSegments.begin();
+ }
+
+ // Cancel timeout event and set status to InRetxQueue
+ m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
+ pendingSegmentIt->second.timeoutEvent = nullptr;
+ pendingSegmentIt->second.state = SegmentState::InRetxQueue;
+
+ m_rttEstimator.backoffRto(); // lengthen the RTO after a Nack or timeout
+
+ if (m_receivedSegments.size() == 0) {
+ // Resend first Interest (until maximum receive timeout exceeded)
+ fetchFirstSegment(origInterest, true, self);
+ }
+ else {
+ windowDecrease(); // treat the Nack or timeout as a congestion signal
+ m_retxQueue.push(pendingSegmentIt->first);
+ fetchSegmentsInWindow(origInterest, self);
+ }
+}
+
+void
+SegmentFetcher::finalizeFetch(shared_ptr<SegmentFetcher> self)
+{
+ // Combine segments into final buffer, in ascending segment-number order
+ OBufferStream buf;
+ // We may have received more segments than exist in the object.
+ BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
+
+ for (int64_t i = 0; i < m_nSegments; i++) { // only segments [0, m_nSegments) belong to the object
+ buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
+ }
+
+ onComplete(buf.buf()); // hand the reassembled content to the onComplete subscribers
+}
+
+void
+SegmentFetcher::windowIncrease()
+{
+ if (m_options.useConstantCwnd) { // constant-window mode: the window is never changed
+ BOOST_ASSERT(m_cwnd == m_options.initCwnd);
+ return;
+ }
+
+ if (m_cwnd < m_ssthresh) { // below the slow-start threshold: grow by a full step
+ m_cwnd += m_options.aiStep; // additive increase
+ }
+ else {
+ m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
+ }
+}
+
+void
+SegmentFetcher::windowDecrease()
+{
+ if (m_options.disableCwa || m_highData > m_recPoint) { // otherwise the decrease is skipped entirely
+ m_recPoint = m_highInterest; // no further decrease until Data beyond this segment number arrives
+
+ if (m_options.useConstantCwnd) { // constant-window mode: only the recovery point is updated
+ BOOST_ASSERT(m_cwnd == m_options.initCwnd);
+ return;
+ }
+
+ // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
+ m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
+ m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
+ }
+}
+
+void
+SegmentFetcher::signalError(uint32_t code, const std::string& msg)
+{
+ // Cancel all pending Interests and their timeout events before signaling error
+ for (const auto& pendingSegment : m_pendingSegments) {
+ m_face.removePendingInterest(pendingSegment.second.id);
+ if (pendingSegment.second.timeoutEvent) { // already cleared for entries whose timeout was cancelled
+ m_scheduler.cancelEvent(pendingSegment.second.timeoutEvent);
+ }
+ }
+ onError(code, msg); // notify the onError subscribers
+}
+
+void
+SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
+ const PendingInterestId* pendingInterest,
+ scheduler::EventId timeoutEvent)
+{ // Records the new Interest id and timeout event for a segment that has just been re-requested
+ auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
+ BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
+ BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
+ pendingSegmentIt->second.state = SegmentState::Retransmitted; // excluded from RTT measurement from now on
+ pendingSegmentIt->second.id = pendingInterest;
+ pendingSegmentIt->second.timeoutEvent = timeoutEvent;
+}
+
+void
+SegmentFetcher::cancelExcessInFlightSegments()
+{ // Drops every pending segment whose number is at or beyond the known segment count
+ for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
+ if (it->first >= static_cast<uint64_t>(m_nSegments)) {
+ m_face.removePendingInterest(it->second.id);
+ if (it->second.timeoutEvent) {
+ m_scheduler.cancelEvent(it->second.timeoutEvent);
+ }
+ it = m_pendingSegments.erase(it); // erase returns the next valid iterator
+ BOOST_ASSERT(m_nSegmentsInFlight > 0);
+ m_nSegmentsInFlight--; // NOTE(review): InRetxQueue entries were already subtracted on Nack/timeout - confirm
}
else {
- m_buffer.write(reinterpret_cast<const char*>(data.getContent().value()),
- data.getContent().value_size());
- afterSegmentValidated(data);
- const auto& finalBlockId = data.getFinalBlock();
- if (!finalBlockId || (*finalBlockId > currentSegment)) {
- fetchNextSegment(origInterest, data.getName(), currentSegment.toSegment() + 1, self);
- }
- else {
- onComplete(m_buffer.buf());
- }
- }
- }
- else {
- onError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number.");
- }
-}
-
-void
-SegmentFetcher::afterValidationFailure(const Data& data, const security::v2::ValidationError& error)
-{
- onError(SEGMENT_VALIDATION_FAIL, "Segment validation fail " +
- boost::lexical_cast<std::string>(error));
-}
-
-
-void
-SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
- uint32_t reExpressCount, shared_ptr<SegmentFetcher> self)
-{
- if (reExpressCount >= MAX_INTEREST_REEXPRESS) {
- onError(NACK_ERROR, "Nack Error");
- }
- else {
- switch (nack.getReason()) {
- case lp::NackReason::DUPLICATE:
- reExpressInterest(origInterest, reExpressCount, self);
- break;
- case lp::NackReason::CONGESTION:
- using ms = time::milliseconds;
- m_scheduler.scheduleEvent(ms(static_cast<ms::rep>(std::pow(2, reExpressCount + 1))),
- bind(&SegmentFetcher::reExpressInterest, this,
- origInterest, reExpressCount, self));
- break;
- default:
- onError(NACK_ERROR, "Nack Error");
- break;
+ ++it; // keep this entry and advance
}
}
}
-void
-SegmentFetcher::reExpressInterest(Interest interest, uint32_t reExpressCount,
- shared_ptr<SegmentFetcher> self)
+bool
+SegmentFetcher::checkAllSegmentsReceived()
{
- interest.refreshNonce();
- BOOST_ASSERT(interest.hasNonce());
+ bool haveReceivedAllSegments = false;
- bool isSegmentZeroExpected = true;
- if (!interest.getName().empty()) {
- name::Component lastComponent = interest.getName().get(-1);
- isSegmentZeroExpected = !lastComponent.isSegment();
+ if (m_nSegments != 0 && m_nReceived >= m_nSegments) { // only checked once the segment count is known
+ haveReceivedAllSegments = true;
+ // Verify that every segment in [0, m_nSegments) has been received; queue any missing one for retransmission.
+ for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
+ if (m_receivedSegments.count(i) == 0) {
+ m_retxQueue.push(i);
+ haveReceivedAllSegments = false;
+ }
+ }
}
- m_face.expressInterest(interest,
- bind(&SegmentFetcher::afterSegmentReceivedCb, this, _1, _2,
- isSegmentZeroExpected, self),
- bind(&SegmentFetcher::afterNackReceivedCb, this, _1, _2,
- ++reExpressCount, self),
- bind([this] { onError(INTEREST_TIMEOUT, "Timeout"); }));
+ return haveReceivedAllSegments; // true only if the count is known and no segment is missing
+}
+
+time::milliseconds
+SegmentFetcher::getEstimatedRto()
+{
+ // We don't want an Interest timeout greater than the maximum allowed timeout between the
+ // successful receipt of segments, so the estimator's RTO is capped at maxTimeout
+ return std::min(m_options.maxTimeout,
+ time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto())); // drops sub-millisecond part
+}
} // namespace util
diff --git a/src/util/segment-fetcher.hpp b/src/util/segment-fetcher.hpp
index 3b81cb4..e3df0c5 100644
--- a/src/util/segment-fetcher.hpp
+++ b/src/util/segment-fetcher.hpp
@@ -1,6 +1,8 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2013-2018 Regents of the University of California.
+ * Copyright (c) 2013-2018, Regents of the University of California,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -17,6 +19,10 @@
* <http://www.gnu.org/licenses/>.
*
* See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Shuo Yang
+ * @author Weiwei Liu
+ * @author Chavoosh Ghasemi
*/
#ifndef NDN_UTIL_SEGMENT_FETCHER_HPP
@@ -24,16 +30,18 @@
#include "../common.hpp"
#include "../face.hpp"
-#include "../encoding/buffer-stream.hpp"
#include "../security/v2/validator.hpp"
+#include "rtt-estimator.hpp"
#include "scheduler.hpp"
#include "signal.hpp"
+#include <queue>
+
namespace ndn {
namespace util {
/**
- * @brief Utility class to fetch latest version of the segmented data
+ * @brief Utility class to fetch the latest version of segmented data
*
* SegmentFetcher assumes that the data is named `/<prefix>/<version>/<segment>`,
* where:
@@ -61,25 +69,20 @@
*
* >> Interest: `/<prefix>/<version>/<segment=(N+1))>`
*
- * 6. Fire onCompletion callback with memory block that combines content part from all
- * segmented objects.
+ * 6. Signal `onComplete` with a memory block that combines the content of all segments of the
+ * object.
*
- * If an error occurs during the fetching process, an error callback is fired
- * with a proper error code. The following errors are possible:
+ * If an error occurs during the fetching process, `onError` is signaled with one of the error codes
+ * from `SegmentFetcher::ErrorCode`.
*
- * - `INTEREST_TIMEOUT`: if any of the Interests times out
- * - `DATA_HAS_NO_SEGMENT`: if any of the retrieved Data packets don't have segment
- * as a last component of the name (not counting implicit digest)
- * - `SEGMENT_VALIDATION_FAIL`: if any retrieved segment fails user-provided validation
- *
- * In order to validate individual segments, a Validator instance needs to be specified.
- * If the segment validation is successful, afterValidationSuccess callback is fired, otherwise
- * afterValidationFailure callback.
+ * A Validator instance must be specified to validate individual segments. Every time a segment has
+ * been successfully validated, `afterValidationSuccess` will be signaled. If a segment fails
+ * validation, `afterValidationFailure` will be signaled.
*
* Examples:
*
* void
- * afterFetchComplete(const ConstBufferPtr& data)
+ * afterFetchComplete(ConstBufferPtr data)
* {
* ...
* }
@@ -91,80 +94,113 @@
* }
*
* ...
- * SegmentFetcher::fetch(face, Interest("/data/prefix", 30_s),
- * validator,
- * bind(&afterFetchComplete, this, _1),
- * bind(&afterFetchError, this, _1, _2));
+ * auto fetcher = SegmentFetcher::start(face, Interest("/data/prefix"), validator);
+ * fetcher->onComplete.connect(bind(&afterFetchComplete, _1));
+ * fetcher->onError.connect(bind(&afterFetchError, _1, _2));
+ *
*
*/
class SegmentFetcher : noncopyable
{
public:
- /**
- * @brief Maximum number of times an interest will be reexpressed incase of NackCallback
- */
- static const uint32_t MAX_INTEREST_REEXPRESS;
-
+ // Deprecated: will be removed when deprecated fetch() API is removed - use start() instead
typedef function<void (ConstBufferPtr data)> CompleteCallback;
typedef function<void (uint32_t code, const std::string& msg)> ErrorCallback;
/**
- * @brief Error codes that can be passed to ErrorCallback
+ * @brief Error codes passed to `onError`
*/
enum ErrorCode {
+ /// retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded
INTEREST_TIMEOUT = 1,
+ /// one of the retrieved Data packets lacked a segment number in the last Name component (excl. implicit digest)
DATA_HAS_NO_SEGMENT = 2,
+ /// one of the retrieved segments failed user-provided validation
SEGMENT_VALIDATION_FAIL = 3,
- NACK_ERROR = 4
+ /// an unrecoverable Nack was received during retrieval
+ NACK_ERROR = 4,
+ /// a received FinalBlockId did not contain a segment component
+ FINALBLOCKID_NOT_SEGMENT = 5
+ };
+
+ class Options
+ {
+ public:
+ Options()
+ {
+ }
+
+ void
+ validate();
+
+ public:
+ bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
+ bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
+ time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
+ time::milliseconds interestLifetime = 4_s; ///< lifetime of sent Interests - independent of Interest timeout
+ double initCwnd = 1.0; ///< initial congestion window size
+ double initSsthresh = std::numeric_limits<double>::max(); ///< initial slow start threshold
+ double aiStep = 1.0; ///< additive increase step (in segments)
+ double mdCoef = 0.5; ///< multiplicative decrease coefficient
+ bool disableCwa = false; ///< disable Conservative Window Adaptation
+ bool resetCwndToInit = false; ///< reduce cwnd to `initCwnd` when a loss event occurs
+ bool ignoreCongMarks = false; ///< disable window decrease after a congestion mark is received
+ RttEstimator::Options rttOptions; ///< options for RTT estimator
};
/**
* @brief Initiates segment fetching
*
* @param face Reference to the Face that should be used to fetch data
- * @param baseInterest An Interest for the initial segment of requested data.
+ * @param baseInterest Interest for the initial segment of requested data.
* This interest may include a custom InterestLifetime and selectors that will
* propagate to all subsequent Interests. The only exception is that the
* initial Interest will be forced to include the "ChildSelector=1" and
- * "MustBeFresh=true" selectors, which will be turned off in subsequent
+ * "MustBeFresh=true" selectors, which will not be included in subsequent
* Interests.
- * @param validator Reference to the Validator that should be used to validate data. Caller
- * must ensure validator is valid until either onComplete or onError has been
- * signaled.
+ * @param validator Reference to the Validator the fetcher will use to validate data.
+ * The caller must ensure the validator is valid until either `onComplete` or
+ * `onError` has been signaled.
+ * @param options Options controlling the transfer
*
- * @return A shared_ptr to the constructed SegmentFetcher
+ * @return A shared_ptr to the constructed SegmentFetcher.
+ * This shared_ptr is kept internally for the lifetime of the transfer.
+ * Therefore, it does not need to be saved and is provided here so that the
+ * SegmentFetcher's signals can be connected to.
*
* Transfer completion, failure, and progress are indicated via signals.
*/
- static
- shared_ptr<SegmentFetcher>
+ static shared_ptr<SegmentFetcher>
start(Face& face,
const Interest& baseInterest,
- security::v2::Validator& validator);
+ security::v2::Validator& validator,
+ const Options& options = Options());
/**
* @brief Initiates segment fetching
*
* @deprecated Use @c start
*
- * @param face Reference to the Face that should be used to fetch data
- * @param baseInterest An Interest for the initial segment of requested data.
- * This interest may include custom InterestLifetime and selectors that
- * will propagate to all subsequent Interests. The only exception is that
- * the initial Interest will be forced to include "ChildSelector=rightmost" and
- * "MustBeFresh=true" selectors, which will be turned off in subsequent
- * Interests.
- * @param validator Reference to the Validator that should be used to validate data. Caller
- * must ensure validator is valid until either completeCallback or errorCallback
- * is invoked.
+ * @param face Reference to the Face that should be used to fetch data
+ * @param baseInterest An Interest for the initial segment of requested data.
+ * This interest may include custom InterestLifetime and selectors that
+ * will propagate to all subsequent Interests. The only exception is that
+ * the initial Interest will be forced to include
+ * "ChildSelector=rightmost" and "MustBeFresh=true" selectors, which will
+ * be turned off in subsequent Interests.
+ * @param validator Reference to the Validator that should be used to validate data. Caller
+ * must ensure validator is valid until either completeCallback or
+ * errorCallback is invoked.
+ * @param completeCallback Callback to be fired when all segments are fetched
+ * @param errorCallback Callback to be fired when an error occurs (@see ErrorCode)
*
- * @param completeCallback Callback to be fired when all segments are fetched
- * @param errorCallback Callback to be fired when an error occurs (@see Errors)
- * @return A shared_ptr to the constructed SegmentFetcher
+ * @return A shared_ptr to the constructed SegmentFetcher.
+ * This shared_ptr is kept internally for the lifetime of the transfer.
+ * Therefore, it does not need to be saved and is provided here so that
+ * the SegmentFetcher's signals can be connected to.
*/
[[deprecated("use SegmentFetcher::start instead")]]
- static
- shared_ptr<SegmentFetcher>
+ static shared_ptr<SegmentFetcher>
fetch(Face& face,
const Interest& baseInterest,
security::v2::Validator& validator,
@@ -176,22 +212,25 @@
*
* @deprecated Use @c start
*
- * @param face Reference to the Face that should be used to fetch data
- * @param baseInterest An Interest for the initial segment of requested data.
- * This interest may include custom InterestLifetime and selectors that
- * will propagate to all subsequent Interests. The only exception is that
- * the initial Interest will be forced to include "ChildSelector=1" and
- * "MustBeFresh=true" selectors, which will be turned off in subsequent
- * Interests.
- * @param validator A shared_ptr to the Validator that should be used to validate data.
+ * @param face Reference to the Face that should be used to fetch data
+ * @param baseInterest An Interest for the initial segment of requested data.
+ * This interest may include custom InterestLifetime and selectors that
+ * will propagate to all subsequent Interests. The only exception is that
+ * the initial Interest will be forced to include "ChildSelector=1" and
+ * "MustBeFresh=true" selectors, which will be turned off in subsequent
+ * Interests.
+ * @param validator A shared_ptr to the Validator that should be used to validate data.
*
- * @param completeCallback Callback to be fired when all segments are fetched
- * @param errorCallback Callback to be fired when an error occurs (@see Errors)
- * @return A shared_ptr to the constructed SegmentFetcher
+ * @param completeCallback Callback to be fired when all segments are fetched
+ * @param errorCallback Callback to be fired when an error occurs (@see ErrorCode)
+ *
+ * @return A shared_ptr to the constructed SegmentFetcher.
+ * This shared_ptr is kept internally for the lifetime of the transfer.
+ * Therefore, it does not need to be saved and is provided here so that
+ * the SegmentFetcher's signals can be connected to.
*/
[[deprecated("use SegmentFetcher::start instead")]]
- static
- shared_ptr<SegmentFetcher>
+ static shared_ptr<SegmentFetcher>
fetch(Face& face,
const Interest& baseInterest,
shared_ptr<security::v2::Validator> validator,
@@ -199,35 +238,71 @@
const ErrorCallback& errorCallback);
private:
- SegmentFetcher(Face& face, security::v2::Validator& validator);
+ class PendingSegment;
+
+ SegmentFetcher(Face& face, security::v2::Validator& validator, const Options& options);
void
- fetchFirstSegment(const Interest& baseInterest, shared_ptr<SegmentFetcher> self);
+ fetchFirstSegment(const Interest& baseInterest,
+ bool isRetransmission,
+ shared_ptr<SegmentFetcher> self);
void
- fetchNextSegment(const Interest& origInterest, const Name& dataName, uint64_t segmentNo,
- shared_ptr<SegmentFetcher> self);
+ fetchSegmentsInWindow(const Interest& origInterest, shared_ptr<SegmentFetcher> self);
void
afterSegmentReceivedCb(const Interest& origInterest,
- const Data& data, bool isSegmentZeroExpected,
+ const Data& data,
shared_ptr<SegmentFetcher> self);
void
afterValidationSuccess(const Data& data,
- bool isSegmentZeroExpected,
const Interest& origInterest,
+ std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
shared_ptr<SegmentFetcher> self);
void
- afterValidationFailure(const Data& data, const security::v2::ValidationError& error);
+ afterValidationFailure(const Data& data,
+ const security::v2::ValidationError& error,
+ shared_ptr<SegmentFetcher> self);
void
- afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
- uint32_t reExpressCount, shared_ptr<SegmentFetcher> self);
+ afterNackReceivedCb(const Interest& origInterest,
+ const lp::Nack& nack,
+ shared_ptr<SegmentFetcher> self);
void
- reExpressInterest(Interest interest, uint32_t reExpressCount,
- shared_ptr<SegmentFetcher> self);
+ afterTimeoutCb(const Interest& origInterest,
+ shared_ptr<SegmentFetcher> self);
+
+ void
+ afterNackOrTimeout(const Interest& origInterest,
+ shared_ptr<SegmentFetcher> self);
+
+ void
+ finalizeFetch(shared_ptr<SegmentFetcher> self);
+
+ void
+ windowIncrease();
+
+ void
+ windowDecrease();
+
+ void
+ signalError(uint32_t code, const std::string& msg);
+
+ void
+ updateRetransmittedSegment(uint64_t segmentNum,
+ const PendingInterestId* pendingInterest,
+ scheduler::EventId timeoutEvent);
+
+ void
+ cancelExcessInFlightSegments();
+
+ bool
+ checkAllSegmentsReceived();
+
+ time::milliseconds
+ getEstimatedRto();
public:
/**
@@ -252,12 +327,58 @@
*/
Signal<SegmentFetcher, Data> afterSegmentValidated;
+ /**
+ * @brief Emits whenever an Interest for a data segment is nacked
+ */
+ Signal<SegmentFetcher> afterSegmentNacked;
+
+ /**
+ * @brief Emits whenever an Interest for a data segment times out
+ */
+ Signal<SegmentFetcher> afterSegmentTimedOut;
+
private:
+ enum class SegmentState {
+ FirstInterest, ///< the first Interest for this segment has been sent
+ InRetxQueue, ///< the segment is awaiting Interest retransmission
+ Retransmitted, ///< one or more retransmitted Interests have been sent for this segment
+ };
+
+ class PendingSegment
+ {
+ public:
+ SegmentState state;
+ time::steady_clock::TimePoint sendTime;
+ const PendingInterestId* id;
+ scheduler::EventId timeoutEvent;
+ };
+
+NDN_CXX_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ static constexpr double MIN_SSTHRESH = 2.0;
+
+ Options m_options;
Face& m_face;
Scheduler m_scheduler;
security::v2::Validator& m_validator;
+ RttEstimator m_rttEstimator;
+ time::milliseconds m_timeout;
- OBufferStream m_buffer;
+ time::steady_clock::TimePoint m_timeLastSegmentReceived;
+ std::queue<uint64_t> m_retxQueue;
+ Name m_versionedDataName;
+ uint64_t m_nextSegmentNum;
+ double m_cwnd;
+ double m_ssthresh;
+ int64_t m_nSegmentsInFlight;
+ int64_t m_nSegments;
+ uint64_t m_highInterest;
+ uint64_t m_highData;
+ uint64_t m_recPoint;
+ int64_t m_nReceived;
+ int64_t m_nBytesReceived;
+
+ std::map<uint64_t, Buffer> m_receivedSegments;
+ std::map<uint64_t, PendingSegment> m_pendingSegments;
};
} // namespace util