util: add "in order" mode to SegmentFetcher
This commit adds an option for the fetcher to emit a signal when a segment
has arrived in order. Out-of-order segments are maintained internally by
the fetcher and are deleted when the signal is emitted.
Change-Id: I84aec680774af3e3c07bb07c881eab8908b5c9c2
diff --git a/ndn-cxx/util/segment-fetcher.cpp b/ndn-cxx/util/segment-fetcher.cpp
index 6e5d492..0da0860 100644
--- a/ndn-cxx/util/segment-fetcher.cpp
+++ b/ndn-cxx/util/segment-fetcher.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2013-2019 Regents of the University of California,
+ * Copyright (c) 2013-2020 Regents of the University of California,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University.
*
@@ -19,10 +19,6 @@
* <http://www.gnu.org/licenses/>.
*
* See AUTHORS.md for complete list of ndn-cxx authors and contributors.
- *
- * @author Shuo Yang
- * @author Weiwei Liu
- * @author Chavoosh Ghasemi
*/
#include "ndn-cxx/util/segment-fetcher.hpp"
@@ -71,16 +67,8 @@
, m_validator(validator)
, m_rttEstimator(make_shared<RttEstimator::Options>(options.rttOptions))
, m_timeLastSegmentReceived(time::steady_clock::now())
- , m_nextSegmentNum(0)
, m_cwnd(options.initCwnd)
, m_ssthresh(options.initSsthresh)
- , m_nSegmentsInFlight(0)
- , m_nSegments(0)
- , m_highInterest(0)
- , m_highData(0)
- , m_recPoint(0)
- , m_nReceived(0)
- , m_nBytesReceived(0)
{
m_options.validate();
}
@@ -137,7 +125,15 @@
return finalizeFetch();
}
- int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
+ int64_t availableWindowSize;
+ if (m_options.inOrder) {
+ availableWindowSize = std::min<int64_t>(m_cwnd, m_options.flowControlWindow - m_segmentBuffer.size());
+ }
+ else {
+ availableWindowSize = static_cast<int64_t>(m_cwnd);
+ }
+ availableWindowSize -= m_nSegmentsInFlight;
+
std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
while (availableWindowSize > 0) {
@@ -152,7 +148,7 @@
segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
}
else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
- if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
+ if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
// Don't request a segment a second time if received in response to first "discovery" Interest
m_nextSegmentNum++;
continue;
@@ -264,6 +260,8 @@
// It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
uint64_t currentSegment = data.getName().get(-1).toSegment();
+ m_receivedSegments.insert(currentSegment);
+
// Add measurement to RTO estimator (if not retransmission)
if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
BOOST_ASSERT(m_nSegmentsInFlight >= 0);
@@ -275,9 +273,9 @@
m_pendingSegments.erase(pendingSegmentIt);
// Copy data in segment to temporary buffer
- auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
- std::forward_as_tuple(currentSegment),
- std::forward_as_tuple(data.getContent().value_size()));
+ auto receivedSegmentIt = m_segmentBuffer.emplace(std::piecewise_construct,
+ std::forward_as_tuple(currentSegment),
+ std::forward_as_tuple(data.getContent().value_size()));
std::copy(data.getContent().value_begin(), data.getContent().value_end(),
receivedSegmentIt.first->second.begin());
m_nBytesReceived += data.getContent().value_size();
@@ -295,6 +293,13 @@
}
}
+ if (m_options.inOrder && m_nextSegmentInOrder == currentSegment) {
+ do {
+ onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
+ m_segmentBuffer.erase(m_nextSegmentInOrder++);
+ } while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
+ }
+
if (m_receivedSegments.size() == 1) {
m_versionedDataName = data.getName().getPrefix(-1);
if (currentSegment == 0) {
@@ -405,16 +410,20 @@
void
SegmentFetcher::finalizeFetch()
{
- // Combine segments into final buffer
- OBufferStream buf;
- // We may have received more segments than exist in the object.
- BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
-
- for (int64_t i = 0; i < m_nSegments; i++) {
- buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
+ if (m_options.inOrder) {
+ onInOrderComplete();
}
+ else {
+ // Combine segments into final buffer
+ OBufferStream buf;
+ // We may have received more segments than exist in the object.
+ BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
- onComplete(buf.buf());
+ for (int64_t i = 0; i < m_nSegments; i++) {
+ buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
+ }
+ onComplete(buf.buf());
+ }
stop();
}
diff --git a/ndn-cxx/util/segment-fetcher.hpp b/ndn-cxx/util/segment-fetcher.hpp
index bf349d0..30879d6 100644
--- a/ndn-cxx/util/segment-fetcher.hpp
+++ b/ndn-cxx/util/segment-fetcher.hpp
@@ -19,10 +19,6 @@
* <http://www.gnu.org/licenses/>.
*
* See AUTHORS.md for complete list of ndn-cxx authors and contributors.
- *
- * @author Shuo Yang
- * @author Weiwei Liu
- * @author Chavoosh Ghasemi
*/
#ifndef NDN_UTIL_SEGMENT_FETCHER_HPP
@@ -35,6 +31,7 @@
#include "ndn-cxx/util/signal.hpp"
#include <queue>
+#include <set>
namespace ndn {
namespace util {
@@ -53,20 +50,21 @@
*
* 1. Express an Interest to discover the latest version of the object:
*
- * Interest: `/<prefix>?ndn.CanBePrefix=true&ndn.MustBeFresh=true`
+ * Interest: `/<prefix>?CanBePrefix&MustBeFresh`
*
- * 2. Infer the latest version of the object: `<version> = Data.getName().get(-2)`
+ * 2. Infer the latest version of the object: `<version> = Data.getName().get(-2)`.
*
* 3. Keep sending Interests for future segments until an error occurs or the number of segments
* indicated by the FinalBlockId in a received Data packet is reached. This retrieval will start
* at segment 1 if segment 0 was received in response to the Interest expressed in step 2;
* otherwise, retrieval will start at segment 0. By default, congestion control will be used to
* manage the Interest window size. Interests expressed in this step will follow this Name
- * format:
+ * format: `/<prefix>/<version>/<segment=(N)>`.
*
- * Interest: `/<prefix>/<version>/<segment=(N)>`
- *
- * 4. Signal #onComplete passing a memory buffer that combines the content of all segments in the object.
+ * 4. If set to 'block' mode, signal #onComplete passing a memory buffer that combines the content
+ * of all segments in the object. If set to 'in order' mode, signal #onInOrderData is triggered
+ * upon validation of each segment in segment order, storing later segments that arrived out of
+ * order internally until all earlier segments have arrived and have been validated.
*
* If an error occurs during the fetching process, #onError is signaled with one of the error codes
* from SegmentFetcher::ErrorCode.
@@ -124,18 +122,20 @@
validate();
public:
- bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
- bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
- time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
time::milliseconds interestLifetime = 4_s; ///< lifetime of sent Interests - independent of Interest timeout
+ time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
+ bool inOrder = false; ///< true for 'in order' mode, false for 'block' mode
+ bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
+ bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
+ bool disableCwa = false; ///< disable Conservative Window Adaptation
+ bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
+ bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
double initCwnd = 1.0; ///< initial congestion window size
double initSsthresh = std::numeric_limits<double>::max(); ///< initial slow start threshold
double aiStep = 1.0; ///< additive increase step (in segments)
double mdCoef = 0.5; ///< multiplicative decrease coefficient
- bool disableCwa = false; ///< disable Conservative Window Adaptation
- bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
- bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
RttEstimator::Options rttOptions; ///< options for RTT estimator
+ size_t flowControlWindow = 25000; ///< maximum number of segments stored in the reorder buffer
};
/**
@@ -244,37 +244,50 @@
public:
/**
- * @brief Emits upon successful retrieval of the complete data.
+ * @brief Emitted upon successful retrieval of the complete object (all segments).
+ * @note Emitted only if SegmentFetcher is operating in 'block' mode.
*/
Signal<SegmentFetcher, ConstBufferPtr> onComplete;
/**
- * @brief Emits when the retrieval could not be completed due to an error.
+ * @brief Emitted when the retrieval could not be completed due to an error.
*
* Handlers are provided with an error code and a string error message.
*/
Signal<SegmentFetcher, uint32_t, std::string> onError;
/**
- * @brief Emits whenever a data segment received.
+ * @brief Emitted whenever a data segment received.
*/
Signal<SegmentFetcher, Data> afterSegmentReceived;
/**
- * @brief Emits whenever a received data segment has been successfully validated.
+ * @brief Emitted whenever a received data segment has been successfully validated.
*/
Signal<SegmentFetcher, Data> afterSegmentValidated;
/**
- * @brief Emits whenever an Interest for a data segment is nacked.
+ * @brief Emitted whenever an Interest for a data segment is nacked.
*/
Signal<SegmentFetcher> afterSegmentNacked;
/**
- * @brief Emits whenever an Interest for a data segment times out.
+ * @brief Emitted whenever an Interest for a data segment times out.
*/
Signal<SegmentFetcher> afterSegmentTimedOut;
+ /**
+ * @brief Emitted after each data segment in segment order has been validated.
+ * @note Emitted only if SegmentFetcher is operating in 'in order' mode.
+ */
+ Signal<SegmentFetcher, ConstBufferPtr> onInOrderData;
+
+ /**
+ * @brief Emitted on successful retrieval of all segments in 'in order' mode.
+ * @note Emitted only if SegmentFetcher is operating in 'in order' mode.
+ */
+ Signal<SegmentFetcher> onInOrderComplete;
+
private:
enum class SegmentState {
FirstInterest, ///< the first Interest for this segment has been sent
@@ -305,19 +318,21 @@
time::steady_clock::TimePoint m_timeLastSegmentReceived;
std::queue<uint64_t> m_retxQueue;
Name m_versionedDataName;
- uint64_t m_nextSegmentNum;
+ uint64_t m_nextSegmentNum = 0;
double m_cwnd;
double m_ssthresh;
- int64_t m_nSegmentsInFlight;
- int64_t m_nSegments;
- uint64_t m_highInterest;
- uint64_t m_highData;
- uint64_t m_recPoint;
- int64_t m_nReceived;
- int64_t m_nBytesReceived;
+ int64_t m_nSegmentsInFlight = 0;
+ int64_t m_nSegments = 0;
+ uint64_t m_highInterest = 0;
+ uint64_t m_highData = 0;
+ uint64_t m_recPoint = 0;
+ int64_t m_nReceived = 0;
+ int64_t m_nBytesReceived = 0;
+ uint64_t m_nextSegmentInOrder = 0;
- std::map<uint64_t, Buffer> m_receivedSegments;
+ std::map<uint64_t, Buffer> m_segmentBuffer;
std::map<uint64_t, PendingSegment> m_pendingSegments;
+ std::set<uint64_t> m_receivedSegments;
};
} // namespace util