chunks: modularize pipeline implementation
Change-Id: Iaa5acbb02a583858db6204716947cceeae793a65
Refs: #3636
diff --git a/AUTHORS.md b/AUTHORS.md
index 7647fe3..5dd0573 100644
--- a/AUTHORS.md
+++ b/AUTHORS.md
@@ -20,3 +20,4 @@
* Steve DiBenedetto <https://dibenede.github.io>
* Andrea Tosatto <https://linkedin.com/in/tosattoandrea>
* Vince Lehman <http://vslehman.com>
+* Weiwei Liu <https://www.linkedin.com/in/weiweiliu10>
diff --git a/tests/chunks/discover-version-fixed.t.cpp b/tests/chunks/discover-version-fixed.t.cpp
index a57e5c8..8f76153 100644
--- a/tests/chunks/discover-version-fixed.t.cpp
+++ b/tests/chunks/discover-version-fixed.t.cpp
@@ -31,8 +31,6 @@
namespace chunks {
namespace tests {
-using namespace ndn::tests;
-
class DiscoverVersionFixedFixture : public DiscoverVersionFixture
{
public:
diff --git a/tests/chunks/discover-version-fixture.hpp b/tests/chunks/discover-version-fixture.hpp
index 9009301..a7d8f0b 100644
--- a/tests/chunks/discover-version-fixture.hpp
+++ b/tests/chunks/discover-version-fixture.hpp
@@ -35,20 +35,28 @@
namespace chunks {
namespace tests {
-class DiscoverVersionFixture : public ndn::tests::UnitTestTimeFixture, virtual protected Options
+using namespace ndn::tests;
+
+class DiscoverVersionFixture : public UnitTestTimeFixture, virtual protected Options
{
public:
DiscoverVersionFixture(const Options& options)
: Options(options)
, face(io)
, name("/ndn/chunks/test")
- , isDiscoveryFinished(false)
, discoveredVersion(0)
+ , isDiscoveryFinished(false)
+ {
+ }
+
+ virtual
+ ~DiscoverVersionFixture()
{
}
protected:
- void setDiscover(unique_ptr<DiscoverVersion> disc)
+ void
+ setDiscover(unique_ptr<DiscoverVersion> disc)
{
discover = std::move(disc);
discover->onDiscoverySuccess.connect(bind(&DiscoverVersionFixture::onSuccess, this, _1));
@@ -56,11 +64,11 @@
}
shared_ptr<Data>
- makeDataWithVersion(uint64_t version)
+ makeDataWithVersion(uint64_t version) const
{
auto data = make_shared<Data>(Name(name).appendVersion(version).appendSegment(0));
data->setFinalBlockId(name::Component::fromSegment(0));
- return ndn::tests::signData(data);
+ return signData(data);
}
static Options
@@ -93,8 +101,8 @@
util::DummyClientFace face;
Name name;
unique_ptr<DiscoverVersion> discover;
- bool isDiscoveryFinished;
uint64_t discoveredVersion;
+ bool isDiscoveryFinished;
};
} // namespace tests
diff --git a/tests/chunks/discover-version-iterative.t.cpp b/tests/chunks/discover-version-iterative.t.cpp
index 233f36c..79c6fb7 100644
--- a/tests/chunks/discover-version-iterative.t.cpp
+++ b/tests/chunks/discover-version-iterative.t.cpp
@@ -31,8 +31,6 @@
namespace chunks {
namespace tests {
-using namespace ndn::tests;
-
class DiscoverVersionIterativeFixture : public DiscoverVersionFixture,
protected DiscoverVersionIterativeOptions
{
diff --git a/tests/chunks/pipeline-interests.t.cpp b/tests/chunks/pipeline-interests-fixed-window.t.cpp
similarity index 87%
rename from tests/chunks/pipeline-interests.t.cpp
rename to tests/chunks/pipeline-interests-fixed-window.t.cpp
index 98abfa4..c6291dd 100644
--- a/tests/chunks/pipeline-interests.t.cpp
+++ b/tests/chunks/pipeline-interests-fixed-window.t.cpp
@@ -23,66 +23,31 @@
* @author Andrea Tosatto
*/
-#include "tools/chunks/catchunks/pipeline-interests.hpp"
+#include "tools/chunks/catchunks/pipeline-interests-fixed-window.hpp"
#include "tools/chunks/catchunks/data-fetcher.hpp"
-#include "tests/test-common.hpp"
-#include <ndn-cxx/util/dummy-client-face.hpp>
+#include "pipeline-interests-fixture.hpp"
namespace ndn {
namespace chunks {
namespace tests {
-using namespace ndn::tests;
-
-class PipelineInterestsFixture : public UnitTestTimeFixture
+class PipelineInterestFixedWindowFixture : public PipelineInterestsFixture
{
public:
- typedef PipelineInterestsOptions Options;
+ typedef PipelineInterestsFixedWindowOptions Options;
public:
- PipelineInterestsFixture()
- : face(io)
+ PipelineInterestFixedWindowFixture()
+ : PipelineInterestsFixture()
, opt(makeOptions())
- , name("/ndn/chunks/test")
- , pipeline(face, opt)
- , nDataSegments(0)
- , nReceivedSegments(0)
- , hasFailed(false)
{
+ setPipeline(make_unique<PipelineInterestsFixedWindow>(face, PipelineInterestsFixedWindow::Options(opt)));
}
-
protected:
- shared_ptr<Data>
- makeDataWithSegment(uint64_t segmentNo, bool setFinalBlockId = true)
- {
- auto data = make_shared<Data>(Name(name).appendVersion(0).appendSegment(segmentNo));
- if (setFinalBlockId)
- data->setFinalBlockId(name::Component::fromSegment(nDataSegments - 1));
- return signData(data);
- }
-
- void
- runWithData(const Data& data)
- {
- pipeline.runWithExcludedSegment(data,
- bind(&PipelineInterestsFixture::onData, this, _1, _2),
- bind(&PipelineInterestsFixture::onFailure, this, _1));
- }
+ Options opt;
private:
- void
- onData(const Interest& interest, const Data& data)
- {
- nReceivedSegments++;
- }
-
- void
- onFailure(const std::string& reason)
- {
- hasFailed = true;
- }
-
static Options
makeOptions()
{
@@ -93,22 +58,12 @@
options.maxPipelineSize = 5;
return options;
}
-
-protected:
- boost::asio::io_service io;
- util::DummyClientFace face;
- Options opt;
- Name name;
- PipelineInterests pipeline;
- uint64_t nDataSegments;
- uint64_t nReceivedSegments;
- bool hasFailed;
};
BOOST_AUTO_TEST_SUITE(Chunks)
-BOOST_AUTO_TEST_SUITE(TestPipelineInterests)
+BOOST_AUTO_TEST_SUITE(TestPipelineInterestsFixedWindow)
-BOOST_FIXTURE_TEST_CASE(FewerSegmentsThanPipelineCapacity, PipelineInterestsFixture)
+BOOST_FIXTURE_TEST_CASE(FewerSegmentsThanPipelineCapacity, PipelineInterestFixedWindowFixture)
{
nDataSegments = 3;
BOOST_ASSERT(nDataSegments <= opt.maxPipelineSize);
@@ -138,7 +93,7 @@
BOOST_CHECK_EQUAL(hasFailed, true);
}
-BOOST_FIXTURE_TEST_CASE(FullPipeline, PipelineInterestsFixture)
+BOOST_FIXTURE_TEST_CASE(FullPipeline, PipelineInterestFixedWindowFixture)
{
nDataSegments = 13;
BOOST_ASSERT(nDataSegments > opt.maxPipelineSize);
@@ -171,7 +126,7 @@
BOOST_CHECK_EQUAL(hasFailed, false);
}
-BOOST_FIXTURE_TEST_CASE(TimeoutAllSegments, PipelineInterestsFixture)
+BOOST_FIXTURE_TEST_CASE(TimeoutAllSegments, PipelineInterestFixedWindowFixture)
{
nDataSegments = 13;
BOOST_ASSERT(nDataSegments > opt.maxPipelineSize);
@@ -196,7 +151,7 @@
BOOST_CHECK_EQUAL(hasFailed, true);
}
-BOOST_FIXTURE_TEST_CASE(TimeoutAfterFinalBlockIdReceived, PipelineInterestsFixture)
+BOOST_FIXTURE_TEST_CASE(TimeoutAfterFinalBlockIdReceived, PipelineInterestFixedWindowFixture)
{
// the FinalBlockId is sent with the first segment, after the first segment failure the pipeline
// should fail
@@ -241,7 +196,7 @@
BOOST_CHECK_EQUAL(face.getNPendingInterests(), 0);
}
-BOOST_FIXTURE_TEST_CASE(TimeoutBeforeFinalBlockIdReceived, PipelineInterestsFixture)
+BOOST_FIXTURE_TEST_CASE(TimeoutBeforeFinalBlockIdReceived, PipelineInterestFixedWindowFixture)
{
// the FinalBlockId is sent only with the last segment, all segments are sent except for the
// second one (segment #1); all segments are received correctly until the FinalBlockId is received
@@ -294,7 +249,7 @@
BOOST_CHECK_EQUAL(hasFailed, true);
}
-BOOST_FIXTURE_TEST_CASE(SegmentReceivedAfterTimeout, PipelineInterestsFixture)
+BOOST_FIXTURE_TEST_CASE(SegmentReceivedAfterTimeout, PipelineInterestFixedWindowFixture)
{
// the FinalBlockId is never sent, all the pipeline elements with a segment number greater than
// segment #0 will fail, after this failure also segment #0 fail and this should trigger an error
@@ -327,7 +282,7 @@
BOOST_CHECK_EQUAL(hasFailed, true);
}
-BOOST_FIXTURE_TEST_CASE(CongestionAllSegments, PipelineInterestsFixture)
+BOOST_FIXTURE_TEST_CASE(CongestionAllSegments, PipelineInterestFixedWindowFixture)
{
nDataSegments = 13;
BOOST_ASSERT(nDataSegments > opt.maxPipelineSize);
diff --git a/tests/chunks/pipeline-interests-fixture.hpp b/tests/chunks/pipeline-interests-fixture.hpp
new file mode 100644
index 0000000..8da4dc1
--- /dev/null
+++ b/tests/chunks/pipeline-interests-fixture.hpp
@@ -0,0 +1,104 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2016, Regents of the University of California,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University.
+ *
+ * This file is part of ndn-tools (Named Data Networking Essential Tools).
+ * See AUTHORS.md for complete list of ndn-tools authors and contributors.
+ *
+ * ndn-tools is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ndn-tools, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Andrea Tosatto
+ * @author Weiwei Liu
+ */
+
+#ifndef NDN_TOOLS_TESTS_CHUNKS_PIPELINE_INTERESTS_FIXTURE_HPP
+#define NDN_TOOLS_TESTS_CHUNKS_PIPELINE_INTERESTS_FIXTURE_HPP
+
+#include "tools/chunks/catchunks/pipeline-interests.hpp"
+
+#include "tests/test-common.hpp"
+#include <ndn-cxx/util/dummy-client-face.hpp>
+
+namespace ndn {
+namespace chunks {
+namespace tests {
+
+using namespace ndn::tests;
+
+class PipelineInterestsFixture : public UnitTestTimeFixture
+{
+public:
+ PipelineInterestsFixture()
+ : face(io)
+ , name("/ndn/chunks/test")
+ , nDataSegments(0)
+ , nReceivedSegments(0)
+ , hasFailed(false)
+ {
+ }
+
+protected:
+ void
+ setPipeline(unique_ptr<PipelineInterests> pline)
+ {
+ pipeline = std::move(pline);
+ }
+
+ shared_ptr<Data>
+ makeDataWithSegment(uint64_t segmentNo, bool setFinalBlockId = true) const
+ {
+ auto data = make_shared<Data>(Name(name).appendVersion(0).appendSegment(segmentNo));
+ if (setFinalBlockId)
+ data->setFinalBlockId(name::Component::fromSegment(nDataSegments - 1));
+ return signData(data);
+ }
+
+ void
+ runWithData(const Data& data)
+ {
+ pipeline->run(data,
+ bind(&PipelineInterestsFixture::onData, this, _1, _2),
+ bind(&PipelineInterestsFixture::onFailure, this, _1));
+ }
+
+private:
+ void
+ onData(const Interest& interest, const Data& data)
+ {
+ nReceivedSegments++;
+ }
+
+ void
+ onFailure(const std::string& reason)
+ {
+ hasFailed = true;
+ }
+
+protected:
+ boost::asio::io_service io;
+ util::DummyClientFace face;
+ Name name;
+ unique_ptr<PipelineInterests> pipeline;
+ uint64_t nDataSegments;
+ uint64_t nReceivedSegments;
+ bool hasFailed;
+};
+
+} // namespace tests
+} // namespace chunks
+} // namespace ndn
+
+#endif // NDN_TOOLS_TESTS_CHUNKS_PIPELINE_INTERESTS_FIXTURE_HPP
diff --git a/tools/chunks/catchunks/consumer.cpp b/tools/chunks/catchunks/consumer.cpp
index 8976f6a..fe30f75 100644
--- a/tools/chunks/catchunks/consumer.cpp
+++ b/tools/chunks/catchunks/consumer.cpp
@@ -26,7 +26,6 @@
*/
#include "consumer.hpp"
-#include "discover-version.hpp"
namespace ndn {
namespace chunks {
@@ -34,35 +33,37 @@
Consumer::Consumer(Face& face, Validator& validator, bool isVerbose, std::ostream& os)
: m_face(face)
, m_validator(validator)
- , m_pipeline(nullptr)
- , m_nextToPrint(0)
, m_outputStream(os)
+ , m_nextToPrint(0)
, m_isVerbose(isVerbose)
{
}
-void Consumer::run(DiscoverVersion& discover, PipelineInterests& pipeline)
+void
+Consumer::run(unique_ptr<DiscoverVersion> discover, unique_ptr<PipelineInterests> pipeline)
{
- m_pipeline = &pipeline;
+ m_discover = std::move(discover);
+ m_pipeline = std::move(pipeline);
m_nextToPrint = 0;
+ m_bufferedData.clear();
- discover.onDiscoverySuccess.connect(bind(&Consumer::runWithData, this, _1));
- discover.onDiscoveryFailure.connect(bind(&Consumer::onFailure, this, _1));
+ discover->onDiscoverySuccess.connect(bind(&Consumer::startPipeline, this, _1));
+ discover->onDiscoveryFailure.connect(bind(&Consumer::onFailure, this, _1));
+ discover->run();
- discover.run();
m_face.processEvents();
}
-void Consumer::runWithData(const Data& data)
+void
+Consumer::startPipeline(const Data& data)
{
m_validator.validate(data,
bind(&Consumer::onDataValidated, this, _1),
bind(&Consumer::onFailure, this, _2));
- m_pipeline->runWithExcludedSegment(data,
- bind(&Consumer::onData, this, _1, _2),
- bind(&Consumer::onFailure, this, _1));
-
+ m_pipeline->run(data,
+ bind(&Consumer::onData, this, _1, _2),
+ bind(&Consumer::onFailure, this, _1));
}
void
@@ -100,7 +101,6 @@
for (auto it = m_bufferedData.begin();
it != m_bufferedData.end() && it->first == m_nextToPrint;
it = m_bufferedData.erase(it), ++m_nextToPrint) {
-
const Block& content = it->second->getContent();
m_outputStream.write(reinterpret_cast<const char*>(content.value()), content.value_size());
}
diff --git a/tools/chunks/catchunks/consumer.hpp b/tools/chunks/catchunks/consumer.hpp
index 9428a06..e2451be 100644
--- a/tools/chunks/catchunks/consumer.hpp
+++ b/tools/chunks/catchunks/consumer.hpp
@@ -25,12 +25,11 @@
* @author Andrea Tosatto
*/
-
#ifndef NDN_TOOLS_CHUNKS_CATCHUNKS_CONSUMER_HPP
#define NDN_TOOLS_CHUNKS_CATCHUNKS_CONSUMER_HPP
-#include "pipeline-interests.hpp"
#include "discover-version.hpp"
+#include "pipeline-interests.hpp"
#include <ndn-cxx/security/validator.hpp>
@@ -66,11 +65,11 @@
* @brief Run the consumer
*/
void
- run(DiscoverVersion& discover, PipelineInterests& pipeline);
+ run(unique_ptr<DiscoverVersion> discover, unique_ptr<PipelineInterests> pipeline);
private:
void
- runWithData(const Data& data);
+ startPipeline(const Data& data);
void
onData(const Interest& interest, const Data& data);
@@ -88,9 +87,10 @@
private:
Face& m_face;
Validator& m_validator;
- PipelineInterests* m_pipeline;
- uint64_t m_nextToPrint;
std::ostream& m_outputStream;
+ unique_ptr<DiscoverVersion> m_discover;
+ unique_ptr<PipelineInterests> m_pipeline;
+ uint64_t m_nextToPrint;
bool m_isVerbose;
PUBLIC_WITH_TESTS_ELSE_PRIVATE:
diff --git a/tools/chunks/catchunks/ndncatchunks.cpp b/tools/chunks/catchunks/ndncatchunks.cpp
index d2b679f..98839b3 100644
--- a/tools/chunks/catchunks/ndncatchunks.cpp
+++ b/tools/chunks/catchunks/ndncatchunks.cpp
@@ -23,6 +23,8 @@
* @author Wentao Shang
* @author Steve DiBenedetto
* @author Andrea Tosatto
+ * @author Davide Pesavento
+ * @author Weiwei Liu
*/
#include "core/version.hpp"
@@ -30,6 +32,7 @@
#include "consumer.hpp"
#include "discover-version-fixed.hpp"
#include "discover-version-iterative.hpp"
+#include "pipeline-interests-fixed-window.hpp"
#include <ndn-cxx/security/validator-null.hpp>
@@ -42,6 +45,7 @@
std::string programName(argv[0]);
Options options;
std::string discoverType("fixed");
+ std::string pipelineType("fixed");
size_t maxPipelineSize(1);
int maxRetriesAfterVersionFound(1);
std::string uri;
@@ -148,15 +152,23 @@
return 2;
}
+ unique_ptr<PipelineInterests> pipeline;
+ if (pipelineType == "fixed") {
+ PipelineInterestsFixedWindow::Options optionsPipeline(options);
+ optionsPipeline.maxPipelineSize = maxPipelineSize;
+ pipeline = make_unique<PipelineInterestsFixedWindow>(face, optionsPipeline);
+ }
+ else {
+ std::cerr << "ERROR: Interest pipeline type not valid" << std::endl;
+ return 2;
+ }
+
ValidatorNull validator;
Consumer consumer(face, validator, options.isVerbose);
- PipelineInterests::Options optionsPipeline(options);
- optionsPipeline.maxPipelineSize = maxPipelineSize;
- PipelineInterests pipeline(face, optionsPipeline);
-
BOOST_ASSERT(discover != nullptr);
- consumer.run(*discover, pipeline);
+ BOOST_ASSERT(pipeline != nullptr);
+ consumer.run(std::move(discover), std::move(pipeline));
}
catch (const Consumer::ApplicationNackError& e) {
std::cerr << "ERROR: " << e.what() << std::endl;
diff --git a/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp b/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp
new file mode 100644
index 0000000..049d52d
--- /dev/null
+++ b/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp
@@ -0,0 +1,183 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2016, Regents of the University of California,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University.
+ *
+ * This file is part of ndn-tools (Named Data Networking Essential Tools).
+ * See AUTHORS.md for complete list of ndn-tools authors and contributors.
+ *
+ * ndn-tools is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ndn-tools, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Wentao Shang
+ * @author Steve DiBenedetto
+ * @author Andrea Tosatto
+ * @author Davide Pesavento
+ */
+
+#include "pipeline-interests-fixed-window.hpp"
+#include "data-fetcher.hpp"
+
+namespace ndn {
+namespace chunks {
+
+PipelineInterestsFixedWindow::PipelineInterestsFixedWindow(Face& face, const Options& options)
+ : PipelineInterests(face)
+ , m_options(options)
+ , m_nextSegmentNo(0)
+ , m_hasFailure(false)
+{
+ m_segmentFetchers.resize(m_options.maxPipelineSize);
+}
+
+PipelineInterestsFixedWindow::~PipelineInterestsFixedWindow()
+{
+ cancel();
+}
+
+void
+PipelineInterestsFixedWindow::doRun()
+{
+ // if the FinalBlockId is unknown, this could potentially request non-existent segments
+ for (size_t nRequestedSegments = 0;
+ nRequestedSegments < m_options.maxPipelineSize;
+ ++nRequestedSegments) {
+ if (!fetchNextSegment(nRequestedSegments))
+ // all segments have been requested
+ break;
+ }
+}
+
+bool
+PipelineInterestsFixedWindow::fetchNextSegment(std::size_t pipeNo)
+{
+ if (isStopping())
+ return false;
+
+ if (m_hasFailure) {
+ onFailure("Fetching terminated but no final segment number has been found");
+ return false;
+ }
+
+ if (m_nextSegmentNo == m_excludedSegmentNo)
+ m_nextSegmentNo++;
+
+ if (m_hasFinalBlockId && m_nextSegmentNo > m_lastSegmentNo)
+ return false;
+
+ // send interest for next segment
+ if (m_options.isVerbose)
+ std::cerr << "Requesting segment #" << m_nextSegmentNo << std::endl;
+
+ Interest interest(Name(m_prefix).appendSegment(m_nextSegmentNo));
+ interest.setInterestLifetime(m_options.interestLifetime);
+ interest.setMustBeFresh(m_options.mustBeFresh);
+ interest.setMaxSuffixComponents(1);
+
+ auto fetcher = DataFetcher::fetch(m_face, interest,
+ m_options.maxRetriesOnTimeoutOrNack,
+ m_options.maxRetriesOnTimeoutOrNack,
+ bind(&PipelineInterestsFixedWindow::handleData, this, _1, _2, pipeNo),
+ bind(&PipelineInterestsFixedWindow::handleFail, this, _2, pipeNo),
+ bind(&PipelineInterestsFixedWindow::handleFail, this, _2, pipeNo),
+ m_options.isVerbose);
+
+ BOOST_ASSERT(!m_segmentFetchers[pipeNo].first || !m_segmentFetchers[pipeNo].first->isRunning());
+ m_segmentFetchers[pipeNo] = make_pair(fetcher, m_nextSegmentNo);
+ m_nextSegmentNo++;
+
+ return true;
+}
+
+void
+PipelineInterestsFixedWindow::doCancel()
+{
+ for (auto& fetcher : m_segmentFetchers) {
+ if (fetcher.first)
+ fetcher.first->cancel();
+ }
+
+ m_segmentFetchers.clear();
+}
+
+void
+PipelineInterestsFixedWindow::handleData(const Interest& interest, const Data& data, size_t pipeNo)
+{
+ if (isStopping())
+ return;
+
+ BOOST_ASSERT(data.getName().equals(interest.getName()));
+
+ if (m_options.isVerbose)
+ std::cerr << "Received segment #" << data.getName()[-1].toSegment() << std::endl;
+
+ onData(interest, data);
+
+ if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
+ m_lastSegmentNo = data.getFinalBlockId().toSegment();
+ m_hasFinalBlockId = true;
+
+ for (auto& fetcher : m_segmentFetchers) {
+ if (fetcher.first == nullptr)
+ continue;
+
+ if (fetcher.second > m_lastSegmentNo) {
+ // stop trying to fetch segments that are beyond m_lastSegmentNo
+ fetcher.first->cancel();
+ }
+ else if (fetcher.first->hasError()) { // fetcher.second <= m_lastSegmentNo
+ // there was an error while fetching a segment that is part of the content
+ return onFailure("Failure retrieving segment #" + to_string(fetcher.second));
+ }
+ }
+ }
+
+ fetchNextSegment(pipeNo);
+}
+
+void PipelineInterestsFixedWindow::handleFail(const std::string& reason, std::size_t pipeNo)
+{
+ if (isStopping())
+ return;
+
+ // if the failed segment is definitely part of the content, raise a fatal error
+ if (m_hasFinalBlockId && m_segmentFetchers[pipeNo].second <= m_lastSegmentNo)
+ return onFailure(reason);
+
+ if (!m_hasFinalBlockId) {
+ bool areAllFetchersStopped = true;
+ for (auto& fetcher : m_segmentFetchers) {
+ if (fetcher.first == nullptr)
+ continue;
+
+ // cancel fetching all segments that follow
+ if (fetcher.second > m_segmentFetchers[pipeNo].second) {
+ fetcher.first->cancel();
+ }
+ else if (fetcher.first->isRunning()) { // fetcher.second <= m_segmentFetchers[pipeNo].second
+ areAllFetchersStopped = false;
+ }
+ }
+
+ if (areAllFetchersStopped) {
+ onFailure("Fetching terminated but no final segment number has been found");
+ }
+ else {
+ m_hasFailure = true;
+ }
+ }
+}
+
+} // namespace chunks
+} // namespace ndn
diff --git a/tools/chunks/catchunks/pipeline-interests-fixed-window.hpp b/tools/chunks/catchunks/pipeline-interests-fixed-window.hpp
new file mode 100644
index 0000000..2da40e6
--- /dev/null
+++ b/tools/chunks/catchunks/pipeline-interests-fixed-window.hpp
@@ -0,0 +1,118 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2016, Regents of the University of California,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University.
+ *
+ * This file is part of ndn-tools (Named Data Networking Essential Tools).
+ * See AUTHORS.md for complete list of ndn-tools authors and contributors.
+ *
+ * ndn-tools is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ndn-tools, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Wentao Shang
+ * @author Steve DiBenedetto
+ * @author Andrea Tosatto
+ * @author Davide Pesavento
+ */
+
+#include "options.hpp"
+#include "pipeline-interests.hpp"
+
+namespace ndn {
+namespace chunks {
+
+class DataFetcher;
+
+class PipelineInterestsFixedWindowOptions : public Options
+{
+public:
+ explicit
+ PipelineInterestsFixedWindowOptions(const Options& options = Options())
+ : Options(options)
+ , maxPipelineSize(1)
+ {
+ }
+
+public:
+ size_t maxPipelineSize;
+};
+
+/**
+ * @brief Service for retrieving Data via an Interest pipeline
+ *
+ * Retrieves all segments of Data under a given prefix by maintaining a fixed-size window of
+ * N Interests in flight. A user-specified callback function is used to notify the arrival of
+ * each segment of Data.
+ *
+ * No guarantees are made as to the order in which segments are fetched or callbacks are invoked,
+ * i.e. out-of-order delivery is possible.
+ */
+class PipelineInterestsFixedWindow : public PipelineInterests
+{
+public:
+ typedef PipelineInterestsFixedWindowOptions Options;
+
+public:
+ /**
+ * @brief create a PipelineInterestsFixedWindow service
+ *
+ * Configures the pipelining service without specifying the retrieval namespace. After this
+ * configuration the method run must be called to start the Pipeline.
+ */
+ explicit
+ PipelineInterestsFixedWindow(Face& face, const Options& options = Options());
+
+ ~PipelineInterestsFixedWindow() final;
+
+private:
+ /**
+ * @brief fetch all the segments between 0 and m_lastSegmentNo
+ *
+ * Starts a fixed-window pipeline with size equal to m_options.maxPipelineSize. The pipeline
+ * will fetch every segment until the last segment is successfully received or an error occurs.
+ * The segment with segment number equal to m_excludedSegmentNo will not be fetched.
+ */
+ void
+ doRun() final;
+
+ void
+ doCancel() final;
+
+ /**
+ * @brief fetch the next segment that has not been requested yet
+ *
+ * @return false if there is an error or all the segments have been fetched, true otherwise
+ */
+ bool
+ fetchNextSegment(size_t pipeNo);
+
+ void
+ handleData(const Interest& interest, const Data& data, size_t pipeNo);
+
+ void
+ handleFail(const std::string& reason, size_t pipeNo);
+
+private:
+ const Options m_options;
+ std::vector<std::pair<shared_ptr<DataFetcher>, uint64_t>> m_segmentFetchers;
+ uint64_t m_nextSegmentNo;
+ /**
+ * true if one or more segment fetchers encountered an error; if m_hasFinalBlockId
+ * is false, this is usually not a fatal error for the pipeline
+ */
+ bool m_hasFailure;
+};
+
+} // namespace chunks
+} // namespace ndn
diff --git a/tools/chunks/catchunks/pipeline-interests.cpp b/tools/chunks/catchunks/pipeline-interests.cpp
index b07c959..ff817e4 100644
--- a/tools/chunks/catchunks/pipeline-interests.cpp
+++ b/tools/chunks/catchunks/pipeline-interests.cpp
@@ -23,179 +23,63 @@
* @author Wentao Shang
* @author Steve DiBenedetto
* @author Andrea Tosatto
+ * @author Davide Pesavento
+ * @author Weiwei Liu
*/
#include "pipeline-interests.hpp"
-#include "data-fetcher.hpp"
namespace ndn {
namespace chunks {
-PipelineInterests::PipelineInterests(Face& face, const Options& options)
+PipelineInterests::PipelineInterests(Face& face)
: m_face(face)
- , m_nextSegmentNo(0)
, m_lastSegmentNo(0)
- , m_excludeSegmentNo(0)
- , m_options(options)
+ , m_excludedSegmentNo(0)
, m_hasFinalBlockId(false)
- , m_hasError(false)
- , m_hasFailure(false)
+ , m_isStopping(false)
{
- m_segmentFetchers.resize(m_options.maxPipelineSize);
}
-PipelineInterests::~PipelineInterests()
-{
- cancel();
-}
+PipelineInterests::~PipelineInterests() = default;
void
-PipelineInterests::runWithExcludedSegment(const Data& data, DataCallback onData,
- FailureCallback onFailure)
+PipelineInterests::run(const Data& data, DataCallback onData, FailureCallback onFailure)
{
BOOST_ASSERT(onData != nullptr);
m_onData = std::move(onData);
m_onFailure = std::move(onFailure);
-
- Name dataName = data.getName();
- m_prefix = dataName.getPrefix(-1);
- m_excludeSegmentNo = dataName[-1].toSegment();
+ m_prefix = data.getName().getPrefix(-1);
+ m_excludedSegmentNo = data.getName()[-1].toSegment();
if (!data.getFinalBlockId().empty()) {
- m_hasFinalBlockId = true;
m_lastSegmentNo = data.getFinalBlockId().toSegment();
+ m_hasFinalBlockId = true;
}
- // if the FinalBlockId is unknown, this could potentially request non-existent segments
- for (size_t nRequestedSegments = 0; nRequestedSegments < m_options.maxPipelineSize;
- nRequestedSegments++) {
- if (!fetchNextSegment(nRequestedSegments)) // all segments have been requested
- break;
- }
-}
-
-bool
-PipelineInterests::fetchNextSegment(std::size_t pipeNo)
-{
- if (m_hasFailure) {
- fail("Fetching terminated but no final segment number has been found");
- return false;
- }
-
- if (m_nextSegmentNo == m_excludeSegmentNo)
- m_nextSegmentNo++;
-
- if (m_hasFinalBlockId && m_nextSegmentNo > m_lastSegmentNo)
- return false;
-
- // Send interest for next segment
- if (m_options.isVerbose)
- std::cerr << "Requesting segment #" << m_nextSegmentNo << std::endl;
-
- Interest interest(Name(m_prefix).appendSegment(m_nextSegmentNo));
- interest.setInterestLifetime(m_options.interestLifetime);
- interest.setMustBeFresh(m_options.mustBeFresh);
- interest.setMaxSuffixComponents(1);
-
- BOOST_ASSERT(!m_segmentFetchers[pipeNo].first || !m_segmentFetchers[pipeNo].first->isRunning());
-
- auto fetcher = DataFetcher::fetch(m_face, interest,
- m_options.maxRetriesOnTimeoutOrNack,
- m_options.maxRetriesOnTimeoutOrNack,
- bind(&PipelineInterests::handleData, this, _1, _2, pipeNo),
- bind(&PipelineInterests::handleFail, this, _2, pipeNo),
- bind(&PipelineInterests::handleFail, this, _2, pipeNo),
- m_options.isVerbose);
-
- m_segmentFetchers[pipeNo] = make_pair(fetcher, m_nextSegmentNo);
-
- m_nextSegmentNo++;
- return true;
+ doRun();
}
void
PipelineInterests::cancel()
{
- for (auto& fetcher : m_segmentFetchers)
- if (fetcher.first)
- fetcher.first->cancel();
+ if (m_isStopping)
+ return;
- m_segmentFetchers.clear();
+ m_isStopping = true;
+ doCancel();
}
void
-PipelineInterests::fail(const std::string& reason)
+PipelineInterests::onFailure(const std::string& reason)
{
- if (!m_hasError) {
- cancel();
- m_hasError = true;
- m_hasFailure = true;
- if (m_onFailure)
- m_face.getIoService().post([this, reason] { m_onFailure(reason); });
- }
-}
-
-void
-PipelineInterests::handleData(const Interest& interest, const Data& data, size_t pipeNo)
-{
- if (m_hasError)
+ if (m_isStopping)
return;
- BOOST_ASSERT(data.getName().equals(interest.getName()));
+ cancel();
- if (m_options.isVerbose)
- std::cerr << "Received segment #" << data.getName()[-1].toSegment() << std::endl;
-
- m_onData(interest, data);
-
- if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
- m_lastSegmentNo = data.getFinalBlockId().toSegment();
- m_hasFinalBlockId = true;
-
- for (auto& fetcher : m_segmentFetchers) {
- if (fetcher.first && fetcher.second > m_lastSegmentNo) {
- // Stop trying to fetch segments that are not part of the content
- fetcher.first->cancel();
- }
- else if (fetcher.first && fetcher.first->hasError()) { // fetcher.second <= m_lastSegmentNo
- // there was an error while fetching a segment that is part of the content
- fail("Failure retriving segment #" + to_string(fetcher.second));
- return;
- }
- }
- }
-
- fetchNextSegment(pipeNo);
-}
-
-void PipelineInterests::handleFail(const std::string& reason, std::size_t pipeNo)
-{
- if (m_hasError)
- return;
-
- if (m_hasFinalBlockId && m_segmentFetchers[pipeNo].second <= m_lastSegmentNo) {
- fail(reason);
- }
- else if (!m_hasFinalBlockId) {
- // don't fetch the following segments
- bool areAllFetchersStopped = true;
- for (auto& fetcher : m_segmentFetchers) {
- if (fetcher.first && fetcher.second > m_segmentFetchers[pipeNo].second) {
- fetcher.first->cancel();
- }
- else if (fetcher.first && fetcher.first->isRunning()) {
- // fetcher.second <= m_segmentFetchers[pipeNo].second
- areAllFetchersStopped = false;
- }
- }
- if (areAllFetchersStopped) {
- if (m_onFailure)
- fail("Fetching terminated but no final segment number has been found");
- }
- else {
- m_hasFailure = true;
- }
- }
+ if (m_onFailure)
+ m_face.getIoService().post([this, reason] { m_onFailure(reason); });
}
} // namespace chunks
diff --git a/tools/chunks/catchunks/pipeline-interests.hpp b/tools/chunks/catchunks/pipeline-interests.hpp
index 32d5b79..63d714d 100644
--- a/tools/chunks/catchunks/pipeline-interests.hpp
+++ b/tools/chunks/catchunks/pipeline-interests.hpp
@@ -23,46 +23,31 @@
* @author Wentao Shang
* @author Steve DiBenedetto
* @author Andrea Tosatto
+ * @author Davide Pesavento
+ * @author Weiwei Liu
*/
#ifndef NDN_TOOLS_CHUNKS_CATCHUNKS_PIPELINE_INTERESTS_HPP
#define NDN_TOOLS_CHUNKS_CATCHUNKS_PIPELINE_INTERESTS_HPP
#include "core/common.hpp"
-#include "options.hpp"
namespace ndn {
namespace chunks {
-class DataFetcher;
-
-class PipelineInterestsOptions : public Options
-{
-public:
- explicit
- PipelineInterestsOptions(const Options& options = Options())
- : Options(options)
- , maxPipelineSize(1)
- {
- }
-
-public:
- size_t maxPipelineSize;
-};
-
/**
* @brief Service for retrieving Data via an Interest pipeline
*
- * Retrieves all segmented Data under the specified prefix by maintaining a pipeline of N Interests
- * in flight.
+ * Retrieves all segments of Data under a given prefix by maintaining a (variable or fixed-size)
+ * window of N Interests in flight. A user-specified callback function is used to notify
+ * the arrival of each segment of Data.
*
- * Provides retrieved Data on arrival with no ordering guarantees. Data is delivered to the
- * PipelineInterests' user via callback immediately upon arrival.
+ * No guarantees are made as to the order in which segments are fetched or callbacks are invoked,
+ * i.e. out-of-order delivery is possible.
*/
class PipelineInterests
{
public:
- typedef PipelineInterestsOptions Options;
typedef function<void(const std::string& reason)> FailureCallback;
public:
@@ -70,26 +55,23 @@
* @brief create a PipelineInterests service
*
* Configures the pipelining service without specifying the retrieval namespace. After this
- * configuration the method runWithExcludedSegment must be called to run the Pipeline.
+ * configuration the method run must be called to start the Pipeline.
*/
explicit
- PipelineInterests(Face& face, const Options& options = Options());
+ PipelineInterests(Face& face);
+ virtual
~PipelineInterests();
/**
- * @brief fetch all the segments between 0 and lastSegment of the specified prefix
+ * @brief start fetching all the segments of the specified prefix
*
- * Starts the pipeline of size defined inside the options. The pipeline retrieves all the segments
- * until the last segment is received, @p data is excluded from the retrieving.
- *
- * @param data a segment of the segmented Data to retrive; data.getName() must end with a segment
- * number
+ * @param data a segment of the segmented Data to fetch; the Data name must end with a segment number
* @param onData callback for every segment correctly received, must not be empty
- * @param onfailure callback called if an error occurs
+ * @param onFailure callback if an error occurs, may be empty
*/
void
- runWithExcludedSegment(const Data& data, DataCallback onData, FailureCallback onFailure);
+ run(const Data& data, DataCallback onData, FailureCallback onFailure);
/**
* @brief stop all fetch operations
@@ -97,44 +79,52 @@
void
cancel();
+protected:
+ bool
+ isStopping() const
+ {
+ return m_isStopping;
+ }
+
+ void
+ onData(const Interest& interest, const Data& data) const
+ {
+ m_onData(interest, data);
+ }
+
+ /**
+ * @brief subclasses can call this method to signal an unrecoverable failure
+ */
+ void
+ onFailure(const std::string& reason);
+
private:
/**
- * @brief fetch the next segment that has not been requested yet
+ * @brief perform subclass-specific operations to fetch all the segments
*
- * @return false if there is an error or all the segments have been fetched, true otherwise
+ * When overriding this function, at a minimum, the subclass should implement the retrieving
+ * of all the segments. Segment m_excludedSegmentNo can be skipped. Subclass must guarantee
+ * that onData is called at least once for every segment that is fetched successfully.
+ *
+ * @note m_lastSegmentNo contains a valid value only if m_hasFinalBlockId is true.
*/
- bool
- fetchNextSegment(size_t pipeNo);
+ virtual void
+ doRun() = 0;
- void
- fail(const std::string& reason);
+ virtual void
+ doCancel() = 0;
- void
- handleData(const Interest& interest, const Data& data, size_t pipeNo);
-
- void
- handleFail(const std::string& reason, size_t pipeNo);
+protected:
+ Face& m_face;
+ Name m_prefix;
+ uint64_t m_lastSegmentNo;
+ uint64_t m_excludedSegmentNo;
+ bool m_hasFinalBlockId; ///< true if the last segment number is known
private:
- Name m_prefix;
- Face& m_face;
- uint64_t m_nextSegmentNo;
- uint64_t m_lastSegmentNo;
- uint64_t m_excludeSegmentNo;
DataCallback m_onData;
FailureCallback m_onFailure;
- const Options m_options;
- std::vector<std::pair<shared_ptr<DataFetcher>, uint64_t>> m_segmentFetchers;
- bool m_hasFinalBlockId;
- /**
- * true if there's a critical error
- */
- bool m_hasError;
- /**
- * true if one or more segmentFetcher failed, if lastSegmentNo is not set this is usually not a
- * fatal error for the pipeline
- */
- bool m_hasFailure;
+ bool m_isStopping;
};
} // namespace chunks