util: add "in order" mode to SegmentFetcher
This commit adds an option for the fetcher to emit a signal when a segment
has arrived in order. Out-of-order segments are maintained internally by
the fetcher and are deleted when the signal is emitted.
Change-Id: I84aec680774af3e3c07bb07c881eab8908b5c9c2
diff --git a/tests/unit/util/segment-fetcher.t.cpp b/tests/unit/util/segment-fetcher.t.cpp
index 9f65f49..7f63db8 100644
--- a/tests/unit/util/segment-fetcher.t.cpp
+++ b/tests/unit/util/segment-fetcher.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2013-2019 Regents of the University of California.
+ * Copyright (c) 2013-2020 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -73,7 +73,20 @@
{
++nCompletions;
dataSize = data->size();
- dataBuf = data;
+ }
+
+ void
+ onInOrderComplete() // handler bound to the fetcher's onInOrderComplete signal in connectSignals()
+ {
+ ++nCompletions; // also tallied in the general completion counter that the tests check
+ ++nOnInOrderComplete;
+ }
+
+ void
+ onInOrderData(ConstBufferPtr data) // handler bound to the fetcher's onInOrderData signal in connectSignals()
+ {
+ ++nOnInOrderData;
+ dataSize += data->size(); // accumulate across calls: each call contributes one buffer's size
}
void
@@ -88,8 +101,15 @@
void
connectSignals(const shared_ptr<SegmentFetcher>& fetcher)
{
+ fetcher->onInOrderData.connect(bind(&Fixture::onInOrderData, this, _1)); // in-order mode signals
+ fetcher->onInOrderComplete.connect(bind(&Fixture::onInOrderComplete, this));
fetcher->onComplete.connect(bind(&Fixture::onComplete, this, _1));
fetcher->onError.connect(bind(&Fixture::onError, this, _1));
+ // Count every emission of the per-segment signals in the fixture's n* members.
+ fetcher->afterSegmentReceived.connect([this] (const auto&) { ++this->nAfterSegmentReceived; });
+ fetcher->afterSegmentValidated.connect([this] (const auto &) { ++this->nAfterSegmentValidated; });
+ fetcher->afterSegmentNacked.connect([this] { ++nAfterSegmentNacked; });
+ fetcher->afterSegmentTimedOut.connect([this] { ++nAfterSegmentTimedOut; });
}
void
@@ -141,7 +161,12 @@
uint32_t lastError = 0;
int nCompletions = 0;
size_t dataSize = 0;
- ConstBufferPtr dataBuf;
+ size_t nAfterSegmentReceived = 0; // afterSegmentReceived emissions counted by connectSignals()
+ size_t nAfterSegmentValidated = 0; // afterSegmentValidated emissions counted by connectSignals()
+ size_t nAfterSegmentNacked = 0; // afterSegmentNacked emissions counted by connectSignals()
+ size_t nAfterSegmentTimedOut = 0; // afterSegmentTimedOut emissions counted by connectSignals()
+ size_t nOnInOrderData = 0; // calls to onInOrderData()
+ size_t nOnInOrderComplete = 0; // calls to onInOrderComplete()
// number of segments in fetched object
uint64_t nSegments = 0;
@@ -200,17 +225,9 @@
BOOST_AUTO_TEST_CASE(BasicSingleSegment)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
advanceClocks(10_ms);
@@ -231,17 +248,9 @@
SegmentFetcher::Options options;
options.useConstantCwnd = true;
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator, options);
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
advanceClocks(10_ms);
@@ -304,21 +313,13 @@
BOOST_AUTO_TEST_CASE(BasicMultipleSegments)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
nSegments = 401;
sendNackInsteadOfDropping = false;
- face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
face.processEvents(1_s);
@@ -331,25 +332,42 @@
BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
}
+BOOST_AUTO_TEST_CASE(BasicInOrder) // multi-segment fetch with Options::inOrder enabled
+{
+ DummyValidator acceptValidator;
+ SegmentFetcher::Options options;
+ options.inOrder = true; // select the "in order" mode under test
+ nSegments = 401; // number of segments in the fetched object
+ sendNackInsteadOfDropping = false;
+
+ auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator, options);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1)); // presumably answers each Interest with a segment; confirm in Fixture::onInterest
+ connectSignals(fetcher);
+
+ face.processEvents(1_s);
+
+ BOOST_CHECK_EQUAL(nErrors, 0);
+ BOOST_CHECK_EQUAL(nCompletions, 1);
+ BOOST_CHECK_EQUAL(dataSize, 14 * 401); // presumably 14 payload bytes per segment; confirm against makeDataSegment
+ BOOST_CHECK_EQUAL(nAfterSegmentReceived, 401);
+ BOOST_CHECK_EQUAL(nAfterSegmentValidated, 401);
+ BOOST_CHECK_EQUAL(nOnInOrderData, 401); // one onInOrderData() call expected per segment
+ BOOST_CHECK_EQUAL(nOnInOrderComplete, 1);
+ BOOST_CHECK_EQUAL(nAfterSegmentNacked, 0);
+ BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
+}
+
BOOST_AUTO_TEST_CASE(FirstSegmentNotZero)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
nSegments = 401;
sendNackInsteadOfDropping = false;
defaultSegmentToSend = 47;
- face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
face.processEvents(1_s);
@@ -362,16 +380,38 @@
BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
}
+BOOST_AUTO_TEST_CASE(FirstSegmentNotZeroInOrder) // same checks as BasicInOrder, with defaultSegmentToSend = 47
+{
+ DummyValidator acceptValidator;
+ SegmentFetcher::Options options;
+ options.inOrder = true; // select the "in order" mode under test
+ nSegments = 401; // number of segments in the fetched object
+ sendNackInsteadOfDropping = false;
+ defaultSegmentToSend = 47; // presumably the segment returned for the initial Interest; confirm in Fixture::onInterest
+
+ auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator, options);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
+ connectSignals(fetcher);
+
+ face.processEvents(1_s);
+
+ BOOST_CHECK_EQUAL(nErrors, 0);
+ BOOST_CHECK_EQUAL(nCompletions, 1);
+ BOOST_CHECK_EQUAL(dataSize, 14 * 401); // presumably 14 payload bytes per segment; confirm against makeDataSegment
+ BOOST_CHECK_EQUAL(nAfterSegmentReceived, 401);
+ BOOST_CHECK_EQUAL(nAfterSegmentValidated, 401);
+ BOOST_CHECK_EQUAL(nOnInOrderData, 401); // one onInOrderData() call expected per segment
+ BOOST_CHECK_EQUAL(nOnInOrderComplete, 1);
+ BOOST_CHECK_EQUAL(nAfterSegmentNacked, 0);
+ BOOST_CHECK_EQUAL(nAfterSegmentTimedOut, 0);
+}
+
BOOST_AUTO_TEST_CASE(WindowSize)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
advanceClocks(10_ms); // T+10ms
@@ -540,17 +580,9 @@
BOOST_AUTO_TEST_CASE(MoreSegmentsThanNSegments)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
advanceClocks(10_ms);
@@ -595,24 +627,16 @@
BOOST_AUTO_TEST_CASE(DuplicateNack)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
nSegments = 401;
segmentsToDropOrNack.push(0);
segmentsToDropOrNack.push(200);
sendNackInsteadOfDropping = true;
nackReason = lp::NackReason::DUPLICATE;
- face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
face.processEvents(1_s);
@@ -628,24 +652,16 @@
BOOST_AUTO_TEST_CASE(CongestionNack)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
nSegments = 401;
segmentsToDropOrNack.push(0);
segmentsToDropOrNack.push(200);
sendNackInsteadOfDropping = true;
nackReason = lp::NackReason::CONGESTION;
- face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
+ face.onSendInterest.connect(bind(&Fixture::onInterest, this, _1));
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
face.processEvents(1_s);
@@ -661,10 +677,6 @@
BOOST_AUTO_TEST_CASE(OtherNackReason)
{
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
segmentsToDropOrNack.push(0);
sendNackInsteadOfDropping = true;
nackReason = lp::NackReason::NO_ROUTE;
@@ -673,10 +685,6 @@
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
acceptValidator);
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
face.processEvents(1_s);
@@ -693,8 +701,8 @@
{
DummyValidator validator;
validator.getPolicy().setResultCallback([] (const Name& name) {
- return name.at(-1).toSegment() % 2 == 0;
- });
+ return name.at(-1).toSegment() % 2 == 0;
+ });
shared_ptr<SegmentFetcher> fetcher = SegmentFetcher::start(face, Interest("/hello/world"),
validator);
connectSignals(fetcher);
@@ -702,16 +710,6 @@
auto data1 = makeDataSegment("/hello/world", 0, false);
auto data2 = makeDataSegment("/hello/world", 1, true);
- size_t nRecvSegments = 0;
- fetcher->afterSegmentReceived.connect([&nRecvSegments] (const Data& receivedSegment) {
- ++nRecvSegments;
- });
-
- size_t nValidatedSegments = 0;
- fetcher->afterSegmentValidated.connect([&nValidatedSegments] (const Data& validatedSegment) {
- ++nValidatedSegments;
- });
-
advanceClocks(10_ms, 10);
BOOST_CHECK_EQUAL(fetcher->m_timeLastSegmentReceived, time::steady_clock::now() - 100_ms);
@@ -727,8 +725,8 @@
advanceClocks(10_ms, 10);
BOOST_CHECK_EQUAL(fetcher->m_timeLastSegmentReceived, time::steady_clock::now() - 200_ms);
- BOOST_CHECK_EQUAL(nRecvSegments, 2);
- BOOST_CHECK_EQUAL(nValidatedSegments, 1);
+ BOOST_CHECK_EQUAL(nAfterSegmentReceived, 2);
+ BOOST_CHECK_EQUAL(nAfterSegmentValidated, 1);
BOOST_CHECK_EQUAL(nErrors, 1);
}
@@ -770,7 +768,7 @@
bool fetcherStopped = false;
fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator);
- fetcher->afterSegmentReceived.connect([&fetcher, &fetcherStopped] (const Data& data) {
+ fetcher->afterSegmentReceived.connect([&fetcher, &fetcherStopped] (const Data&) {
fetcherStopped = true;
fetcher->stop();
});
@@ -790,21 +788,12 @@
// BasicSingleSegment, but with scoped fetcher
DummyValidator acceptValidator;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
weak_ptr<SegmentFetcher> weakFetcher;
{
auto fetcher = SegmentFetcher::start(face, Interest("/hello/world"), acceptValidator);
weakFetcher = fetcher;
connectSignals(fetcher);
-
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
}
advanceClocks(10_ms);
@@ -829,21 +818,12 @@
SegmentFetcher::Options options;
options.maxTimeout = 3000_ms;
- size_t nAfterSegmentReceived = 0;
- size_t nAfterSegmentValidated = 0;
- size_t nAfterSegmentNacked = 0;
- size_t nAfterSegmentTimedOut = 0;
-
weak_ptr<SegmentFetcher> weakFetcher;
{
auto fetcher = SegmentFetcher::start(face, Interest("/localhost/nfd/faces/list"),
acceptValidator, options);
weakFetcher = fetcher;
connectSignals(fetcher);
- fetcher->afterSegmentReceived.connect(bind([&nAfterSegmentReceived] { ++nAfterSegmentReceived; }));
- fetcher->afterSegmentValidated.connect(bind([&nAfterSegmentValidated] { ++nAfterSegmentValidated; }));
- fetcher->afterSegmentNacked.connect(bind([&nAfterSegmentNacked] { ++nAfterSegmentNacked; }));
- fetcher->afterSegmentTimedOut.connect(bind([&nAfterSegmentTimedOut] { ++nAfterSegmentTimedOut; }));
}
advanceClocks(500_ms, 7);