util: augment SegmentFetcher with congestion control

refs #4364

Change-Id: Ibfebc1f173382d1faf88e0d3d707b8ab2dae5e43
diff --git a/src/util/segment-fetcher.cpp b/src/util/segment-fetcher.cpp
index 0d53aaf..de177fd 100644
--- a/src/util/segment-fetcher.cpp
+++ b/src/util/segment-fetcher.cpp
@@ -1,6 +1,8 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /*
- * Copyright (c) 2013-2018 Regents of the University of California.
+ * Copyright (c) 2013-2018, Regents of the University of California,
+ *                          Colorado State University,
+ *                          University Pierre & Marie Curie, Sorbonne University.
  *
  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
  *
@@ -17,10 +19,15 @@
  * <http://www.gnu.org/licenses/>.
  *
  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Shuo Yang
+ * @author Weiwei Liu
+ * @author Chavoosh Ghasemi
  */
 
 #include "segment-fetcher.hpp"
 #include "../name-component.hpp"
+#include "../encoding/buffer-stream.hpp"
 #include "../lp/nack.hpp"
 #include "../lp/nack-header.hpp"
 
@@ -30,22 +37,59 @@
 namespace ndn {
 namespace util {
 
-const uint32_t SegmentFetcher::MAX_INTEREST_REEXPRESS = 3;
+constexpr double SegmentFetcher::MIN_SSTHRESH;
 
-SegmentFetcher::SegmentFetcher(Face& face, security::v2::Validator& validator)
-  : m_face(face)
+void
+SegmentFetcher::Options::validate()
+{
+  if (maxTimeout < 1_ms) {
+    BOOST_THROW_EXCEPTION(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
+  }
+
+  if (initCwnd < 1.0) {
+    BOOST_THROW_EXCEPTION(std::invalid_argument("initCwnd must be greater than or equal to 1"));
+  }
+
+  if (aiStep < 0.0) {
+    BOOST_THROW_EXCEPTION(std::invalid_argument("aiStep must be greater than or equal to 0"));
+  }
+
+  if (mdCoef < 0.0 || mdCoef > 1.0) {
+    BOOST_THROW_EXCEPTION(std::invalid_argument("mdCoef must be in range [0, 1]"));
+  }
+}
+
+SegmentFetcher::SegmentFetcher(Face& face,
+                               security::v2::Validator& validator,
+                               const SegmentFetcher::Options& options)
+  : m_options(options)
+  , m_face(face)
   , m_scheduler(m_face.getIoService())
   , m_validator(validator)
+  , m_rttEstimator(options.rttOptions)
+  , m_timeLastSegmentReceived(time::steady_clock::now())
+  , m_nextSegmentNum(0)
+  , m_cwnd(options.initCwnd)
+  , m_ssthresh(options.initSsthresh)
+  , m_nSegmentsInFlight(0)
+  , m_nSegments(0)
+  , m_highInterest(0)
+  , m_highData(0)
+  , m_recPoint(0)
+  , m_nReceived(0)
+  , m_nBytesReceived(0)
 {
+  m_options.validate();
 }
 
 shared_ptr<SegmentFetcher>
 SegmentFetcher::start(Face& face,
                       const Interest& baseInterest,
-                      security::v2::Validator& validator)
+                      security::v2::Validator& validator,
+                      const SegmentFetcher::Options& options)
 {
-  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator));
-  fetcher->fetchFirstSegment(baseInterest, fetcher);
+  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
+  fetcher->fetchFirstSegment(baseInterest, false, fetcher);
   return fetcher;
 }
 
@@ -56,7 +100,12 @@
                       const CompleteCallback& completeCallback,
                       const ErrorCallback& errorCallback)
 {
-  shared_ptr<SegmentFetcher> fetcher = start(face, baseInterest, validator);
+  Options options;
+  options.useConstantCwnd = true;
+  options.useConstantInterestTimeout = true;
+  options.maxTimeout = baseInterest.getInterestLifetime();
+  options.interestLifetime = baseInterest.getInterestLifetime();
+  shared_ptr<SegmentFetcher> fetcher = start(face, baseInterest, validator, options);
   fetcher->onComplete.connect(completeCallback);
   fetcher->onError.connect(errorCallback);
   return fetcher;
@@ -70,135 +119,412 @@
                       const ErrorCallback& errorCallback)
 {
   auto fetcher = fetch(face, baseInterest, *validator, completeCallback, errorCallback);
+  // Ensure lifetime of validator shared_ptr
   fetcher->onComplete.connect([validator] (ConstBufferPtr) {});
   return fetcher;
 }
 
 void
 SegmentFetcher::fetchFirstSegment(const Interest& baseInterest,
+                                  bool isRetransmission,
                                   shared_ptr<SegmentFetcher> self)
 {
   Interest interest(baseInterest);
   interest.setChildSelector(1);
   interest.setMustBeFresh(true);
+  interest.setInterestLifetime(m_options.interestLifetime);
+  if (isRetransmission) {
+    interest.refreshNonce();
+  }
 
-  m_face.expressInterest(interest,
-                         bind(&SegmentFetcher::afterSegmentReceivedCb, this, _1, _2, true, self),
-                         bind(&SegmentFetcher::afterNackReceivedCb, this, _1, _2, 0, self),
-                         bind([this] { onError(INTEREST_TIMEOUT, "Timeout"); }));
+  m_nSegmentsInFlight++;
+  auto pendingInterest = m_face.expressInterest(interest,
+                                                bind(&SegmentFetcher::afterSegmentReceivedCb,
+                                                     this, _1, _2, self),
+                                                bind(&SegmentFetcher::afterNackReceivedCb,
+                                                     this, _1, _2, self),
+                                                nullptr);
+  auto timeoutEvent =
+    m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
+                              bind(&SegmentFetcher::afterTimeoutCb, this, interest, self));
+  if (isRetransmission) {
+    updateRetransmittedSegment(0, pendingInterest, timeoutEvent);
+  }
+  else {
+    BOOST_ASSERT(m_pendingSegments.count(0) == 0);
+    m_pendingSegments.emplace(0, PendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
+                                                pendingInterest, timeoutEvent});
+  }
 }
 
 void
-SegmentFetcher::fetchNextSegment(const Interest& origInterest, const Name& dataName,
-                                 uint64_t segmentNo,
-                                 shared_ptr<SegmentFetcher> self)
+SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest, shared_ptr<SegmentFetcher> self)
 {
-  Interest interest(origInterest); // to preserve any selectors
-  interest.refreshNonce();
-  interest.setChildSelector(0);
-  interest.setMustBeFresh(false);
-  interest.setName(dataName.getPrefix(-1).appendSegment(segmentNo));
-  m_face.expressInterest(interest,
-                         bind(&SegmentFetcher::afterSegmentReceivedCb, this, _1, _2, false, self),
-                         bind(&SegmentFetcher::afterNackReceivedCb, this, _1, _2, 0, self),
-                         bind([this] { onError(INTEREST_TIMEOUT, "Timeout"); }));
+  if (checkAllSegmentsReceived()) {
+    // All segments have been retrieved
+    finalizeFetch(self);
+  }
+
+  int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
+
+  std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
+
+  while (availableWindowSize > 0) {
+    if (!m_retxQueue.empty()) {
+      auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
+      m_retxQueue.pop();
+      if (pendingSegmentIt == m_pendingSegments.end()) {
+        // Skip re-requesting this segment, since it was received after RTO timeout
+        continue;
+      }
+      BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
+      segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
+    }
+    else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
+      if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
+        // Don't request a segment a second time if received in response to first "discovery" Interest
+        m_nextSegmentNum++;
+        continue;
+      }
+      segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
+    }
+    else {
+      break;
+    }
+    availableWindowSize--;
+  }
+
+  for (const auto& segment : segmentsToRequest) {
+    Interest interest(origInterest); // to preserve Interest elements
+    interest.refreshNonce();
+    interest.setChildSelector(0);
+    interest.setMustBeFresh(false);
+
+    Name interestName(m_versionedDataName);
+    interestName.appendSegment(segment.first);
+    interest.setName(interestName);
+    interest.setInterestLifetime(m_options.interestLifetime);
+    m_nSegmentsInFlight++;
+    auto pendingInterest = m_face.expressInterest(interest,
+                                                  bind(&SegmentFetcher::afterSegmentReceivedCb,
+                                                       this, _1, _2, self),
+                                                  bind(&SegmentFetcher::afterNackReceivedCb,
+                                                       this, _1, _2, self),
+                                                  nullptr);
+    auto timeoutEvent =
+      m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
+                                bind(&SegmentFetcher::afterTimeoutCb, this, interest, self));
+    if (segment.second) { // Retransmission
+      updateRetransmittedSegment(segment.first, pendingInterest, timeoutEvent);
+    }
+    else { // First request for segment
+      BOOST_ASSERT(m_pendingSegments.count(segment.first) == 0);
+      m_pendingSegments.emplace(segment.first, PendingSegment{SegmentState::FirstInterest,
+                                                              time::steady_clock::now(),
+                                                              pendingInterest, timeoutEvent});
+      m_highInterest = segment.first;
+    }
+  }
 }
 
 void
 SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest,
-                                       const Data& data, bool isSegmentZeroExpected,
+                                       const Data& data,
                                        shared_ptr<SegmentFetcher> self)
 {
   afterSegmentReceived(data);
-  m_validator.validate(data,
-                       bind(&SegmentFetcher::afterValidationSuccess, this, _1,
-                            isSegmentZeroExpected, origInterest, self),
-                       bind(&SegmentFetcher::afterValidationFailure, this, _1, _2));
+  BOOST_ASSERT(m_nSegmentsInFlight > 0);
+  m_nSegmentsInFlight--;
 
+  name::Component currentSegmentComponent = data.getName().get(-1);
+  if (!currentSegmentComponent.isSegment()) {
+    return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
+  }
+
+  uint64_t currentSegment = currentSegmentComponent.toSegment();
+
+  // The first received Data packet could have any segment ID
+  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
+  if (m_receivedSegments.size() > 0) {
+    pendingSegmentIt = m_pendingSegments.find(currentSegment);
+  }
+  else {
+    pendingSegmentIt = m_pendingSegments.begin();
+  }
+
+  // Cancel timeout event
+  m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
+  pendingSegmentIt->second.timeoutEvent = nullptr;
+
+  m_validator.validate(data,
+                       bind(&SegmentFetcher::afterValidationSuccess, this, _1, origInterest,
+                            pendingSegmentIt, self),
+                       bind(&SegmentFetcher::afterValidationFailure, this, _1, _2, self));
 }
 
 void
 SegmentFetcher::afterValidationSuccess(const Data& data,
-                                       bool isSegmentZeroExpected,
                                        const Interest& origInterest,
+                                       std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
                                        shared_ptr<SegmentFetcher> self)
 {
-  name::Component currentSegment = data.getName().get(-1);
+  // We update the last receive time here instead of in the segment received callback so that the
+  // transfer will not fail to terminate if we only received invalid Data packets.
+  m_timeLastSegmentReceived = time::steady_clock::now();
 
-  if (currentSegment.isSegment()) {
-    if (isSegmentZeroExpected && currentSegment.toSegment() != 0) {
-      fetchNextSegment(origInterest, data.getName(), 0, self);
+  m_nReceived++;
+
+  // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
+  uint64_t currentSegment = data.getName().get(-1).toSegment();
+  // Add measurement to RTO estimator (if not retransmission)
+  if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
+    m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
+                                  std::max<int64_t>(m_nSegmentsInFlight + 1, 1));
+  }
+
+  // Remove from pending segments map
+  m_pendingSegments.erase(pendingSegmentIt);
+
+  // Copy data in segment to temporary buffer
+  auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
+                                                      std::forward_as_tuple(currentSegment),
+                                                      std::forward_as_tuple(data.getContent().value_size()));
+  std::copy(data.getContent().value_begin(), data.getContent().value_end(),
+            receivedSegmentIt.first->second.begin());
+  m_nBytesReceived += data.getContent().value_size();
+  afterSegmentValidated(data);
+
+  if (data.getFinalBlock()) {
+    if (!data.getFinalBlock()->isSegment()) {
+      return signalError(FINALBLOCKID_NOT_SEGMENT,
+                         "Received FinalBlockId did not contain a segment component");
+    }
+
+    if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
+      m_nSegments = data.getFinalBlock()->toSegment() + 1;
+      cancelExcessInFlightSegments();
+    }
+  }
+
+  if (m_receivedSegments.size() == 1) {
+    m_versionedDataName = data.getName().getPrefix(-1);
+    if (currentSegment == 0) {
+      // We received the first segment in response, so we can increment the next segment number
+      m_nextSegmentNum++;
+    }
+  }
+
+  if (m_highData < currentSegment) {
+    m_highData = currentSegment;
+  }
+
+  if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
+    windowDecrease();
+  }
+  else {
+    windowIncrease();
+  }
+
+  fetchSegmentsInWindow(origInterest, self);
+}
+
+void
+SegmentFetcher::afterValidationFailure(const Data& data,
+                                       const security::v2::ValidationError& error,
+                                       shared_ptr<SegmentFetcher> self)
+{
+  signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " +
+                                       boost::lexical_cast<std::string>(error));
+}
+
+
+void
+SegmentFetcher::afterNackReceivedCb(const Interest& origInterest,
+                                    const lp::Nack& nack,
+                                    shared_ptr<SegmentFetcher> self)
+{
+  afterSegmentNacked();
+  BOOST_ASSERT(m_nSegmentsInFlight > 0);
+  m_nSegmentsInFlight--;
+
+  switch (nack.getReason()) {
+    case lp::NackReason::DUPLICATE:
+    case lp::NackReason::CONGESTION:
+      afterNackOrTimeout(origInterest, self);
+      break;
+    default:
+      signalError(NACK_ERROR, "Nack Error");
+      break;
+  }
+}
+
+void
+SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
+                               shared_ptr<SegmentFetcher> self)
+{
+  afterSegmentTimedOut();
+  BOOST_ASSERT(m_nSegmentsInFlight > 0);
+  m_nSegmentsInFlight--;
+  afterNackOrTimeout(origInterest, self);
+}
+
+void
+SegmentFetcher::afterNackOrTimeout(const Interest& origInterest, shared_ptr<SegmentFetcher> self)
+{
+  if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
+    // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
+    return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
+  }
+
+  name::Component lastNameComponent = origInterest.getName().get(-1);
+  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
+  BOOST_ASSERT(m_pendingSegments.size() > 0);
+  if (lastNameComponent.isSegment()) {
+    BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
+    pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
+  }
+  else { // First Interest
+    BOOST_ASSERT(m_pendingSegments.size() > 0);
+    pendingSegmentIt = m_pendingSegments.begin();
+  }
+
+  // Cancel timeout event and set status to InRetxQueue
+  m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
+  pendingSegmentIt->second.timeoutEvent = nullptr;
+  pendingSegmentIt->second.state = SegmentState::InRetxQueue;
+
+  m_rttEstimator.backoffRto();
+
+  if (m_receivedSegments.size() == 0) {
+    // Resend first Interest (until maximum receive timeout exceeded)
+    fetchFirstSegment(origInterest, true, self);
+  }
+  else {
+    windowDecrease();
+    m_retxQueue.push(pendingSegmentIt->first);
+    fetchSegmentsInWindow(origInterest, self);
+  }
+}
+
+void
+SegmentFetcher::finalizeFetch(shared_ptr<SegmentFetcher> self)
+{
+  // Combine segments into final buffer
+  OBufferStream buf;
+  // We may have received more segments than exist in the object.
+  BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
+
+  for (int64_t i = 0; i < m_nSegments; i++) {
+    buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
+  }
+
+  onComplete(buf.buf());
+}
+
+void
+SegmentFetcher::windowIncrease()
+{
+  if (m_options.useConstantCwnd) {
+    BOOST_ASSERT(m_cwnd == m_options.initCwnd);
+    return;
+  }
+
+  if (m_cwnd < m_ssthresh) {
+    m_cwnd += m_options.aiStep; // additive increase
+  }
+  else {
+    m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
+  }
+}
+
+void
+SegmentFetcher::windowDecrease()
+{
+  if (m_options.disableCwa || m_highData > m_recPoint) {
+    m_recPoint = m_highInterest;
+
+    if (m_options.useConstantCwnd) {
+      BOOST_ASSERT(m_cwnd == m_options.initCwnd);
+      return;
+    }
+
+    // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
+    m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
+    m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
+  }
+}
+
+void
+SegmentFetcher::signalError(uint32_t code, const std::string& msg)
+{
+  // Cancel all pending Interests before signaling error
+  for (const auto& pendingSegment : m_pendingSegments) {
+    m_face.removePendingInterest(pendingSegment.second.id);
+    if (pendingSegment.second.timeoutEvent) {
+      m_scheduler.cancelEvent(pendingSegment.second.timeoutEvent);
+    }
+  }
+  onError(code, msg);
+}
+
+void
+SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
+                                           const PendingInterestId* pendingInterest,
+                                           scheduler::EventId timeoutEvent)
+{
+  auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
+  BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
+  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
+  pendingSegmentIt->second.state = SegmentState::Retransmitted;
+  pendingSegmentIt->second.id = pendingInterest;
+  pendingSegmentIt->second.timeoutEvent = timeoutEvent;
+}
+
+void
+SegmentFetcher::cancelExcessInFlightSegments()
+{
+  for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
+    if (it->first >= static_cast<uint64_t>(m_nSegments)) {
+      m_face.removePendingInterest(it->second.id);
+      if (it->second.timeoutEvent) {
+        m_scheduler.cancelEvent(it->second.timeoutEvent);
+      }
+      it = m_pendingSegments.erase(it);
+      BOOST_ASSERT(m_nSegmentsInFlight > 0);
+      m_nSegmentsInFlight--;
     }
     else {
-      m_buffer.write(reinterpret_cast<const char*>(data.getContent().value()),
-                     data.getContent().value_size());
-      afterSegmentValidated(data);
-      const auto& finalBlockId = data.getFinalBlock();
-      if (!finalBlockId || (*finalBlockId > currentSegment)) {
-        fetchNextSegment(origInterest, data.getName(), currentSegment.toSegment() + 1, self);
-      }
-      else {
-        onComplete(m_buffer.buf());
-      }
-    }
-  }
-  else {
-    onError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number.");
-  }
-}
-
-void
-SegmentFetcher::afterValidationFailure(const Data& data, const security::v2::ValidationError& error)
-{
-  onError(SEGMENT_VALIDATION_FAIL, "Segment validation fail " +
-                                   boost::lexical_cast<std::string>(error));
-}
-
-
-void
-SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
-                                    uint32_t reExpressCount, shared_ptr<SegmentFetcher> self)
-{
-  if (reExpressCount >= MAX_INTEREST_REEXPRESS) {
-    onError(NACK_ERROR, "Nack Error");
-  }
-  else {
-    switch (nack.getReason()) {
-      case lp::NackReason::DUPLICATE:
-        reExpressInterest(origInterest, reExpressCount, self);
-        break;
-      case lp::NackReason::CONGESTION:
-        using ms = time::milliseconds;
-        m_scheduler.scheduleEvent(ms(static_cast<ms::rep>(std::pow(2, reExpressCount + 1))),
-                                  bind(&SegmentFetcher::reExpressInterest, this,
-                                       origInterest, reExpressCount, self));
-        break;
-      default:
-        onError(NACK_ERROR, "Nack Error");
-        break;
+      ++it;
     }
   }
 }
 
-void
-SegmentFetcher::reExpressInterest(Interest interest, uint32_t reExpressCount,
-                                  shared_ptr<SegmentFetcher> self)
+bool
+SegmentFetcher::checkAllSegmentsReceived()
 {
-  interest.refreshNonce();
-  BOOST_ASSERT(interest.hasNonce());
+  bool haveReceivedAllSegments = false;
 
-  bool isSegmentZeroExpected = true;
-  if (!interest.getName().empty()) {
-    name::Component lastComponent = interest.getName().get(-1);
-    isSegmentZeroExpected = !lastComponent.isSegment();
+  if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
+    haveReceivedAllSegments = true;
+    // Verify that all segments in window have been received. If not, send Interests for missing segments.
+    for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
+      if (m_receivedSegments.count(i) == 0) {
+        m_retxQueue.push(i);
+        haveReceivedAllSegments = false;
+      }
+    }
   }
 
-  m_face.expressInterest(interest,
-                         bind(&SegmentFetcher::afterSegmentReceivedCb, this, _1, _2,
-                              isSegmentZeroExpected, self),
-                         bind(&SegmentFetcher::afterNackReceivedCb, this, _1, _2,
-                              ++reExpressCount, self),
-                         bind([this] { onError(INTEREST_TIMEOUT, "Timeout"); }));
+  return haveReceivedAllSegments;
+}
+
+time::milliseconds
+SegmentFetcher::getEstimatedRto()
+{
+  // We don't want an Interest timeout greater than the maximum allowed timeout between the
+  // successful receipt of segments
+  return std::min(m_options.maxTimeout,
+                  time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
 }
 
 } // namespace util