chunks: consolidate accounting of received data into onData()
Change-Id: Ib47381d3f69b482d60af8b07d002135944b17604
diff --git a/tools/chunks/catchunks/consumer.cpp b/tools/chunks/catchunks/consumer.cpp
index 6132d82..ce01583 100644
--- a/tools/chunks/catchunks/consumer.cpp
+++ b/tools/chunks/catchunks/consumer.cpp
@@ -46,7 +46,11 @@
m_nextToPrint = 0;
m_bufferedData.clear();
- m_discover->onDiscoverySuccess.connect(bind(&Consumer::startPipeline, this, _1));
+ m_discover->onDiscoverySuccess.connect([this] (const Data& discovered) { // start fetching once discovery succeeds
+   m_pipeline->run(discovered,
+                   [this] (const Data& segment) { handleData(segment); },
+                   [] (const std::string& reason) { BOOST_THROW_EXCEPTION(std::runtime_error(reason)); });
+ });
m_discover->onDiscoveryFailure.connect([] (const std::string& msg) {
BOOST_THROW_EXCEPTION(std::runtime_error(msg));
});
@@ -54,16 +58,6 @@
}
void
-Consumer::startPipeline(const Data& data)
-{
- this->handleData(data);
-
- m_pipeline->run(data,
- [this] (const Interest&, const Data& data) { this->handleData(data); },
- [] (const std::string& msg) { BOOST_THROW_EXCEPTION(std::runtime_error(msg)); });
-}
-
-void
Consumer::handleData(const Data& data)
{
auto dataPtr = data.shared_from_this();
@@ -82,7 +76,7 @@
m_bufferedData[getSegmentFromPacket(data)] = dataPtr;
writeInOrderData();
},
- [] (const Data& data, const security::v2::ValidationError& error) {
+ [] (const Data&, const security::v2::ValidationError& error) {
BOOST_THROW_EXCEPTION(DataValidationError(error));
});
}
diff --git a/tools/chunks/catchunks/consumer.hpp b/tools/chunks/catchunks/consumer.hpp
index 3dcefe6..633d05b 100644
--- a/tools/chunks/catchunks/consumer.hpp
+++ b/tools/chunks/catchunks/consumer.hpp
@@ -1,5 +1,5 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
+/*
* Copyright (c) 2016-2017, Regents of the University of California,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University.
@@ -80,9 +80,6 @@
private:
void
- startPipeline(const Data& data);
-
- void
handleData(const Data& data);
PUBLIC_WITH_TESTS_ELSE_PRIVATE:
diff --git a/tools/chunks/catchunks/pipeline-interests-aimd.cpp b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
index 3b35001..b979a51 100644
--- a/tools/chunks/catchunks/pipeline-interests-aimd.cpp
+++ b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
@@ -154,9 +154,7 @@
auto interestId = m_face.expressInterest(interest,
bind(&PipelineInterestsAimd::handleData, this, _1, _2),
bind(&PipelineInterestsAimd::handleNack, this, _1, _2),
- bind(&PipelineInterestsAimd::handleLifetimeExpiration,
- this, _1));
-
+ bind(&PipelineInterestsAimd::handleLifetimeExpiration, this, _1));
m_nInFlight++;
if (isRetransmission) {
@@ -246,11 +244,8 @@
m_nInFlight--;
}
- m_nReceived++;
- m_receivedSize += data.getContent().value_size();
-
increaseWindow();
- onData(interest, data);
+ onData(data);
if (segInfo.state == SegmentState::FirstTimeSent ||
segInfo.state == SegmentState::InRetxQueue) { // do not sample RTT for retransmitted segments
diff --git a/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp b/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp
index b81ec97..40a4417 100644
--- a/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp
+++ b/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp
@@ -119,10 +119,7 @@
if (m_options.isVerbose)
std::cerr << "Received segment #" << getSegmentFromPacket(data) << std::endl;
- m_nReceived++;
- m_receivedSize += data.getContent().value_size();
-
- onData(interest, data);
+ onData(data);
if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
m_lastSegmentNo = data.getFinalBlockId().toSegment();
diff --git a/tools/chunks/catchunks/pipeline-interests.cpp b/tools/chunks/catchunks/pipeline-interests.cpp
index c0512cc..e520bfe 100644
--- a/tools/chunks/catchunks/pipeline-interests.cpp
+++ b/tools/chunks/catchunks/pipeline-interests.cpp
@@ -35,10 +35,10 @@
PipelineInterests::PipelineInterests(Face& face)
: m_face(face)
+ , m_hasFinalBlockId(false)
, m_lastSegmentNo(0)
, m_nReceived(0)
, m_receivedSize(0)
- , m_hasFinalBlockId(false)
, m_nextSegmentNo(0)
, m_excludedSegmentNo(0)
, m_isStopping(false)
@@ -48,11 +48,11 @@
PipelineInterests::~PipelineInterests() = default;
void
-PipelineInterests::run(const Data& data, DataCallback onData, FailureCallback onFailure)
+PipelineInterests::run(const Data& data, DataCallback dataCb, FailureCallback failureCb)
{
- BOOST_ASSERT(onData != nullptr);
- m_onData = std::move(onData);
- m_onFailure = std::move(onFailure);
+ BOOST_ASSERT(dataCb != nullptr);
+ m_onData = std::move(dataCb);
+ m_onFailure = std::move(failureCb);
m_prefix = data.getName().getPrefix(-1);
m_excludedSegmentNo = getSegmentFromPacket(data);
@@ -61,11 +61,11 @@
m_hasFinalBlockId = true;
}
+ onData(data);
+
// record the start time of the pipeline
m_startTime = time::steady_clock::now();
- m_nReceived++;
- m_receivedSize += data.getContent().value_size();
doRun();
}
@@ -90,6 +90,15 @@
}
void
+PipelineInterests::onData(const Data& data)
+{
+ // update the reception counters, then forward the segment to the callback registered via run()
+ ++m_nReceived;
+ m_receivedSize += data.getContent().value_size();
+ m_onData(data);
+}
+
+void
PipelineInterests::onFailure(const std::string& reason)
{
if (m_isStopping)
diff --git a/tools/chunks/catchunks/pipeline-interests.hpp b/tools/chunks/catchunks/pipeline-interests.hpp
index c8e14fe..90fd76a 100644
--- a/tools/chunks/catchunks/pipeline-interests.hpp
+++ b/tools/chunks/catchunks/pipeline-interests.hpp
@@ -49,14 +49,14 @@
class PipelineInterests
{
public:
- typedef function<void(const std::string& reason)> FailureCallback;
+ using DataCallback = function<void(const Data&)>;
+ using FailureCallback = function<void(const std::string& reason)>;
-public:
/**
* @brief create a PipelineInterests service
*
- * Configures the pipelining service without specifying the retrieval namespace. After this
- * configuration the method run must be called to start the Pipeline.
+ * Configures the pipelining service without specifying the retrieval namespace.
+ * After construction, the method run() must be called in order to start the pipeline.
*/
explicit
PipelineInterests(Face& face);
@@ -100,11 +100,11 @@
uint64_t
getNextSegmentNo();
+ /**
+ * @brief subclasses must call this method to report the successful retrieval of a segment
+ */
void
- onData(const Interest& interest, const Data& data) const
- {
- m_onData(interest, data);
- }
+ onData(const Data& data);
/**
* @brief subclasses can call this method to signal an unrecoverable failure
@@ -131,7 +131,7 @@
* @brief perform subclass-specific operations to fetch all the segments
*
* When overriding this function, at a minimum, the subclass should implement the retrieving
- * of all the segments. Subclass must guarantee that onData is called at least once for every
+ * of all the segments. The subclass must guarantee that `onData` is called exactly once for every
* segment that is fetched successfully.
*
* @note m_lastSegmentNo contains a valid value only if m_hasFinalBlockId is true.
@@ -145,12 +145,12 @@
protected:
Face& m_face;
Name m_prefix;
- uint64_t m_lastSegmentNo; ///< valid only if m_hasFinalBlockId == true
- int64_t m_nReceived; ///< number of segments received
- size_t m_receivedSize; ///< size of received data in bytes
PUBLIC_WITH_TESTS_ELSE_PROTECTED:
- bool m_hasFinalBlockId; ///< true if the last segment number is known
+ bool m_hasFinalBlockId; ///< true if the last segment number is known
+ uint64_t m_lastSegmentNo; ///< valid only if m_hasFinalBlockId == true
+ int64_t m_nReceived; ///< number of segments received
+ size_t m_receivedSize; ///< size of received data in bytes
private:
DataCallback m_onData;