util: use signals instead of callbacks in SegmentFetcher

Deprecate the static fetch() overloads in favor of start()

refs #4464

Change-Id: I8632790e6151ad019b19c89ceae1944d37871da2
diff --git a/src/util/segment-fetcher.cpp b/src/util/segment-fetcher.cpp
index 90cddaf..0d53aaf 100644
--- a/src/util/segment-fetcher.cpp
+++ b/src/util/segment-fetcher.cpp
@@ -20,7 +20,6 @@
  */
 
 #include "segment-fetcher.hpp"
-#include "../encoding/buffer-stream.hpp"
 #include "../name-component.hpp"
 #include "../lp/nack.hpp"
 #include "../lp/nack-header.hpp"
@@ -33,28 +32,34 @@
 
 const uint32_t SegmentFetcher::MAX_INTEREST_REEXPRESS = 3;
 
-SegmentFetcher::SegmentFetcher(Face& face,
-                               shared_ptr<security::v2::Validator> validator,
-                               const CompleteCallback& completeCallback,
-                               const ErrorCallback& errorCallback)
+SegmentFetcher::SegmentFetcher(Face& face, security::v2::Validator& validator)
   : m_face(face)
   , m_scheduler(m_face.getIoService())
   , m_validator(validator)
-  , m_completeCallback(completeCallback)
-  , m_errorCallback(errorCallback)
-  , m_buffer(make_shared<OBufferStream>())
 {
 }
 
 shared_ptr<SegmentFetcher>
+SegmentFetcher::start(Face& face,
+                      const Interest& baseInterest,
+                      security::v2::Validator& validator)
+{
+  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator));
+  fetcher->fetchFirstSegment(baseInterest, fetcher);
+  return fetcher;
+}
+
+shared_ptr<SegmentFetcher>
 SegmentFetcher::fetch(Face& face,
                       const Interest& baseInterest,
                       security::v2::Validator& validator,
                       const CompleteCallback& completeCallback,
                       const ErrorCallback& errorCallback)
 {
-  shared_ptr<security::v2::Validator> validatorPtr(&validator, [] (security::v2::Validator*) {});
-  return fetch(face, baseInterest, validatorPtr, completeCallback, errorCallback);
+  shared_ptr<SegmentFetcher> fetcher = start(face, baseInterest, validator);
+  fetcher->onComplete.connect(completeCallback);
+  fetcher->onError.connect(errorCallback);
+  return fetcher;
 }
 
 shared_ptr<SegmentFetcher>
@@ -64,11 +69,8 @@
                       const CompleteCallback& completeCallback,
                       const ErrorCallback& errorCallback)
 {
-  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, completeCallback,
-                                                        errorCallback));
-
-  fetcher->fetchFirstSegment(baseInterest, fetcher);
-
+  auto fetcher = fetch(face, baseInterest, *validator, completeCallback, errorCallback);
+  fetcher->onComplete.connect([validator] (ConstBufferPtr) {});
   return fetcher;
 }
 
@@ -83,7 +85,7 @@
   m_face.expressInterest(interest,
                          bind(&SegmentFetcher::afterSegmentReceivedCb, this, _1, _2, true, self),
                          bind(&SegmentFetcher::afterNackReceivedCb, this, _1, _2, 0, self),
-                         bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
+                         bind([this] { onError(INTEREST_TIMEOUT, "Timeout"); }));
 }
 
 void
@@ -99,7 +101,7 @@
   m_face.expressInterest(interest,
                          bind(&SegmentFetcher::afterSegmentReceivedCb, this, _1, _2, false, self),
                          bind(&SegmentFetcher::afterNackReceivedCb, this, _1, _2, 0, self),
-                         bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
+                         bind([this] { onError(INTEREST_TIMEOUT, "Timeout"); }));
 }
 
 void
@@ -108,10 +110,10 @@
                                        shared_ptr<SegmentFetcher> self)
 {
   afterSegmentReceived(data);
-  m_validator->validate(data,
-                        bind(&SegmentFetcher::afterValidationSuccess, this, _1,
-                             isSegmentZeroExpected, origInterest, self),
-                        bind(&SegmentFetcher::afterValidationFailure, this, _1, _2));
+  m_validator.validate(data,
+                       bind(&SegmentFetcher::afterValidationSuccess, this, _1,
+                            isSegmentZeroExpected, origInterest, self),
+                       bind(&SegmentFetcher::afterValidationFailure, this, _1, _2));
 
 }
 
@@ -128,28 +130,28 @@
       fetchNextSegment(origInterest, data.getName(), 0, self);
     }
     else {
-      m_buffer->write(reinterpret_cast<const char*>(data.getContent().value()),
-                      data.getContent().value_size());
+      m_buffer.write(reinterpret_cast<const char*>(data.getContent().value()),
+                     data.getContent().value_size());
       afterSegmentValidated(data);
       const auto& finalBlockId = data.getFinalBlock();
       if (!finalBlockId || (*finalBlockId > currentSegment)) {
         fetchNextSegment(origInterest, data.getName(), currentSegment.toSegment() + 1, self);
       }
       else {
-        return m_completeCallback(m_buffer->buf());
+        onComplete(m_buffer.buf());
       }
     }
   }
   else {
-    m_errorCallback(DATA_HAS_NO_SEGMENT, "Data Name has no segment number.");
+    onError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number.");
   }
 }
 
 void
 SegmentFetcher::afterValidationFailure(const Data& data, const security::v2::ValidationError& error)
 {
-  return m_errorCallback(SEGMENT_VALIDATION_FAIL, "Segment validation fail " +
-                         boost::lexical_cast<std::string>(error));
+  onError(SEGMENT_VALIDATION_FAIL, "Segment validation fail " +
+                                   boost::lexical_cast<std::string>(error));
 }
 
 
@@ -158,7 +160,7 @@
                                     uint32_t reExpressCount, shared_ptr<SegmentFetcher> self)
 {
   if (reExpressCount >= MAX_INTEREST_REEXPRESS) {
-    m_errorCallback(NACK_ERROR, "Nack Error");
+    onError(NACK_ERROR, "Nack Error");
   }
   else {
     switch (nack.getReason()) {
@@ -172,7 +174,7 @@
                                        origInterest, reExpressCount, self));
         break;
       default:
-        m_errorCallback(NACK_ERROR, "Nack Error");
+        onError(NACK_ERROR, "Nack Error");
         break;
     }
   }
@@ -196,7 +198,7 @@
                               isSegmentZeroExpected, self),
                          bind(&SegmentFetcher::afterNackReceivedCb, this, _1, _2,
                               ++reExpressCount, self),
-                         bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
+                         bind([this] { onError(INTEREST_TIMEOUT, "Timeout"); }));
 }
 
 } // namespace util
diff --git a/src/util/segment-fetcher.hpp b/src/util/segment-fetcher.hpp
index cec29b7..13fbd3d 100644
--- a/src/util/segment-fetcher.hpp
+++ b/src/util/segment-fetcher.hpp
@@ -24,14 +24,12 @@
 
 #include "../common.hpp"
 #include "../face.hpp"
+#include "../encoding/buffer-stream.hpp"
 #include "../security/v2/validator.hpp"
 #include "scheduler.hpp"
 #include "signal.hpp"
 
 namespace ndn {
-
-class OBufferStream;
-
 namespace util {
 
 /**
@@ -107,7 +105,7 @@
    */
   static const uint32_t MAX_INTEREST_REEXPRESS;
 
-  typedef function<void (const ConstBufferPtr& data)> CompleteCallback;
+  typedef function<void (ConstBufferPtr data)> CompleteCallback;
   typedef function<void (uint32_t code, const std::string& msg)> ErrorCallback;
 
   /**
@@ -123,6 +121,32 @@
   /**
    * @brief Initiates segment fetching
    *
+   * @param face         Reference to the Face that should be used to fetch data
+   * @param baseInterest An Interest for the initial segment of requested data.
+   *                     This interest may include a custom InterestLifetime and selectors that will
+   *                     propagate to all subsequent Interests. The only exception is that the
+   *                     initial Interest will be forced to include the "ChildSelector=1" and
+   *                     "MustBeFresh=true" selectors, which will be turned off in subsequent
+   *                     Interests.
+   * @param validator    Reference to the Validator that should be used to validate data. The
+   *                     caller must keep the validator alive until either onComplete or onError
+   *                     has been signaled.
+   *
+   * @return A shared_ptr to the constructed SegmentFetcher
+   *
+   * Transfer completion, failure, and progress are indicated via signals.
+   */
+  static
+  shared_ptr<SegmentFetcher>
+  start(Face& face,
+        const Interest& baseInterest,
+        security::v2::Validator& validator);
+
+  /**
+   * @brief Initiates segment fetching
+   *
+   * @deprecated Use @c start
+   *
    * @param face          Reference to the Face that should be used to fetch data
    * @param baseInterest  An Interest for the initial segment of requested data.
    *                      This interest may include custom InterestLifetime and selectors that
@@ -138,6 +162,7 @@
    * @param errorCallback       Callback to be fired when an error occurs (@see Errors)
    * @return A shared_ptr to the constructed SegmentFetcher
    */
+  NDN_CXX_DEPRECATED_MSG("Use SegmentFetcher::start instead")
   static
   shared_ptr<SegmentFetcher>
   fetch(Face& face,
@@ -149,6 +174,8 @@
   /**
    * @brief Initiate segment fetching
    *
+   * @deprecated Use @c start
+   *
    * @param face          Reference to the Face that should be used to fetch data
    * @param baseInterest  An Interest for the initial segment of requested data.
    *                      This interest may include custom InterestLifetime and selectors that
@@ -162,6 +189,7 @@
    * @param errorCallback       Callback to be fired when an error occurs (@see Errors)
    * @return A shared_ptr to the constructed SegmentFetcher
    */
+  NDN_CXX_DEPRECATED_MSG("Use SegmentFetcher::start instead")
   static
   shared_ptr<SegmentFetcher>
   fetch(Face& face,
@@ -171,10 +199,7 @@
         const ErrorCallback& errorCallback);
 
 private:
-  SegmentFetcher(Face& face,
-                 shared_ptr<security::v2::Validator> validator,
-                 const CompleteCallback& completeCallback,
-                 const ErrorCallback& errorCallback);
+  SegmentFetcher(Face& face, security::v2::Validator& validator);
 
   void
   fetchFirstSegment(const Interest& baseInterest, shared_ptr<SegmentFetcher> self);
@@ -206,6 +231,18 @@
 
 public:
   /**
+   * @brief Emits upon successful retrieval of the complete data
+   */
+  Signal<SegmentFetcher, ConstBufferPtr> onComplete;
+
+  /**
+   * @brief Emits when the retrieval could not be completed due to an error
+   *
+   * Handler(s) are provided with an error code and a string error message.
+   */
+  Signal<SegmentFetcher, uint32_t, std::string> onError;
+
+  /**
    * @brief Emits whenever a data segment received
    */
   Signal<SegmentFetcher, Data> afterSegmentReceived;
@@ -218,11 +255,9 @@
 private:
   Face& m_face;
   Scheduler m_scheduler;
-  shared_ptr<security::v2::Validator> m_validator;
-  CompleteCallback m_completeCallback;
-  ErrorCallback m_errorCallback;
+  security::v2::Validator& m_validator;
 
-  shared_ptr<OBufferStream> m_buffer;
+  OBufferStream m_buffer;
 };
 
 } // namespace util