chunks: modularize pipeline implementation

Change-Id: Iaa5acbb02a583858db6204716947cceeae793a65
Refs: #3636
diff --git a/tools/chunks/catchunks/consumer.cpp b/tools/chunks/catchunks/consumer.cpp
index 8976f6a..fe30f75 100644
--- a/tools/chunks/catchunks/consumer.cpp
+++ b/tools/chunks/catchunks/consumer.cpp
@@ -26,7 +26,6 @@
  */
 
 #include "consumer.hpp"
-#include "discover-version.hpp"
 
 namespace ndn {
 namespace chunks {
@@ -34,35 +33,37 @@
 Consumer::Consumer(Face& face, Validator& validator, bool isVerbose, std::ostream& os)
   : m_face(face)
   , m_validator(validator)
-  , m_pipeline(nullptr)
-  , m_nextToPrint(0)
   , m_outputStream(os)
+  , m_nextToPrint(0)
   , m_isVerbose(isVerbose)
 {
 }
 
-void Consumer::run(DiscoverVersion& discover, PipelineInterests& pipeline)
+void
+Consumer::run(unique_ptr<DiscoverVersion> discover, unique_ptr<PipelineInterests> pipeline)
 {
-  m_pipeline = &pipeline;
+  m_discover = std::move(discover);
+  m_pipeline = std::move(pipeline);
   m_nextToPrint = 0;
+  m_bufferedData.clear();
 
-  discover.onDiscoverySuccess.connect(bind(&Consumer::runWithData, this, _1));
-  discover.onDiscoveryFailure.connect(bind(&Consumer::onFailure, this, _1));
+  discover->onDiscoverySuccess.connect(bind(&Consumer::startPipeline, this, _1));
+  discover->onDiscoveryFailure.connect(bind(&Consumer::onFailure, this, _1));
+  discover->run();
 
-  discover.run();
   m_face.processEvents();
 }
 
-void Consumer::runWithData(const Data& data)
+void
+Consumer::startPipeline(const Data& data)
 {
   m_validator.validate(data,
                        bind(&Consumer::onDataValidated, this, _1),
                        bind(&Consumer::onFailure, this, _2));
 
-  m_pipeline->runWithExcludedSegment(data,
-                                     bind(&Consumer::onData, this, _1, _2),
-                                     bind(&Consumer::onFailure, this, _1));
-
+  m_pipeline->run(data,
+                  bind(&Consumer::onData, this, _1, _2),
+                  bind(&Consumer::onFailure, this, _1));
 }
 
 void
@@ -100,7 +101,6 @@
   for (auto it = m_bufferedData.begin();
        it != m_bufferedData.end() && it->first == m_nextToPrint;
        it = m_bufferedData.erase(it), ++m_nextToPrint) {
-
     const Block& content = it->second->getContent();
     m_outputStream.write(reinterpret_cast<const char*>(content.value()), content.value_size());
   }
diff --git a/tools/chunks/catchunks/consumer.hpp b/tools/chunks/catchunks/consumer.hpp
index 9428a06..e2451be 100644
--- a/tools/chunks/catchunks/consumer.hpp
+++ b/tools/chunks/catchunks/consumer.hpp
@@ -25,12 +25,11 @@
  * @author Andrea Tosatto
  */
 
-
 #ifndef NDN_TOOLS_CHUNKS_CATCHUNKS_CONSUMER_HPP
 #define NDN_TOOLS_CHUNKS_CATCHUNKS_CONSUMER_HPP
 
-#include "pipeline-interests.hpp"
 #include "discover-version.hpp"
+#include "pipeline-interests.hpp"
 
 #include <ndn-cxx/security/validator.hpp>
 
@@ -66,11 +65,11 @@
    * @brief Run the consumer
    */
   void
-  run(DiscoverVersion& discover, PipelineInterests& pipeline);
+  run(unique_ptr<DiscoverVersion> discover, unique_ptr<PipelineInterests> pipeline);
 
 private:
   void
-  runWithData(const Data& data);
+  startPipeline(const Data& data);
 
   void
   onData(const Interest& interest, const Data& data);
@@ -88,9 +87,10 @@
 private:
   Face& m_face;
   Validator& m_validator;
-  PipelineInterests* m_pipeline;
-  uint64_t m_nextToPrint;
   std::ostream& m_outputStream;
+  unique_ptr<DiscoverVersion> m_discover;
+  unique_ptr<PipelineInterests> m_pipeline;
+  uint64_t m_nextToPrint;
   bool m_isVerbose;
 
 PUBLIC_WITH_TESTS_ELSE_PRIVATE:
diff --git a/tools/chunks/catchunks/ndncatchunks.cpp b/tools/chunks/catchunks/ndncatchunks.cpp
index d2b679f..98839b3 100644
--- a/tools/chunks/catchunks/ndncatchunks.cpp
+++ b/tools/chunks/catchunks/ndncatchunks.cpp
@@ -23,6 +23,8 @@
  * @author Wentao Shang
  * @author Steve DiBenedetto
  * @author Andrea Tosatto
+ * @author Davide Pesavento
+ * @author Weiwei Liu
  */
 
 #include "core/version.hpp"
@@ -30,6 +32,7 @@
 #include "consumer.hpp"
 #include "discover-version-fixed.hpp"
 #include "discover-version-iterative.hpp"
+#include "pipeline-interests-fixed-window.hpp"
 
 #include <ndn-cxx/security/validator-null.hpp>
 
@@ -42,6 +45,7 @@
   std::string programName(argv[0]);
   Options options;
   std::string discoverType("fixed");
+  std::string pipelineType("fixed");
   size_t maxPipelineSize(1);
   int maxRetriesAfterVersionFound(1);
   std::string uri;
@@ -148,15 +152,23 @@
       return 2;
     }
 
+    unique_ptr<PipelineInterests> pipeline;
+    if (pipelineType == "fixed") {
+      PipelineInterestsFixedWindow::Options optionsPipeline(options);
+      optionsPipeline.maxPipelineSize = maxPipelineSize;
+      pipeline = make_unique<PipelineInterestsFixedWindow>(face, optionsPipeline);
+    }
+    else {
+      std::cerr << "ERROR: Interest pipeline type not valid" << std::endl;
+      return 2;
+    }
+
     ValidatorNull validator;
     Consumer consumer(face, validator, options.isVerbose);
 
-    PipelineInterests::Options optionsPipeline(options);
-    optionsPipeline.maxPipelineSize = maxPipelineSize;
-    PipelineInterests pipeline(face, optionsPipeline);
-
     BOOST_ASSERT(discover != nullptr);
-    consumer.run(*discover, pipeline);
+    BOOST_ASSERT(pipeline != nullptr);
+    consumer.run(std::move(discover), std::move(pipeline));
   }
   catch (const Consumer::ApplicationNackError& e) {
     std::cerr << "ERROR: " << e.what() << std::endl;
diff --git a/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp b/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp
new file mode 100644
index 0000000..049d52d
--- /dev/null
+++ b/tools/chunks/catchunks/pipeline-interests-fixed-window.cpp
@@ -0,0 +1,183 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2016,  Regents of the University of California,
+ *                      Colorado State University,
+ *                      University Pierre & Marie Curie, Sorbonne University.
+ *
+ * This file is part of ndn-tools (Named Data Networking Essential Tools).
+ * See AUTHORS.md for complete list of ndn-tools authors and contributors.
+ *
+ * ndn-tools is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ndn-tools, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Wentao Shang
+ * @author Steve DiBenedetto
+ * @author Andrea Tosatto
+ * @author Davide Pesavento
+ */
+
+#include "pipeline-interests-fixed-window.hpp"
+#include "data-fetcher.hpp"
+
+namespace ndn {
+namespace chunks {
+
+PipelineInterestsFixedWindow::PipelineInterestsFixedWindow(Face& face, const Options& options)
+  : PipelineInterests(face)
+  , m_options(options)
+  , m_nextSegmentNo(0)
+  , m_hasFailure(false)
+{
+  m_segmentFetchers.resize(m_options.maxPipelineSize);
+}
+
+PipelineInterestsFixedWindow::~PipelineInterestsFixedWindow()
+{
+  cancel();
+}
+
+void
+PipelineInterestsFixedWindow::doRun()
+{
+  // if the FinalBlockId is unknown, this could potentially request non-existent segments
+  for (size_t nRequestedSegments = 0;
+       nRequestedSegments < m_options.maxPipelineSize;
+       ++nRequestedSegments) {
+    if (!fetchNextSegment(nRequestedSegments))
+      // all segments have been requested
+      break;
+  }
+}
+
+bool
+PipelineInterestsFixedWindow::fetchNextSegment(std::size_t pipeNo)
+{
+  if (isStopping())
+    return false;
+
+  if (m_hasFailure) {
+    onFailure("Fetching terminated but no final segment number has been found");
+    return false;
+  }
+
+  if (m_nextSegmentNo == m_excludedSegmentNo)
+    m_nextSegmentNo++;
+
+  if (m_hasFinalBlockId && m_nextSegmentNo > m_lastSegmentNo)
+   return false;
+
+  // send interest for next segment
+  if (m_options.isVerbose)
+    std::cerr << "Requesting segment #" << m_nextSegmentNo << std::endl;
+
+  Interest interest(Name(m_prefix).appendSegment(m_nextSegmentNo));
+  interest.setInterestLifetime(m_options.interestLifetime);
+  interest.setMustBeFresh(m_options.mustBeFresh);
+  interest.setMaxSuffixComponents(1);
+
+  auto fetcher = DataFetcher::fetch(m_face, interest,
+                                    m_options.maxRetriesOnTimeoutOrNack,
+                                    m_options.maxRetriesOnTimeoutOrNack,
+                                    bind(&PipelineInterestsFixedWindow::handleData, this, _1, _2, pipeNo),
+                                    bind(&PipelineInterestsFixedWindow::handleFail, this, _2, pipeNo),
+                                    bind(&PipelineInterestsFixedWindow::handleFail, this, _2, pipeNo),
+                                    m_options.isVerbose);
+
+  BOOST_ASSERT(!m_segmentFetchers[pipeNo].first || !m_segmentFetchers[pipeNo].first->isRunning());
+  m_segmentFetchers[pipeNo] = make_pair(fetcher, m_nextSegmentNo);
+  m_nextSegmentNo++;
+
+  return true;
+}
+
+void
+PipelineInterestsFixedWindow::doCancel()
+{
+  for (auto& fetcher : m_segmentFetchers) {
+    if (fetcher.first)
+      fetcher.first->cancel();
+  }
+
+  m_segmentFetchers.clear();
+}
+
+void
+PipelineInterestsFixedWindow::handleData(const Interest& interest, const Data& data, size_t pipeNo)
+{
+  if (isStopping())
+    return;
+
+  BOOST_ASSERT(data.getName().equals(interest.getName()));
+
+  if (m_options.isVerbose)
+    std::cerr << "Received segment #" << data.getName()[-1].toSegment() << std::endl;
+
+  onData(interest, data);
+
+  if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
+    m_lastSegmentNo = data.getFinalBlockId().toSegment();
+    m_hasFinalBlockId = true;
+
+    for (auto& fetcher : m_segmentFetchers) {
+      if (fetcher.first == nullptr)
+        continue;
+
+      if (fetcher.second > m_lastSegmentNo) {
+        // stop trying to fetch segments that are beyond m_lastSegmentNo
+        fetcher.first->cancel();
+      }
+      else if (fetcher.first->hasError()) { // fetcher.second <= m_lastSegmentNo
+        // there was an error while fetching a segment that is part of the content
+        return onFailure("Failure retrieving segment #" + to_string(fetcher.second));
+      }
+    }
+  }
+
+  fetchNextSegment(pipeNo);
+}
+
+void PipelineInterestsFixedWindow::handleFail(const std::string& reason, std::size_t pipeNo)
+{
+  if (isStopping())
+    return;
+
+  // if the failed segment is definitely part of the content, raise a fatal error
+  if (m_hasFinalBlockId && m_segmentFetchers[pipeNo].second <= m_lastSegmentNo)
+    return onFailure(reason);
+
+  if (!m_hasFinalBlockId) {
+    bool areAllFetchersStopped = true;
+    for (auto& fetcher : m_segmentFetchers) {
+      if (fetcher.first == nullptr)
+        continue;
+
+      // cancel fetching all segments that follow
+      if (fetcher.second > m_segmentFetchers[pipeNo].second) {
+        fetcher.first->cancel();
+      }
+      else if (fetcher.first->isRunning()) { // fetcher.second <= m_segmentFetchers[pipeNo].second
+        areAllFetchersStopped = false;
+      }
+    }
+
+    if (areAllFetchersStopped) {
+      onFailure("Fetching terminated but no final segment number has been found");
+    }
+    else {
+      m_hasFailure = true;
+    }
+  }
+}
+
+} // namespace chunks
+} // namespace ndn
diff --git a/tools/chunks/catchunks/pipeline-interests-fixed-window.hpp b/tools/chunks/catchunks/pipeline-interests-fixed-window.hpp
new file mode 100644
index 0000000..2da40e6
--- /dev/null
+++ b/tools/chunks/catchunks/pipeline-interests-fixed-window.hpp
@@ -0,0 +1,118 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2016,  Regents of the University of California,
+ *                      Colorado State University,
+ *                      University Pierre & Marie Curie, Sorbonne University.
+ *
+ * This file is part of ndn-tools (Named Data Networking Essential Tools).
+ * See AUTHORS.md for complete list of ndn-tools authors and contributors.
+ *
+ * ndn-tools is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ndn-tools, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ *
+ * @author Wentao Shang
+ * @author Steve DiBenedetto
+ * @author Andrea Tosatto
+ * @author Davide Pesavento
+ */
+
+#include "options.hpp"
+#include "pipeline-interests.hpp"
+
+namespace ndn {
+namespace chunks {
+
+class DataFetcher;
+
+class PipelineInterestsFixedWindowOptions : public Options
+{
+public:
+  explicit
+  PipelineInterestsFixedWindowOptions(const Options& options = Options())
+    : Options(options)
+    , maxPipelineSize(1)
+  {
+  }
+
+public:
+  size_t maxPipelineSize;
+};
+
+/**
+ * @brief Service for retrieving Data via an Interest pipeline
+ *
+ * Retrieves all segments of Data under a given prefix by maintaining a fixed-size window of
+ * N Interests in flight. A user-specified callback function is used to notify the arrival of
+ * each segment of Data.
+ *
+ * No guarantees are made as to the order in which segments are fetched or callbacks are invoked,
+ * i.e. out-of-order delivery is possible.
+ */
+class PipelineInterestsFixedWindow : public PipelineInterests
+{
+public:
+  typedef PipelineInterestsFixedWindowOptions Options;
+
+public:
+  /**
+   * @brief create a PipelineInterestsFixedWindow service
+   *
+   * Configures the pipelining service without specifying the retrieval namespace. After this
+   * configuration the method run must be called to start the Pipeline.
+   */
+  explicit
+  PipelineInterestsFixedWindow(Face& face, const Options& options = Options());
+
+  ~PipelineInterestsFixedWindow() final;
+
+private:
+  /**
+   * @brief fetch all the segments between 0 and m_lastSegmentNo
+   *
+   * Starts a fixed-window pipeline with size equal to m_options.maxPipelineSize. The pipeline
+   * will fetch every segment until the last segment is successfully received or an error occurs.
+   * The segment with segment number equal to m_excludedSegmentNo will not be fetched.
+   */
+  void
+  doRun() final;
+
+  void
+  doCancel() final;
+
+  /**
+   * @brief fetch the next segment that has not been requested yet
+   *
+   * @return false if there is an error or all the segments have been fetched, true otherwise
+   */
+  bool
+  fetchNextSegment(size_t pipeNo);
+
+  void
+  handleData(const Interest& interest, const Data& data, size_t pipeNo);
+
+  void
+  handleFail(const std::string& reason, size_t pipeNo);
+
+private:
+  const Options m_options;
+  std::vector<std::pair<shared_ptr<DataFetcher>, uint64_t>> m_segmentFetchers;
+  uint64_t m_nextSegmentNo;
+  /**
+   * true if one or more segment fetchers encountered an error; if m_hasFinalBlockId
+   * is false, this is usually not a fatal error for the pipeline
+   */
+  bool m_hasFailure;
+};
+
+} // namespace chunks
+} // namespace ndn
diff --git a/tools/chunks/catchunks/pipeline-interests.cpp b/tools/chunks/catchunks/pipeline-interests.cpp
index b07c959..ff817e4 100644
--- a/tools/chunks/catchunks/pipeline-interests.cpp
+++ b/tools/chunks/catchunks/pipeline-interests.cpp
@@ -23,179 +23,63 @@
  * @author Wentao Shang
  * @author Steve DiBenedetto
  * @author Andrea Tosatto
+ * @author Davide Pesavento
+ * @author Weiwei Liu
  */
 
 #include "pipeline-interests.hpp"
-#include "data-fetcher.hpp"
 
 namespace ndn {
 namespace chunks {
 
-PipelineInterests::PipelineInterests(Face& face, const Options& options)
+PipelineInterests::PipelineInterests(Face& face)
   : m_face(face)
-  , m_nextSegmentNo(0)
   , m_lastSegmentNo(0)
-  , m_excludeSegmentNo(0)
-  , m_options(options)
+  , m_excludedSegmentNo(0)
   , m_hasFinalBlockId(false)
-  , m_hasError(false)
-  , m_hasFailure(false)
+  , m_isStopping(false)
 {
-  m_segmentFetchers.resize(m_options.maxPipelineSize);
 }
 
-PipelineInterests::~PipelineInterests()
-{
-  cancel();
-}
+PipelineInterests::~PipelineInterests() = default;
 
 void
-PipelineInterests::runWithExcludedSegment(const Data& data, DataCallback onData,
-                                          FailureCallback onFailure)
+PipelineInterests::run(const Data& data, DataCallback onData, FailureCallback onFailure)
 {
   BOOST_ASSERT(onData != nullptr);
   m_onData = std::move(onData);
   m_onFailure = std::move(onFailure);
-
-  Name dataName = data.getName();
-  m_prefix = dataName.getPrefix(-1);
-  m_excludeSegmentNo = dataName[-1].toSegment();
+  m_prefix = data.getName().getPrefix(-1);
+  m_excludedSegmentNo = data.getName()[-1].toSegment();
 
   if (!data.getFinalBlockId().empty()) {
-    m_hasFinalBlockId = true;
     m_lastSegmentNo = data.getFinalBlockId().toSegment();
+    m_hasFinalBlockId = true;
   }
 
-  // if the FinalBlockId is unknown, this could potentially request non-existent segments
-  for (size_t nRequestedSegments = 0; nRequestedSegments < m_options.maxPipelineSize;
-       nRequestedSegments++) {
-    if (!fetchNextSegment(nRequestedSegments)) // all segments have been requested
-      break;
-  }
-}
-
-bool
-PipelineInterests::fetchNextSegment(std::size_t pipeNo)
-{
-  if (m_hasFailure) {
-    fail("Fetching terminated but no final segment number has been found");
-    return false;
-  }
-
-  if (m_nextSegmentNo == m_excludeSegmentNo)
-    m_nextSegmentNo++;
-
-  if (m_hasFinalBlockId && m_nextSegmentNo > m_lastSegmentNo)
-   return false;
-
-  // Send interest for next segment
-  if (m_options.isVerbose)
-    std::cerr << "Requesting segment #" << m_nextSegmentNo << std::endl;
-
-  Interest interest(Name(m_prefix).appendSegment(m_nextSegmentNo));
-  interest.setInterestLifetime(m_options.interestLifetime);
-  interest.setMustBeFresh(m_options.mustBeFresh);
-  interest.setMaxSuffixComponents(1);
-
-  BOOST_ASSERT(!m_segmentFetchers[pipeNo].first || !m_segmentFetchers[pipeNo].first->isRunning());
-
-  auto fetcher = DataFetcher::fetch(m_face, interest,
-                                    m_options.maxRetriesOnTimeoutOrNack,
-                                    m_options.maxRetriesOnTimeoutOrNack,
-                                    bind(&PipelineInterests::handleData, this, _1, _2, pipeNo),
-                                    bind(&PipelineInterests::handleFail, this, _2, pipeNo),
-                                    bind(&PipelineInterests::handleFail, this, _2, pipeNo),
-                                    m_options.isVerbose);
-
-  m_segmentFetchers[pipeNo] = make_pair(fetcher, m_nextSegmentNo);
-
-  m_nextSegmentNo++;
-  return true;
+  doRun();
 }
 
 void
 PipelineInterests::cancel()
 {
-  for (auto& fetcher : m_segmentFetchers)
-    if (fetcher.first)
-      fetcher.first->cancel();
+  if (m_isStopping)
+    return;
 
-  m_segmentFetchers.clear();
+  m_isStopping = true;
+  doCancel();
 }
 
 void
-PipelineInterests::fail(const std::string& reason)
+PipelineInterests::onFailure(const std::string& reason)
 {
-  if (!m_hasError) {
-    cancel();
-    m_hasError = true;
-    m_hasFailure = true;
-    if (m_onFailure)
-      m_face.getIoService().post([this, reason] { m_onFailure(reason); });
-  }
-}
-
-void
-PipelineInterests::handleData(const Interest& interest, const Data& data, size_t pipeNo)
-{
-  if (m_hasError)
+  if (m_isStopping)
     return;
 
-  BOOST_ASSERT(data.getName().equals(interest.getName()));
+  cancel();
 
-  if (m_options.isVerbose)
-    std::cerr << "Received segment #" << data.getName()[-1].toSegment() << std::endl;
-
-  m_onData(interest, data);
-
-  if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
-    m_lastSegmentNo = data.getFinalBlockId().toSegment();
-    m_hasFinalBlockId = true;
-
-    for (auto& fetcher : m_segmentFetchers) {
-      if (fetcher.first && fetcher.second > m_lastSegmentNo) {
-        // Stop trying to fetch segments that are not part of the content
-        fetcher.first->cancel();
-      }
-      else if (fetcher.first && fetcher.first->hasError()) { // fetcher.second <= m_lastSegmentNo
-        // there was an error while fetching a segment that is part of the content
-        fail("Failure retriving segment #" + to_string(fetcher.second));
-        return;
-      }
-    }
-  }
-
-  fetchNextSegment(pipeNo);
-}
-
-void PipelineInterests::handleFail(const std::string& reason, std::size_t pipeNo)
-{
-  if (m_hasError)
-    return;
-
-  if (m_hasFinalBlockId && m_segmentFetchers[pipeNo].second <= m_lastSegmentNo) {
-    fail(reason);
-  }
-  else if (!m_hasFinalBlockId) {
-    // don't fetch the following segments
-    bool areAllFetchersStopped = true;
-    for (auto& fetcher : m_segmentFetchers) {
-      if (fetcher.first && fetcher.second > m_segmentFetchers[pipeNo].second) {
-        fetcher.first->cancel();
-      }
-      else if (fetcher.first && fetcher.first->isRunning()) {
-        // fetcher.second <= m_segmentFetchers[pipeNo].second
-        areAllFetchersStopped = false;
-      }
-    }
-    if (areAllFetchersStopped) {
-      if (m_onFailure)
-        fail("Fetching terminated but no final segment number has been found");
-    }
-    else {
-      m_hasFailure = true;
-    }
-  }
+  if (m_onFailure)
+    m_face.getIoService().post([this, reason] { m_onFailure(reason); });
 }
 
 } // namespace chunks
diff --git a/tools/chunks/catchunks/pipeline-interests.hpp b/tools/chunks/catchunks/pipeline-interests.hpp
index 32d5b79..63d714d 100644
--- a/tools/chunks/catchunks/pipeline-interests.hpp
+++ b/tools/chunks/catchunks/pipeline-interests.hpp
@@ -23,46 +23,31 @@
  * @author Wentao Shang
  * @author Steve DiBenedetto
  * @author Andrea Tosatto
+ * @author Davide Pesavento
+ * @author Weiwei Liu
  */
 
 #ifndef NDN_TOOLS_CHUNKS_CATCHUNKS_PIPELINE_INTERESTS_HPP
 #define NDN_TOOLS_CHUNKS_CATCHUNKS_PIPELINE_INTERESTS_HPP
 
 #include "core/common.hpp"
-#include "options.hpp"
 
 namespace ndn {
 namespace chunks {
 
-class DataFetcher;
-
-class PipelineInterestsOptions : public Options
-{
-public:
-  explicit
-  PipelineInterestsOptions(const Options& options = Options())
-    : Options(options)
-    , maxPipelineSize(1)
-  {
-  }
-
-public:
-  size_t maxPipelineSize;
-};
-
 /**
  * @brief Service for retrieving Data via an Interest pipeline
  *
- * Retrieves all segmented Data under the specified prefix by maintaining a pipeline of N Interests
- * in flight.
+ * Retrieves all segments of Data under a given prefix by maintaining a (variable or fixed-size)
+ * window of N Interests in flight. A user-specified callback function is used to notify
+ * the arrival of each segment of Data.
  *
- * Provides retrieved Data on arrival with no ordering guarantees. Data is delivered to the
- * PipelineInterests' user via callback immediately upon arrival.
+ * No guarantees are made as to the order in which segments are fetched or callbacks are invoked,
+ * i.e. out-of-order delivery is possible.
  */
 class PipelineInterests
 {
 public:
-  typedef PipelineInterestsOptions Options;
   typedef function<void(const std::string& reason)> FailureCallback;
 
 public:
@@ -70,26 +55,23 @@
    * @brief create a PipelineInterests service
    *
    * Configures the pipelining service without specifying the retrieval namespace. After this
-   * configuration the method runWithExcludedSegment must be called to run the Pipeline.
+   * configuration the method run must be called to start the Pipeline.
    */
   explicit
-  PipelineInterests(Face& face, const Options& options = Options());
+  PipelineInterests(Face& face);
 
+  virtual
   ~PipelineInterests();
 
   /**
-   * @brief fetch all the segments between 0 and lastSegment of the specified prefix
+   * @brief start fetching all the segments of the specified prefix
    *
-   * Starts the pipeline of size defined inside the options. The pipeline retrieves all the segments
-   * until the last segment is received, @p data is excluded from the retrieving.
-   *
-   * @param data a segment of the segmented Data to retrive; data.getName() must end with a segment
-   *        number
+   * @param data a segment of the segmented Data to fetch; the Data name must end with a segment number
    * @param onData callback for every segment correctly received, must not be empty
-   * @param onfailure callback called if an error occurs
+   * @param onFailure callback if an error occurs, may be empty
    */
   void
-  runWithExcludedSegment(const Data& data, DataCallback onData, FailureCallback onFailure);
+  run(const Data& data, DataCallback onData, FailureCallback onFailure);
 
   /**
    * @brief stop all fetch operations
@@ -97,44 +79,52 @@
   void
   cancel();
 
+protected:
+  bool
+  isStopping() const
+  {
+    return m_isStopping;
+  }
+
+  void
+  onData(const Interest& interest, const Data& data) const
+  {
+    m_onData(interest, data);
+  }
+
+  /**
+   * @brief subclasses can call this method to signal an unrecoverable failure
+   */
+  void
+  onFailure(const std::string& reason);
+
 private:
   /**
-   * @brief fetch the next segment that has not been requested yet
+   * @brief perform subclass-specific operations to fetch all the segments
    *
-   * @return false if there is an error or all the segments have been fetched, true otherwise
+   * When overriding this function, at a minimum, the subclass should implement the retrieving
+   * of all the segments. Segment m_excludedSegmentNo can be skipped. Subclass must guarantee
+   * that onData is called at least once for every segment that is fetched successfully.
+   *
+   * @note m_lastSegmentNo contains a valid value only if m_hasFinalBlockId is true.
    */
-  bool
-  fetchNextSegment(size_t pipeNo);
+  virtual void
+  doRun() = 0;
 
-  void
-  fail(const std::string& reason);
+  virtual void
+  doCancel() = 0;
 
-  void
-  handleData(const Interest& interest, const Data& data, size_t pipeNo);
-
-  void
-  handleFail(const std::string& reason, size_t pipeNo);
+protected:
+  Face& m_face;
+  Name m_prefix;
+  uint64_t m_lastSegmentNo;
+  uint64_t m_excludedSegmentNo;
+  bool m_hasFinalBlockId; ///< true if the last segment number is known
 
 private:
-  Name m_prefix;
-  Face& m_face;
-  uint64_t m_nextSegmentNo;
-  uint64_t m_lastSegmentNo;
-  uint64_t m_excludeSegmentNo;
   DataCallback m_onData;
   FailureCallback m_onFailure;
-  const Options m_options;
-  std::vector<std::pair<shared_ptr<DataFetcher>, uint64_t>> m_segmentFetchers;
-  bool m_hasFinalBlockId;
-  /**
-   * true if there's a critical error
-   */
-  bool m_hasError;
-  /**
-   * true if one or more segmentFetcher failed, if lastSegmentNo is not set this is usually not a
-   * fatal error for the pipeline
-   */
-  bool m_hasFailure;
+  bool m_isStopping;
 };
 
 } // namespace chunks