chunks: modularize pipeline implementation
Change-Id: Iaa5acbb02a583858db6204716947cceeae793a65
Refs: #3636
diff --git a/tools/chunks/catchunks/pipeline-interests.hpp b/tools/chunks/catchunks/pipeline-interests.hpp
index 32d5b79..63d714d 100644
--- a/tools/chunks/catchunks/pipeline-interests.hpp
+++ b/tools/chunks/catchunks/pipeline-interests.hpp
@@ -23,46 +23,31 @@
* @author Wentao Shang
* @author Steve DiBenedetto
* @author Andrea Tosatto
+ * @author Davide Pesavento
+ * @author Weiwei Liu
*/
#ifndef NDN_TOOLS_CHUNKS_CATCHUNKS_PIPELINE_INTERESTS_HPP
#define NDN_TOOLS_CHUNKS_CATCHUNKS_PIPELINE_INTERESTS_HPP
#include "core/common.hpp"
-#include "options.hpp"
namespace ndn {
namespace chunks {
-class DataFetcher;
-
-class PipelineInterestsOptions : public Options
-{
-public:
- explicit
- PipelineInterestsOptions(const Options& options = Options())
- : Options(options)
- , maxPipelineSize(1)
- {
- }
-
-public:
- size_t maxPipelineSize;
-};
-
/**
* @brief Service for retrieving Data via an Interest pipeline
*
- * Retrieves all segmented Data under the specified prefix by maintaining a pipeline of N Interests
- * in flight.
+ * Retrieves all segments of Data under a given prefix by maintaining a (variable or fixed-size)
+ * window of N Interests in flight. A user-specified callback function is invoked to notify
+ * the caller of the arrival of each segment of Data.
*
- * Provides retrieved Data on arrival with no ordering guarantees. Data is delivered to the
- * PipelineInterests' user via callback immediately upon arrival.
+ * No guarantees are made as to the order in which segments are fetched or callbacks are invoked,
+ * i.e. out-of-order delivery is possible.
*/
class PipelineInterests
{
public:
- typedef PipelineInterestsOptions Options;
typedef function<void(const std::string& reason)> FailureCallback;
public:
@@ -70,26 +55,23 @@
* @brief create a PipelineInterests service
*
* Configures the pipelining service without specifying the retrieval namespace. After this
- * configuration the method runWithExcludedSegment must be called to run the Pipeline.
+ * configuration, the run() method must be called to start the pipeline.
*/
explicit
- PipelineInterests(Face& face, const Options& options = Options());
+ PipelineInterests(Face& face);
+ virtual
~PipelineInterests();
/**
- * @brief fetch all the segments between 0 and lastSegment of the specified prefix
+ * @brief start fetching all the segments of the specified prefix
*
- * Starts the pipeline of size defined inside the options. The pipeline retrieves all the segments
- * until the last segment is received, @p data is excluded from the retrieving.
- *
- * @param data a segment of the segmented Data to retrive; data.getName() must end with a segment
- * number
+ * @param data a segment of the segmented Data to fetch; the Data name must end with a segment number
* @param onData callback for every segment correctly received, must not be empty
- * @param onfailure callback called if an error occurs
+ * @param onFailure callback invoked if an error occurs; may be empty
*/
void
- runWithExcludedSegment(const Data& data, DataCallback onData, FailureCallback onFailure);
+ run(const Data& data, DataCallback onData, FailureCallback onFailure);
/**
* @brief stop all fetch operations
@@ -97,44 +79,52 @@
void
cancel();
+protected:
+ bool
+ isStopping() const
+ {
+ return m_isStopping;
+ }
+
+ void
+ onData(const Interest& interest, const Data& data) const
+ {
+ m_onData(interest, data);
+ }
+
+ /**
+ * @brief subclasses can call this method to signal an unrecoverable failure
+ */
+ void
+ onFailure(const std::string& reason);
+
private:
/**
- * @brief fetch the next segment that has not been requested yet
+ * @brief perform subclass-specific operations to fetch all the segments
*
- * @return false if there is an error or all the segments have been fetched, true otherwise
+ * When overriding this function, at a minimum, the subclass should implement the retrieval
+ * of all the segments. Segment m_excludedSegmentNo can be skipped. Subclasses must guarantee
+ * that onData is called at least once for every segment that is fetched successfully.
+ *
+ * @note m_lastSegmentNo contains a valid value only if m_hasFinalBlockId is true.
*/
- bool
- fetchNextSegment(size_t pipeNo);
+ virtual void
+ doRun() = 0;
- void
- fail(const std::string& reason);
+ virtual void
+ doCancel() = 0;
- void
- handleData(const Interest& interest, const Data& data, size_t pipeNo);
-
- void
- handleFail(const std::string& reason, size_t pipeNo);
+protected:
+ Face& m_face;
+ Name m_prefix;
+ uint64_t m_lastSegmentNo;
+ uint64_t m_excludedSegmentNo;
+ bool m_hasFinalBlockId; ///< true if the last segment number is known
private:
- Name m_prefix;
- Face& m_face;
- uint64_t m_nextSegmentNo;
- uint64_t m_lastSegmentNo;
- uint64_t m_excludeSegmentNo;
DataCallback m_onData;
FailureCallback m_onFailure;
- const Options m_options;
- std::vector<std::pair<shared_ptr<DataFetcher>, uint64_t>> m_segmentFetchers;
- bool m_hasFinalBlockId;
- /**
- * true if there's a critical error
- */
- bool m_hasError;
- /**
- * true if one or more segmentFetcher failed, if lastSegmentNo is not set this is usually not a
- * fatal error for the pipeline
- */
- bool m_hasFailure;
+ bool m_isStopping;
};
} // namespace chunks