chunks: modularize pipeline implementation
Change-Id: Iaa5acbb02a583858db6204716947cceeae793a65
Refs: #3636
diff --git a/tools/chunks/catchunks/consumer.cpp b/tools/chunks/catchunks/consumer.cpp
index 8976f6a..fe30f75 100644
--- a/tools/chunks/catchunks/consumer.cpp
+++ b/tools/chunks/catchunks/consumer.cpp
@@ -26,7 +26,6 @@
*/
#include "consumer.hpp"
-#include "discover-version.hpp"
namespace ndn {
namespace chunks {
@@ -34,35 +33,37 @@
Consumer::Consumer(Face& face, Validator& validator, bool isVerbose, std::ostream& os)
: m_face(face)
, m_validator(validator)
- , m_pipeline(nullptr)
- , m_nextToPrint(0)
, m_outputStream(os)
+ , m_nextToPrint(0) // NOTE(review): now listed after m_outputStream — presumably to match member declaration order; confirm in consumer.hpp
, m_isVerbose(isVerbose)
{
}
-void Consumer::run(DiscoverVersion& discover, PipelineInterests& pipeline)
+void
+Consumer::run(unique_ptr<DiscoverVersion> discover, unique_ptr<PipelineInterests> pipeline)
{
- m_pipeline = &pipeline;
+ m_discover = std::move(discover); // Consumer takes ownership; the `discover` parameter is null after this move
+ m_pipeline = std::move(pipeline);
m_nextToPrint = 0;
+ m_bufferedData.clear();
- discover.onDiscoverySuccess.connect(bind(&Consumer::runWithData, this, _1));
- discover.onDiscoveryFailure.connect(bind(&Consumer::onFailure, this, _1));
+ m_discover->onDiscoverySuccess.connect(bind(&Consumer::startPipeline, this, _1)); // use the member: the parameter was moved from above
+ m_discover->onDiscoveryFailure.connect(bind(&Consumer::onFailure, this, _1));
+ m_discover->run();
- discover.run();
m_face.processEvents();
}
-void Consumer::runWithData(const Data& data)
+void
+Consumer::startPipeline(const Data& data) // replaces runWithData; connected to onDiscoverySuccess in Consumer::run
{
m_validator.validate(data,
bind(&Consumer::onDataValidated, this, _1),
bind(&Consumer::onFailure, this, _2));
- m_pipeline->runWithExcludedSegment(data,
- bind(&Consumer::onData, this, _1, _2),
- bind(&Consumer::onFailure, this, _1));
-
+ m_pipeline->run(data, // same data/onData/onFailure arguments as the removed runWithExcludedSegment() call
+ bind(&Consumer::onData, this, _1, _2),
+ bind(&Consumer::onFailure, this, _1));
}
void
@@ -100,7 +101,6 @@
for (auto it = m_bufferedData.begin();
it != m_bufferedData.end() && it->first == m_nextToPrint;
it = m_bufferedData.erase(it), ++m_nextToPrint) {
-
const Block& content = it->second->getContent();
m_outputStream.write(reinterpret_cast<const char*>(content.value()), content.value_size());
}