chunks: modularize pipeline implementation

Change-Id: Iaa5acbb02a583858db6204716947cceeae793a65
Refs: #3636
diff --git a/tools/chunks/catchunks/pipeline-interests.hpp b/tools/chunks/catchunks/pipeline-interests.hpp
index 32d5b79..63d714d 100644
--- a/tools/chunks/catchunks/pipeline-interests.hpp
+++ b/tools/chunks/catchunks/pipeline-interests.hpp
@@ -23,46 +23,31 @@
  * @author Wentao Shang
  * @author Steve DiBenedetto
  * @author Andrea Tosatto
+ * @author Davide Pesavento
+ * @author Weiwei Liu
  */
 
 #ifndef NDN_TOOLS_CHUNKS_CATCHUNKS_PIPELINE_INTERESTS_HPP
 #define NDN_TOOLS_CHUNKS_CATCHUNKS_PIPELINE_INTERESTS_HPP
 
 #include "core/common.hpp"
-#include "options.hpp"
 
 namespace ndn {
 namespace chunks {
 
-class DataFetcher;
-
-class PipelineInterestsOptions : public Options
-{
-public:
-  explicit
-  PipelineInterestsOptions(const Options& options = Options())
-    : Options(options)
-    , maxPipelineSize(1)
-  {
-  }
-
-public:
-  size_t maxPipelineSize;
-};
-
 /**
  * @brief Service for retrieving Data via an Interest pipeline
  *
- * Retrieves all segmented Data under the specified prefix by maintaining a pipeline of N Interests
- * in flight.
+ * Retrieves all segments of Data under a given prefix by maintaining a (variable or fixed-size)
+ * window of N Interests in flight. A user-specified callback function is invoked to signal
+ * the arrival of each segment of Data.
  *
- * Provides retrieved Data on arrival with no ordering guarantees. Data is delivered to the
- * PipelineInterests' user via callback immediately upon arrival.
+ * No guarantees are made as to the order in which segments are fetched or callbacks are invoked,
+ * i.e. out-of-order delivery is possible.
  */
 class PipelineInterests
 {
 public:
-  typedef PipelineInterestsOptions Options;
   typedef function<void(const std::string& reason)> FailureCallback;
 
 public:
@@ -70,26 +55,23 @@
    * @brief create a PipelineInterests service
    *
    * Configures the pipelining service without specifying the retrieval namespace. After this
-   * configuration the method runWithExcludedSegment must be called to run the Pipeline.
+   * configuration, the run() method must be called to start the pipeline.
    */
   explicit
-  PipelineInterests(Face& face, const Options& options = Options());
+  PipelineInterests(Face& face);
 
+  virtual
   ~PipelineInterests();
 
   /**
-   * @brief fetch all the segments between 0 and lastSegment of the specified prefix
+   * @brief start fetching all the segments of the specified prefix
    *
-   * Starts the pipeline of size defined inside the options. The pipeline retrieves all the segments
-   * until the last segment is received, @p data is excluded from the retrieving.
-   *
-   * @param data a segment of the segmented Data to retrive; data.getName() must end with a segment
-   *        number
+   * @param data a segment of the segmented Data to fetch; the Data name must end with a segment number
    * @param onData callback for every segment correctly received, must not be empty
-   * @param onfailure callback called if an error occurs
+   * @param onFailure callback invoked if an error occurs, may be empty
    */
   void
-  runWithExcludedSegment(const Data& data, DataCallback onData, FailureCallback onFailure);
+  run(const Data& data, DataCallback onData, FailureCallback onFailure);
 
   /**
    * @brief stop all fetch operations
@@ -97,44 +79,52 @@
   void
   cancel();
 
+protected:
+  bool
+  isStopping() const
+  {
+    return m_isStopping;
+  }
+
+  void
+  onData(const Interest& interest, const Data& data) const
+  {
+    m_onData(interest, data);
+  }
+
+  /**
+   * @brief subclasses can call this method to signal an unrecoverable failure
+   */
+  void
+  onFailure(const std::string& reason);
+
 private:
   /**
-   * @brief fetch the next segment that has not been requested yet
+   * @brief perform subclass-specific operations to fetch all the segments
    *
-   * @return false if there is an error or all the segments have been fetched, true otherwise
+   * When overriding this function, the subclass should, at a minimum, implement the retrieval
+   * of all the segments. Segment m_excludedSegmentNo can be skipped. The subclass must guarantee
+   * that onData is called at least once for every segment that is fetched successfully.
+   *
+   * @note m_lastSegmentNo contains a valid value only if m_hasFinalBlockId is true.
    */
-  bool
-  fetchNextSegment(size_t pipeNo);
+  virtual void
+  doRun() = 0;
 
-  void
-  fail(const std::string& reason);
+  virtual void
+  doCancel() = 0;
 
-  void
-  handleData(const Interest& interest, const Data& data, size_t pipeNo);
-
-  void
-  handleFail(const std::string& reason, size_t pipeNo);
+protected:
+  Face& m_face;
+  Name m_prefix;
+  uint64_t m_lastSegmentNo;
+  uint64_t m_excludedSegmentNo;
+  bool m_hasFinalBlockId; ///< true if the last segment number is known
 
 private:
-  Name m_prefix;
-  Face& m_face;
-  uint64_t m_nextSegmentNo;
-  uint64_t m_lastSegmentNo;
-  uint64_t m_excludeSegmentNo;
   DataCallback m_onData;
   FailureCallback m_onFailure;
-  const Options m_options;
-  std::vector<std::pair<shared_ptr<DataFetcher>, uint64_t>> m_segmentFetchers;
-  bool m_hasFinalBlockId;
-  /**
-   * true if there's a critical error
-   */
-  bool m_hasError;
-  /**
-   * true if one or more segmentFetcher failed, if lastSegmentNo is not set this is usually not a
-   * fatal error for the pipeline
-   */
-  bool m_hasFailure;
+  bool m_isStopping;
 };
 
 } // namespace chunks