util: add "in order" mode to SegmentFetcher
This commit adds an option for the fetcher to emit a signal when a segment
has arrived in order. Out-of-order segments are maintained internally by
the fetcher and are deleted when the signal is emitted.
Change-Id: I84aec680774af3e3c07bb07c881eab8908b5c9c2
diff --git a/AUTHORS.md b/AUTHORS.md
index 261d9b6..13919c0 100644
--- a/AUTHORS.md
+++ b/AUTHORS.md
@@ -10,6 +10,7 @@
* Hila Ben Abraham <https://sites.wustl.edu/hbabraham>
* Shuo Chen <https://github.com/chenatu>
* Muktadir R. Chowdhury <https://github.com/alvyC>
+* Jeremy Clark <https://github.com/jrandallclark>
* Damian Coomes <https://github.com/dmcoomes>
* Stephanie DiBenedetto
* Qiuhan Ding <https://irl.cs.ucla.edu/~qiuhanding>
diff --git a/ndn-cxx/util/segment-fetcher.cpp b/ndn-cxx/util/segment-fetcher.cpp
index 6e5d492..0da0860 100644
--- a/ndn-cxx/util/segment-fetcher.cpp
+++ b/ndn-cxx/util/segment-fetcher.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2013-2019 Regents of the University of California,
+ * Copyright (c) 2013-2020 Regents of the University of California,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University.
*
@@ -19,10 +19,6 @@
* <http://www.gnu.org/licenses/>.
*
* See AUTHORS.md for complete list of ndn-cxx authors and contributors.
- *
- * @author Shuo Yang
- * @author Weiwei Liu
- * @author Chavoosh Ghasemi
*/
#include "ndn-cxx/util/segment-fetcher.hpp"
@@ -71,16 +67,8 @@
, m_validator(validator)
, m_rttEstimator(make_shared<RttEstimator::Options>(options.rttOptions))
, m_timeLastSegmentReceived(time::steady_clock::now())
- , m_nextSegmentNum(0)
, m_cwnd(options.initCwnd)
, m_ssthresh(options.initSsthresh)
- , m_nSegmentsInFlight(0)
- , m_nSegments(0)
- , m_highInterest(0)
- , m_highData(0)
- , m_recPoint(0)
- , m_nReceived(0)
- , m_nBytesReceived(0)
{
m_options.validate();
}
@@ -137,7 +125,15 @@
return finalizeFetch();
}
- int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
+ int64_t availableWindowSize;
+ if (m_options.inOrder) {
+ availableWindowSize = std::min<int64_t>(m_cwnd, m_options.flowControlWindow - m_segmentBuffer.size());
+ }
+ else {
+ availableWindowSize = static_cast<int64_t>(m_cwnd);
+ }
+ availableWindowSize -= m_nSegmentsInFlight;
+
std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
while (availableWindowSize > 0) {
@@ -152,7 +148,7 @@
segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
}
else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
- if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
+ if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
// Don't request a segment a second time if received in response to first "discovery" Interest
m_nextSegmentNum++;
continue;
@@ -264,6 +260,8 @@
// It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
uint64_t currentSegment = data.getName().get(-1).toSegment();
+ m_receivedSegments.insert(currentSegment);
+
// Add measurement to RTO estimator (if not retransmission)
if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
BOOST_ASSERT(m_nSegmentsInFlight >= 0);
@@ -275,9 +273,9 @@
m_pendingSegments.erase(pendingSegmentIt);
// Copy data in segment to temporary buffer
- auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
- std::forward_as_tuple(currentSegment),
- std::forward_as_tuple(data.getContent().value_size()));
+ auto receivedSegmentIt = m_segmentBuffer.emplace(std::piecewise_construct,
+ std::forward_as_tuple(currentSegment),
+ std::forward_as_tuple(data.getContent().value_size()));
std::copy(data.getContent().value_begin(), data.getContent().value_end(),
receivedSegmentIt.first->second.begin());
m_nBytesReceived += data.getContent().value_size();
@@ -295,6 +293,13 @@
}
}
+ if (m_options.inOrder && m_nextSegmentInOrder == currentSegment) {
+ do {
+ onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
+ m_segmentBuffer.erase(m_nextSegmentInOrder++);
+ } while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
+ }
+
if (m_receivedSegments.size() == 1) {
m_versionedDataName = data.getName().getPrefix(-1);
if (currentSegment == 0) {
@@ -405,16 +410,20 @@
void
SegmentFetcher::finalizeFetch()
{
- // Combine segments into final buffer
- OBufferStream buf;
- // We may have received more segments than exist in the object.
- BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
-
- for (int64_t i = 0; i < m_nSegments; i++) {
- buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
+ if (m_options.inOrder) {
+ onInOrderComplete();
}
+ else {
+ // Combine segments into final buffer
+ OBufferStream buf;
+ // We may have received more segments than exist in the object.
+ BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
- onComplete(buf.buf());
+ for (int64_t i = 0; i < m_nSegments; i++) {
+ buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
+ }
+ onComplete(buf.buf());
+ }
stop();
}
diff --git a/ndn-cxx/util/segment-fetcher.hpp b/ndn-cxx/util/segment-fetcher.hpp
index bf349d0..30879d6 100644
--- a/ndn-cxx/util/segment-fetcher.hpp
+++ b/ndn-cxx/util/segment-fetcher.hpp
@@ -19,10 +19,6 @@
* <http://www.gnu.org/licenses/>.
*
* See AUTHORS.md for complete list of ndn-cxx authors and contributors.
- *
- * @author Shuo Yang
- * @author Weiwei Liu
- * @author Chavoosh Ghasemi
*/
#ifndef NDN_UTIL_SEGMENT_FETCHER_HPP
@@ -35,6 +31,7 @@
#include "ndn-cxx/util/signal.hpp"
#include <queue>
+#include <set>
namespace ndn {
namespace util {
@@ -53,20 +50,21 @@
*
* 1. Express an Interest to discover the latest version of the object:
*
- * Interest: `/<prefix>?ndn.CanBePrefix=true&ndn.MustBeFresh=true`
+ * Interest: `/<prefix>?CanBePrefix&MustBeFresh`
*
- * 2. Infer the latest version of the object: `<version> = Data.getName().get(-2)`
+ * 2. Infer the latest version of the object: `<version> = Data.getName().get(-2)`.
*
* 3. Keep sending Interests for future segments until an error occurs or the number of segments
* indicated by the FinalBlockId in a received Data packet is reached. This retrieval will start
* at segment 1 if segment 0 was received in response to the Interest expressed in step 2;
* otherwise, retrieval will start at segment 0. By default, congestion control will be used to
* manage the Interest window size. Interests expressed in this step will follow this Name
- * format:
+ * format: `/<prefix>/<version>/<segment=(N)>`.
*
- * Interest: `/<prefix>/<version>/<segment=(N)>`
- *
- * 4. Signal #onComplete passing a memory buffer that combines the content of all segments in the object.
+ * 4. If set to 'block' mode, signal #onComplete passing a memory buffer that combines the content
+ * of all segments in the object. If set to 'in order' mode, signal #onInOrderData is triggered
+ * upon validation of each segment in segment order, storing later segments that arrived out of
+ * order internally until all earlier segments have arrived and have been validated.
*
* If an error occurs during the fetching process, #onError is signaled with one of the error codes
* from SegmentFetcher::ErrorCode.
@@ -124,18 +122,20 @@
validate();
public:
- bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
- bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
- time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
time::milliseconds interestLifetime = 4_s; ///< lifetime of sent Interests - independent of Interest timeout
+ time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
+ bool inOrder = false; ///< true for 'in order' mode, false for 'block' mode
+ bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
+ bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
+ bool disableCwa = false; ///< disable Conservative Window Adaptation
+ bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
+ bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
double initCwnd = 1.0; ///< initial congestion window size
double initSsthresh = std::numeric_limits<double>::max(); ///< initial slow start threshold
double aiStep = 1.0; ///< additive increase step (in segments)
double mdCoef = 0.5; ///< multiplicative decrease coefficient
- bool disableCwa = false; ///< disable Conservative Window Adaptation
- bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
- bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
RttEstimator::Options rttOptions; ///< options for RTT estimator
+ size_t flowControlWindow = 25000; ///< maximum number of segments stored in the reorder buffer
};
/**
@@ -244,37 +244,50 @@
public:
/**
- * @brief Emits upon successful retrieval of the complete data.
+ * @brief Emitted upon successful retrieval of the complete object (all segments).
+ * @note Emitted only if SegmentFetcher is operating in 'block' mode.
*/
Signal<SegmentFetcher, ConstBufferPtr> onComplete;
/**
- * @brief Emits when the retrieval could not be completed due to an error.
+ * @brief Emitted when the retrieval could not be completed due to an error.
*
* Handlers are provided with an error code and a string error message.
*/
Signal<SegmentFetcher, uint32_t, std::string> onError;
/**
- * @brief Emits whenever a data segment received.
+ * @brief Emitted whenever a data segment received.
*/
Signal<SegmentFetcher, Data> afterSegmentReceived;
/**
- * @brief Emits whenever a received data segment has been successfully validated.
+ * @brief Emitted whenever a received data segment has been successfully validated.
*/
Signal<SegmentFetcher, Data> afterSegmentValidated;
/**
- * @brief Emits whenever an Interest for a data segment is nacked.
+ * @brief Emitted whenever an Interest for a data segment is nacked.
*/
Signal<SegmentFetcher> afterSegmentNacked;
/**
- * @brief Emits whenever an Interest for a data segment times out.
+ * @brief Emitted whenever an Interest for a data segment times out.
*/
Signal<SegmentFetcher> afterSegmentTimedOut;
+ /**
+ * @brief Emitted after each data segment in segment order has been validated.
+ * @note Emitted only if SegmentFetcher is operating in 'in order' mode.
+ */
+ Signal<SegmentFetcher, ConstBufferPtr> onInOrderData;
+
+ /**
+ * @brief Emitted on successful retrieval of all segments in 'in order' mode.
+ * @note Emitted only if SegmentFetcher is operating in 'in order' mode.
+ */
+ Signal<SegmentFetcher> onInOrderComplete;
+
private:
enum class SegmentState {
FirstInterest, ///< the first Interest for this segment has been sent
@@ -305,19 +318,21 @@
time::steady_clock::TimePoint m_timeLastSegmentReceived;
std::queue<uint64_t> m_retxQueue;
Name m_versionedDataName;
- uint64_t m_nextSegmentNum;
+ uint64_t m_nextSegmentNum = 0;
double m_cwnd;
double m_ssthresh;
- int64_t m_nSegmentsInFlight;
- int64_t m_nSegments;
- uint64_t m_highInterest;
- uint64_t m_highData;
- uint64_t m_recPoint;
- int64_t m_nReceived;
- int64_t m_nBytesReceived;
+ int64_t m_nSegmentsInFlight = 0;
+ int64_t m_nSegments = 0;
+ uint64_t m_highInterest = 0;
+ uint64_t m_highData = 0;
+ uint64_t m_recPoint = 0;
+ int64_t m_nReceived = 0;
+ int64_t m_nBytesReceived = 0;
+ uint64_t m_nextSegmentInOrder = 0;
- std::map<uint64_t, Buffer> m_receivedSegments;
+ std::map<uint64_t, Buffer> m_segmentBuffer;
std::map<uint64_t, PendingSegment> m_pendingSegments;
+ std::set<uint64_t> m_receivedSegments;
};
} // namespace util
diff --git a/tests/unit/util/segment-fetcher.t.cpp b/tests/unit/util/segment-fetcher.t.cpp
index 9f65f49..7f63db8 100644
--- a/tests/unit/util/segment-fetcher.t.cpp
+++ b/tests/unit/util/segment-fetcher.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2013-2019 Regents of the University of California.
+ * Copyright (c) 2013-2020 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -73,7 +73,20 @@
{
++nCompletions;
dataSize = data->size();
- dataBuf = data;
+ }
+
+ void
+ onInOrderComplete()
+ {
+ ++nCompletions;
+ ++nOnInOrderComplete;
+ }
+
+ void
+ onInOrderData(ConstBufferPtr data)
+ {
+ ++nOnInOrderData;
+ dataSize += data->size();
}
void
@@ -88,8 +101,15 @@
void
connectSignals(const shared_ptr<SegmentFetcher>& fetcher)
{
+ fetcher->onInOrderData.connect(bind(&Fixture::onInOrderData, this, _1));
+ fetcher->onInOrderComplete.connect(bind(&Fixture::onInOrderComplete, this));
fetcher->onComplete.connect(bind(&Fixture::onComplete, this, _1));
fetcher->onError.connect(bind(&Fixture::onError, this, _1));
+
+ fetcher->afterSegmentReceived.connect([this] (const auto&) { ++this->nAfterSegmentReceived; });
+ fetcher->afterSegmentValidated.connect([this] (const auto &) { ++this->nAfterSegmentValidated; });
+ fetcher->afterSegmentNacked.connect([this] { ++nAfterSegmentNacked; });
+ fetcher->afterSegmentTimedOut.connect([this] { ++nAfterSegmentTimedOut; });
}
void
@@ -141,7 +161,12 @@
uint32_t lastError = 0;
int nCompletions = 0;
size_t dataSize = 0;
- ConstBufferPtr dataBuf;
+ size_t nAfterSegmentReceived = 0;
+ size_t nAfterSegmentValidated = 0;
+ size_t nAfterSegmentNacked = 0;
+ size_t nAfterSegmentTimedOut = 0;
+ size_t nOnInOrderData = 0;
+ size_t nOnInOrderComplete = 0;
// number of segments in fetched object
uint64_t nSegments = 0;
@@ -200,17 +225,9 @@
BOOST_AUTO_TEST_CASE(BasicSingleSegment)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
advanceClocks(10_ms);
@@ -231,17 +248,9 @@
SegmentFetcher::Options options;
options.useConstantCwnd = true;
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator, options);
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
advanceClocks(10_ms);
@@ -304,21 +313,13 @@
BOOST_AUTO_TEST_CASE(BasicMultipleSegments)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
nSegments = 401;
sendNackInsteadOfDropping = false;
- face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
face.processEvents(1_s);
@@ -331,25 +332,42 @@
BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
}
+BOOST_AUTO_TEST_CASE(BasicInOrder)
+{
+ DummyValidator acceptValidator;
+ SegmentFetcher::Options options;
+ options.inOrder = true;
+ nSegments = 401;
+ sendNackInsteadOfDropping = false;
+
+ auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator, options);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
+ connectSignals(fetcher);
+
+ face.processEvents(1_s);
+
+ BOOST_CHECK_EQUAL(nErrors, 0);
+ BOOST_CHECK_EQUAL(nCompletions, 1);
+ BOOST_CHECK_EQUAL(dataSize, 14 * 401);
+ BOOST_CHECK_EQUAL(nAfterSegmentReceived, 401);
+ BOOST_CHECK_EQUAL(nAfterSegmentValidated, 401);
+ BOOST_CHECK_EQUAL(nOnInOrderData, 401);
+ BOOST_CHECK_EQUAL(nOnInOrderComplete, 1);
+ BOOST_CHECK_EQUAL(nAfterSegmentNacked, 0);
+ BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
+}
+
BOOST_AUTO_TEST_CASE(FirstSegmentNotZero)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
nSegments = 401;
sendNackInsteadOfDropping = false;
defaultSegmentToSend = 47;
- face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
face.processEvents(1_s);
@@ -362,16 +380,38 @@
BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
}
+BOOST_AUTO_TEST_CASE(FirstSegmentNotZeroInOrder)
+{
+ DummyValidator acceptValidator;
+ SegmentFetcher::Options options;
+ options.inOrder = true;
+ nSegments = 401;
+ sendNackInsteadOfDropping = false;
+ defaultSegmentToSend = 47;
+
+ auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator, options);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
+ connectSignals(fetcher);
+
+ face.processEvents(1_s);
+
+ BOOST_CHECK_EQUAL(nErrors, 0);
+ BOOST_CHECK_EQUAL(nCompletions, 1);
+ BOOST_CHECK_EQUAL(dataSize, 14 * 401);
+ BOOST_CHECK_EQUAL(nAfterSegmentReceived, 401);
+ BOOST_CHECK_EQUAL(nAfterSegmentValidated, 401);
+ BOOST_CHECK_EQUAL(nOnInOrderData, 401);
+ BOOST_CHECK_EQUAL(nOnInOrderComplete, 1);
+ BOOST_CHECK_EQUAL(nAfterSegmentNacked, 0);
+ BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
+}
+
BOOST_AUTO_TEST_CASE(WindowSize)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
advanceClocks(10_ms); // T+10ms
@@ -540,17 +580,9 @@
BOOST_AUTO_TEST_CASE(MoreSegmentsThanNSegments)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
advanceClocks(10_ms);
@@ -595,24 +627,16 @@
BOOST_AUTO_TEST_CASE(DuplicateNack)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
nSegments = 401;
segmentsToDropOrNack.push(0);
segmentsToDropOrNack.push(200);
sendNackInsteadOfDropping = true;
nackReason = lp::NackReason::DUPLICATE;
- face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
face.processEvents(1_s);
@@ -628,24 +652,16 @@
BOOST_AUTO_TEST_CASE(CongestionNack)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
nSegments = 401;
segmentsToDropOrNack.push(0);
segmentsToDropOrNack.push(200);
sendNackInsteadOfDropping = true;
nackReason = lp::NackReason::CONGESTION;
- face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
face.processEvents(1_s);
@@ -661,10 +677,6 @@
BOOST_AUTO_TEST_CASE(OtherNackReason)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
segmentsToDropOrNack.push(0);
sendNackInsteadOfDropping = true;
nackReason = lp::NackReason::NO_ROUTE;
@@ -673,10 +685,6 @@
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
face.processEvents(1_s);
@@ -693,8 +701,8 @@
{
DummyValidator validator;
validator.getPolicy().setResultCallback([] (const Name& name) {
- return name.at(-1).toSegment() % 2 == 0;
- });
+ return name.at(-1).toSegment() % 2 == 0;
+ });
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
validator);
connectSignals(fetcher);
@@ -702,16 +710,6 @@
auto data1 = makeDataSegment("/hello/world", 0, false);
auto data2 = makeDataSegment("/hello/world", 1, true);
- size_t nRecvSegments = 0;
- fetcher->afterSegmentReceived.connect([&nRecvSegments] (const Data& receivedSegment) {
- ++nRecvSegments;
- });
-
- size_t nValidatedSegments = 0;
- fetcher->afterSegmentValidated.connect([&nValidatedSegments] (const Data& validatedSegment) {
- ++nValidatedSegments;
- });
-
advanceClocks(10_ms, 10);
BOOST_CHECK_EQUAL(fetcher->m_timeLastSegmentReceived, time::steady_clock::now() - 100_ms);
@@ -727,8 +725,8 @@
advanceClocks(10_ms, 10);
BOOST_CHECK_EQUAL(fetcher->m_timeLastSegmentReceived, time::steady_clock::now() - 200_ms);
- BOOST_CHECK_EQUAL(nRecvSegments, 2);
- BOOST_CHECK_EQUAL(nValidatedSegments, 1);
+ BOOST_CHECK_EQUAL(nAfterSegmentReceived, 2);
+ BOOST_CHECK_EQUAL(nAfterSegmentValidated, 1);
BOOST_CHECK_EQUAL(nErrors, 1);
}
@@ -770,7 +768,7 @@
bool fetcherStopped = false;
fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator);
- fetcher->afterSegmentReceived.connect([&fetcher, &fetcherStopped] (const Data& data) {
+ fetcher->afterSegmentReceived.connect([&fetcher, &fetcherStopped] (const Data&) {
fetcherStopped = true;
fetcher->stop();
});
@@ -790,21 +788,12 @@
// BasicSingleSegment, but with scoped fetcher
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
weak_ptr<SegmentFetcher> weakFetcher;
{
auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator);
weakFetcher = fetcher;
connectSignals(fetcher);
-
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
}
advanceClocks(10_ms);
@@ -829,21 +818,12 @@
SegmentFetcher::Options options;
options.maxTimeout = 3000_ms;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
-
weak_ptr<SegmentFetcher> weakFetcher;
{
auto fetcher = SegmentFetcher::start(face, Interest("/localhost/nfd/faces/list"),
acceptValidator, options);
weakFetcher = fetcher;
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
}
advanceClocks(500_ms, 7);