util: Asynchronous data validation in SegmentFetcher

Change-Id: I407d3c1de49c86a6b4a0174baae6ed2e8be5c554
Refs: #2734
diff --git a/src/util/segment-fetcher.cpp b/src/util/segment-fetcher.cpp
index e7703c5..a9c0418 100644
--- a/src/util/segment-fetcher.cpp
+++ b/src/util/segment-fetcher.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /**
- * Copyright (c) 2013-2014 Regents of the University of California.
+ * Copyright (c) 2013-2015 Regents of the University of California.
  *
  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
  *
@@ -20,18 +20,23 @@
  */
 
 #include "segment-fetcher.hpp"
-
 #include "../encoding/buffer-stream.hpp"
+#include "../name-component.hpp"
+#include "../lp/nack.hpp"
+#include "../lp/nack-header.hpp"
 
 namespace ndn {
 namespace util {
 
+const uint32_t SegmentFetcher::MAX_INTEREST_REEXPRESS = 3;
+
 SegmentFetcher::SegmentFetcher(Face& face,
-                               const VerifySegment& verifySegment,
+                               shared_ptr<Validator> validator,
                                const CompleteCallback& completeCallback,
                                const ErrorCallback& errorCallback)
   : m_face(face)
-  , m_verifySegment(verifySegment)
+  , m_scheduler(m_face.getIoService())
+  , m_validator(validator)
   , m_completeCallback(completeCallback)
   , m_errorCallback(errorCallback)
   , m_buffer(make_shared<OBufferStream>())
@@ -41,79 +46,143 @@
 void
 SegmentFetcher::fetch(Face& face,
                       const Interest& baseInterest,
-                      const VerifySegment& verifySegment,
+                      Validator& validator,
                       const CompleteCallback& completeCallback,
                       const ErrorCallback& errorCallback)
 {
-  shared_ptr<SegmentFetcher> fetcher =
-    shared_ptr<SegmentFetcher>(new SegmentFetcher(face, verifySegment,
-                                                  completeCallback, errorCallback));
+  shared_ptr<Validator> sharedValidator = shared_ptr<Validator>(&validator, [] (Validator*) {});
+
+  fetch(face, baseInterest, sharedValidator, completeCallback, errorCallback);
+}
+
+void
+SegmentFetcher::fetch(Face& face,
+                      const Interest& baseInterest,
+                      shared_ptr<Validator> validator,
+                      const CompleteCallback& completeCallback,
+                      const ErrorCallback& errorCallback)
+{
+  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, completeCallback,
+                                                        errorCallback));
 
   fetcher->fetchFirstSegment(baseInterest, fetcher);
 }
 
 void
 SegmentFetcher::fetchFirstSegment(const Interest& baseInterest,
-                                  const shared_ptr<SegmentFetcher>& self)
+                                  shared_ptr<SegmentFetcher> self)
 {
   Interest interest(baseInterest);
   interest.setChildSelector(1);
   interest.setMustBeFresh(true);
 
   m_face.expressInterest(interest,
-                         bind(&SegmentFetcher::onSegmentReceived, this, _1, _2, true, self),
+                         bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, true, self),
+                         bind(&SegmentFetcher::afterNackReceived, this, _1, _2, 0, self),
                          bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
 }
 
 void
 SegmentFetcher::fetchNextSegment(const Interest& origInterest, const Name& dataName,
                                  uint64_t segmentNo,
-                                 const shared_ptr<SegmentFetcher>& self)
+                                 shared_ptr<SegmentFetcher> self)
 {
-  Interest interest(origInterest); // to preserve any special selectors
+  Interest interest(origInterest); // to preserve any selectors
   interest.refreshNonce();
   interest.setChildSelector(0);
   interest.setMustBeFresh(false);
   interest.setName(dataName.getPrefix(-1).appendSegment(segmentNo));
   m_face.expressInterest(interest,
-                         bind(&SegmentFetcher::onSegmentReceived, this, _1, _2, false, self),
+                         bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, false, self),
+                         bind(&SegmentFetcher::afterNackReceived, this, _1, _2, 0, self),
                          bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
 }
 
 void
-SegmentFetcher::onSegmentReceived(const Interest& origInterest,
-                                  const Data& data, bool isSegmentZeroExpected,
-                                  const shared_ptr<SegmentFetcher>& self)
+SegmentFetcher::afterSegmentReceived(const Interest& origInterest,
+                                     const Data& data, bool isSegmentZeroExpected,
+                                     shared_ptr<SegmentFetcher> self)
 {
-  if (!m_verifySegment(data)) {
-    return m_errorCallback(SEGMENT_VERIFICATION_FAIL, "Segment validation fail");
-  }
+  m_validator->validate(data,
+                        bind(&SegmentFetcher::afterValidationSuccess, this, _1,
+                             isSegmentZeroExpected, origInterest, self),
+                        bind(&SegmentFetcher::afterValidationFailure, this, _1));
 
-  try {
-    uint64_t currentSegment = data.getName().get(-1).toSegment();
+}
 
-    if (isSegmentZeroExpected && currentSegment != 0) {
-      fetchNextSegment(origInterest, data.getName(), 0, self);
+void
+SegmentFetcher::afterValidationSuccess(const shared_ptr<const Data> data,
+                                       bool isSegmentZeroExpected,
+                                       const Interest& origInterest,
+                                       shared_ptr<SegmentFetcher> self)
+{
+  name::Component currentSegment = data->getName().get(-1);
+
+  if (currentSegment.isSegment()) {
+    if (isSegmentZeroExpected && currentSegment.toSegment() != 0) {
+      fetchNextSegment(origInterest, data->getName(), 0, self);
     }
     else {
-      m_buffer->write(reinterpret_cast<const char*>(data.getContent().value()),
-                      data.getContent().value_size());
+      m_buffer->write(reinterpret_cast<const char*>(data->getContent().value()),
+                      data->getContent().value_size());
 
-      const name::Component& finalBlockId = data.getMetaInfo().getFinalBlockId();
-      if (finalBlockId.empty() ||
-          finalBlockId.toSegment() > currentSegment)
-        {
-          fetchNextSegment(origInterest, data.getName(), currentSegment + 1, self);
-        }
+      const name::Component& finalBlockId = data->getMetaInfo().getFinalBlockId();
+      if (finalBlockId.empty() || (finalBlockId > currentSegment)) {
+        fetchNextSegment(origInterest, data->getName(), currentSegment.toSegment() + 1, self);
+      }
       else {
         return m_completeCallback(m_buffer->buf());
       }
     }
   }
-  catch (const tlv::Error& e) {
-    m_errorCallback(DATA_HAS_NO_SEGMENT, std::string("Error while decoding segment: ") + e.what());
+  else {
+    m_errorCallback(DATA_HAS_NO_SEGMENT, "Data Name has no segment number.");
   }
 }
 
-} // util
-} // ndn
+void
+SegmentFetcher::afterValidationFailure(const shared_ptr<const Data> data)
+{
+  return m_errorCallback(SEGMENT_VALIDATION_FAIL, "Segment validation fail");
+}
+
+
+void
+SegmentFetcher::afterNackReceived(const Interest& origInterest, const lp::Nack& nack,
+                                  uint32_t reExpressCount, shared_ptr<SegmentFetcher> self)
+{
+  if (reExpressCount >= MAX_INTEREST_REEXPRESS) {
+    m_errorCallback(NACK_ERROR, "Nack Error");
+  }
+  else {
+    switch (nack.getReason()) {
+      case lp::NackReason::DUPLICATE:
+        reExpressInterest(origInterest, reExpressCount, self);
+        break;
+      case lp::NackReason::CONGESTION:
+        m_scheduler.scheduleEvent(time::milliseconds(static_cast<uint32_t>(pow(2, reExpressCount + 1))),
+                                  bind(&SegmentFetcher::reExpressInterest, this,
+                                       origInterest, reExpressCount, self));
+        break;
+      default:
+        m_errorCallback(NACK_ERROR, "Nack Error");
+        break;
+    }
+  }
+}
+
+void
+SegmentFetcher::reExpressInterest(Interest interest, uint32_t reExpressCount,
+                                  shared_ptr<SegmentFetcher> self)
+{
+  interest.refreshNonce();
+  BOOST_ASSERT(interest.hasNonce());
+
+  m_face.expressInterest(interest,
+                         bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, true, self),
+                         bind(&SegmentFetcher::afterNackReceived, this, _1, _2, ++reExpressCount, self),
+                         bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
+}
+
+} // namespace util
+} // namespace ndn
diff --git a/src/util/segment-fetcher.hpp b/src/util/segment-fetcher.hpp
index fb8f6d7..0c2e908 100644
--- a/src/util/segment-fetcher.hpp
+++ b/src/util/segment-fetcher.hpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /**
- * Copyright (c) 2013-2014 Regents of the University of California.
+ * Copyright (c) 2013-2015 Regents of the University of California.
  *
  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
  *
@@ -22,8 +22,10 @@
 #ifndef NDN_UTIL_SEGMENT_FETCHER_HPP
 #define NDN_UTIL_SEGMENT_FETCHER_HPP
 
+#include "scheduler.hpp"
 #include "../common.hpp"
 #include "../face.hpp"
+#include "../security/validator.hpp"
 
 namespace ndn {
 
@@ -32,19 +34,6 @@
 namespace util {
 
 /**
- * @brief Functor to skip validation of individual packets by SegmentFetcher
- */
-class DontVerifySegment
-{
-public:
-  bool
-  operator()(const Data& data) const
-  {
-    return true;
-  }
-};
-
-/**
  * @brief Utility class to fetch latest version of the segmented data
  *
  * SegmentFetcher assumes that the data is named `/<prefix>/<version>/<segment>`,
@@ -82,38 +71,42 @@
  * - `INTEREST_TIMEOUT`: if any of the Interests times out
  * - `DATA_HAS_NO_SEGMENT`: if any of the retrieved Data packets don't have segment
  *   as a last component of the name (not counting implicit digest)
- * - `SEGMENT_VERIFICATION_FAIL`: if any retrieved segment fails user-provided validation
+ * - `SEGMENT_VALIDATION_FAIL`: if any retrieved segment fails user-provided validation
  *
- * In order to validate individual segments, an VerifySegment callback needs to be specified.
- * If the callback returns false, fetching process is aborted with SEGMENT_VERIFICATION_FAIL.
- * If data validation is not required, provided DontVerifySegment() functor can be used.
+ * In order to validate individual segments, a Validator instance needs to be specified.
+ * If the segment validation is successful, afterValidationSuccess callback is fired, otherwise
+ * afterValidationFailure callback.
  *
  * Examples:
  *
  *     void
- *     onComplete(const ConstBufferPtr& data)
+ *     afterFetchComplete(const ConstBufferPtr& data)
  *     {
  *       ...
  *     }
  *
  *     void
- *     onError(uint32_t errorCode, const std::string& errorMsg)
+ *     afterFetchError(uint32_t errorCode, const std::string& errorMsg)
  *     {
  *       ...
  *     }
  *
  *     ...
  *     SegmentFetcher::fetch(face, Interest("/data/prefix", time::seconds(1000)),
- *                           DontVerifySegment(),
- *                           bind(&onComplete, this, _1),
- *                           bind(&onError, this, _1, _2));
+ *                           validator,
+ *                           bind(&afterFetchComplete, this, _1),
+ *                           bind(&afterFetchError, this, _1, _2));
  *
  */
 class SegmentFetcher : noncopyable
 {
 public:
+  /**
+   * @brief Maximum number of times an interest will be reexpressed incase of NackCallback
+   */
+  static const uint32_t MAX_INTEREST_REEXPRESS;
+
   typedef function<void (const ConstBufferPtr& data)> CompleteCallback;
-  typedef function<bool (const Data& data)> VerifySegment;
   typedef function<void (uint32_t code, const std::string& msg)> ErrorCallback;
 
   /**
@@ -122,7 +115,8 @@
   enum ErrorCode {
     INTEREST_TIMEOUT = 1,
     DATA_HAS_NO_SEGMENT = 2,
-    SEGMENT_VERIFICATION_FAIL = 3
+    SEGMENT_VALIDATION_FAIL = 3,
+    NACK_ERROR = 4
   };
 
   /**
@@ -132,12 +126,36 @@
    * @param baseInterest  An Interest for the initial segment of requested data.
    *                      This interest may include custom InterestLifetime and selectors that
    *                      will propagate to all subsequent Interests.  The only exception is that
+   *                      the initial Interest will be forced to include "ChildSelector=rightmost" and
+   *                      "MustBeFresh=true" selectors, which will be turned off in subsequent
+   *                      Interests.
+   * @param validator     Reference to the Validator that should be used to validate data. Caller
+   *                      must ensure validator is valid until either completeCallback or errorCallback
+   *                      is invoked.
+   *
+   * @param completeCallback    Callback to be fired when all segments are fetched
+   * @param errorCallback       Callback to be fired when an error occurs (@see Errors)
+   */
+  static
+  void
+  fetch(Face& face,
+        const Interest& baseInterest,
+        Validator& validator,
+        const CompleteCallback& completeCallback,
+        const ErrorCallback& errorCallback);
+
+  /**
+   * @brief Initiate segment fetching
+   *
+   * @param face          Reference to the Face that should be used to fetch data
+   * @param baseInterest  An Interest for the initial segment of requested data.
+   *                      This interest may include custom InterestLifetime and selectors that
+   *                      will propagate to all subsequent Interests.  The only exception is that
    *                      the initial Interest will be forced to include "ChildSelector=1" and
    *                      "MustBeFresh=true" selectors, which will be turned off in subsequent
    *                      Interests.
-   * @param verifySegment Functor to be called when Data segment is received.  If
-   *                      functor return false, fetching will be aborted with
-   *                      SEGMENT_VERIFICATION_FAIL error
+   * @param validator     A shared_ptr to the Validator that should be used to validate data.
+   *
    * @param completeCallback    Callback to be fired when all segments are fetched
    * @param errorCallback       Callback to be fired when an error occurs (@see Errors)
    */
@@ -145,38 +163,55 @@
   void
   fetch(Face& face,
         const Interest& baseInterest,
-        const VerifySegment& verifySegment,
+        shared_ptr<Validator> validator,
         const CompleteCallback& completeCallback,
         const ErrorCallback& errorCallback);
 
 private:
   SegmentFetcher(Face& face,
-                 const VerifySegment& verifySegment,
+                 shared_ptr<Validator> validator,
                  const CompleteCallback& completeCallback,
                  const ErrorCallback& errorCallback);
 
   void
-  fetchFirstSegment(const Interest& baseInterest, const shared_ptr<SegmentFetcher>& self);
+  fetchFirstSegment(const Interest& baseInterest, shared_ptr<SegmentFetcher> self);
 
   void
   fetchNextSegment(const Interest& origInterest, const Name& dataName, uint64_t segmentNo,
-                   const shared_ptr<SegmentFetcher>& self);
+                   shared_ptr<SegmentFetcher> self);
 
   void
-  onSegmentReceived(const Interest& origInterest,
-                    const Data& data, bool isSegmentZeroExpected,
-                    const shared_ptr<SegmentFetcher>& self);
+  afterSegmentReceived(const Interest& origInterest,
+                       const Data& data, bool isSegmentZeroExpected,
+                       shared_ptr<SegmentFetcher> self);
+  void
+  afterValidationSuccess(const shared_ptr<const Data> data,
+                         bool isSegmentZeroExpected,
+                         const Interest& origInterest,
+                         shared_ptr<SegmentFetcher> self);
+
+  void
+  afterValidationFailure(const shared_ptr<const Data> data);
+
+  void
+  afterNackReceived(const Interest& origInterest, const lp::Nack& nack,
+                    uint32_t reExpressCount, shared_ptr<SegmentFetcher> self);
+
+  void
+  reExpressInterest(Interest interest, uint32_t reExpressCount,
+                    shared_ptr<SegmentFetcher> self);
 
 private:
   Face& m_face;
-  VerifySegment m_verifySegment;
+  Scheduler m_scheduler;
+  shared_ptr<Validator> m_validator;
   CompleteCallback m_completeCallback;
   ErrorCallback m_errorCallback;
 
   shared_ptr<OBufferStream> m_buffer;
 };
 
-} // util
-} // ndn
+} // namespace util
+} // namespace ndn
 
 #endif // NDN_UTIL_SEGMENT_FETCHER_HPP
diff --git a/tests/unit-tests/util/segment-fetcher.t.cpp b/tests/unit-tests/util/segment-fetcher.t.cpp
index b0884eb..e53cc0f 100644
--- a/tests/unit-tests/util/segment-fetcher.t.cpp
+++ b/tests/unit-tests/util/segment-fetcher.t.cpp
@@ -20,11 +20,17 @@
  */
 
 #include "util/segment-fetcher.hpp"
+#include "security/validator-null.hpp"
+#include "security/validator.hpp"
+#include "data.hpp"
+#include "encoding/block.hpp"
 
 #include "boost-test.hpp"
 #include "util/dummy-client-face.hpp"
 #include "security/key-chain.hpp"
+#include "lp/nack-header.hpp"
 #include "../unit-test-time-fixture.hpp"
+#include "../make-interest-data.hpp"
 
 namespace ndn {
 namespace util {
@@ -32,6 +38,30 @@
 
 BOOST_AUTO_TEST_SUITE(UtilSegmentFetcher)
 
+class ValidatorFailed : public Validator
+{
+protected:
+  virtual void
+  checkPolicy(const Data& data,
+              int nSteps,
+              const OnDataValidated& onValidated,
+              const OnDataValidationFailed& onValidationFailed,
+              std::vector<shared_ptr<ValidationRequest>>& nextSteps)
+  {
+    onValidationFailed(data.shared_from_this(), "Data validation failed.");
+  }
+
+  virtual void
+  checkPolicy(const Interest& interest,
+              int nSteps,
+              const OnInterestValidated& onValidated,
+              const OnInterestValidationFailed& onValidationFailed,
+              std::vector<shared_ptr<ValidationRequest>>& nextSteps)
+  {
+    onValidationFailed(interest.shared_from_this(), "Interest validation failed.");
+  }
+};
+
 class Fixture : public ndn::tests::UnitTestTimeFixture
 {
 public:
@@ -44,16 +74,17 @@
   }
 
   shared_ptr<Data>
-  makeData(const Name& baseName, uint64_t segment, bool isFinal)
+  makeDataSegment(const Name& baseName, uint64_t segment, bool isFinal)
   {
     const uint8_t buffer[] = "Hello, world!";
 
     shared_ptr<Data> data = make_shared<Data>(Name(baseName).appendSegment(segment));
     data->setContent(buffer, sizeof(buffer));
 
-    if (isFinal)
+    if (isFinal) {
       data->setFinalBlockId(data->getName()[-1]);
-    keyChain.sign(*data);
+    }
+    data = signData(data);
 
     return data;
   }
@@ -66,12 +97,21 @@
   }
 
   void
-  onData(const ConstBufferPtr& data)
+  onComplete(const ConstBufferPtr& data)
   {
     ++nDatas;
     dataSize = data->size();
+    dataString = std::string(reinterpret_cast<const char*>(data->get()));
   }
 
+  void
+  nackLastInterest(lp::NackReason nackReason)
+  {
+    const Interest& lastInterest = face->sentInterests.back();
+    lp::Nack nack = makeNack(lastInterest, nackReason);
+    face->receive(nack);
+    advanceClocks(time::milliseconds(1), 10);
+  }
 
 public:
   shared_ptr<DummyClientFace> face;
@@ -81,13 +121,15 @@
   uint32_t lastError;
   uint32_t nDatas;
   size_t dataSize;
+  std::string dataString;
 };
 
 BOOST_FIXTURE_TEST_CASE(Timeout, Fixture)
 {
+  ValidatorNull nullValidator;
   SegmentFetcher::fetch(*face, Interest("/hello/world", time::milliseconds(100)),
-                        DontVerifySegment(),
-                        bind(&Fixture::onData, this, _1),
+                        nullValidator,
+                        bind(&Fixture::onComplete, this, _1),
                         bind(&Fixture::onError, this, _1));
 
   advanceClocks(time::milliseconds(1), 99);
@@ -113,14 +155,15 @@
 
 BOOST_FIXTURE_TEST_CASE(Basic, Fixture)
 {
+  ValidatorNull nullValidator;
   SegmentFetcher::fetch(*face, Interest("/hello/world", time::seconds(1000)),
-                        DontVerifySegment(),
-                        bind(&Fixture::onData, this, _1),
+                        nullValidator,
+                        bind(&Fixture::onComplete, this, _1),
                         bind(&Fixture::onError, this, _1));
 
   advanceClocks(time::milliseconds(1), 10);
 
-  face->receive(*makeData("/hello/world/version0", 0, true));
+  face->receive(*makeDataSegment("/hello/world/version0", 0, true));
   advanceClocks(time::milliseconds(1), 10);
 
   BOOST_CHECK_EQUAL(nErrors, 0);
@@ -128,6 +171,11 @@
 
   BOOST_CHECK_EQUAL(dataSize, 14);
 
+  const uint8_t buffer[] = "Hello, world!";
+  std::string bufferString = std::string(reinterpret_cast<const char*>(buffer));
+
+  BOOST_CHECK_EQUAL(dataString, bufferString);
+
   BOOST_REQUIRE_EQUAL(face->sentInterests.size(), 1);
   BOOST_CHECK_EQUAL(face->sentDatas.size(), 0);
 
@@ -139,19 +187,19 @@
 
 BOOST_FIXTURE_TEST_CASE(NoSegmentInData, Fixture)
 {
-
+  ValidatorNull nullValidator;
   SegmentFetcher::fetch(*face, Interest("/hello/world", time::seconds(1000)),
-                        DontVerifySegment(),
-                        bind(&Fixture::onData, this, _1),
+                        nullValidator,
+                        bind(&Fixture::onComplete, this, _1),
                         bind(&Fixture::onError, this, _1));
 
   advanceClocks(time::milliseconds(1), 10);
 
   const uint8_t buffer[] = "Hello, world!";
 
-  shared_ptr<Data> data = make_shared<Data>("/hello/world/version0/no-segment");
+  shared_ptr<Data> data = makeData("/hello/world/version0/no-segment");
+
   data->setContent(buffer, sizeof(buffer));
-  keyChain.sign(*data);
 
   face->receive(*data);
   advanceClocks(time::milliseconds(1), 10);
@@ -161,45 +209,39 @@
   BOOST_CHECK_EQUAL(nDatas, 0);
 }
 
-bool
-failValidation(const Data& data)
-{
-  return false;
-}
-
 BOOST_FIXTURE_TEST_CASE(SegmentValidationFailure, Fixture)
 {
-
+  ValidatorFailed failedValidator;
   SegmentFetcher::fetch(*face, Interest("/hello/world", time::seconds(1000)),
-                        &failValidation,
-                        bind(&Fixture::onData, this, _1),
+                        failedValidator,
+                        bind(&Fixture::onComplete, this, _1),
                         bind(&Fixture::onError, this, _1));
 
   advanceClocks(time::milliseconds(1), 10);
-  face->receive(*makeData("/hello/world/version0", 0, true));
+  face->receive(*makeDataSegment("/hello/world/version0", 0, true));
   advanceClocks(time::milliseconds(1), 10);
 
   BOOST_CHECK_EQUAL(nErrors, 1);
-  BOOST_CHECK_EQUAL(lastError, static_cast<uint32_t>(SegmentFetcher::SEGMENT_VERIFICATION_FAIL));
+  BOOST_CHECK_EQUAL(lastError, static_cast<uint32_t>(SegmentFetcher::SEGMENT_VALIDATION_FAIL));
   BOOST_CHECK_EQUAL(nDatas, 0);
 }
 
-
 BOOST_FIXTURE_TEST_CASE(Triple, Fixture)
 {
+  ValidatorNull nullValidator;
   SegmentFetcher::fetch(*face, Interest("/hello/world", time::seconds(1000)),
-                        DontVerifySegment(),
-                        bind(&Fixture::onData, this, _1),
+                        nullValidator,
+                        bind(&Fixture::onComplete, this, _1),
                         bind(&Fixture::onError, this, _1));
 
   advanceClocks(time::milliseconds(1), 10);
-  face->receive(*makeData("/hello/world/version0", 0, false));
+  face->receive(*makeDataSegment("/hello/world/version0", 0, false));
 
   advanceClocks(time::milliseconds(1), 10);
-  face->receive(*makeData("/hello/world/version0", 1, false));
+  face->receive(*makeDataSegment("/hello/world/version0", 1, false));
 
   advanceClocks(time::milliseconds(1), 10);
-  face->receive(*makeData("/hello/world/version0", 2, true));
+  face->receive(*makeDataSegment("/hello/world/version0", 2, true));
 
   advanceClocks(time::milliseconds(1), 10);
 
@@ -235,22 +277,23 @@
 
 BOOST_FIXTURE_TEST_CASE(TripleWithInitialSegmentFetching, Fixture)
 {
+  ValidatorNull nullValidator;
   SegmentFetcher::fetch(*face, Interest("/hello/world", time::seconds(1000)),
-                        DontVerifySegment(),
-                        bind(&Fixture::onData, this, _1),
+                        nullValidator,
+                        bind(&Fixture::onComplete, this, _1),
                         bind(&Fixture::onError, this, _1));
 
   advanceClocks(time::milliseconds(1), 10);
-  face->receive(*makeData("/hello/world/version0", 1, false));
+  face->receive(*makeDataSegment("/hello/world/version0", 1, false));
 
   advanceClocks(time::milliseconds(1), 10);
-  face->receive(*makeData("/hello/world/version0", 0, false));
+  face->receive(*makeDataSegment("/hello/world/version0", 0, false));
 
   advanceClocks(time::milliseconds(1), 10);
-  face->receive(*makeData("/hello/world/version0", 1, false));
+  face->receive(*makeDataSegment("/hello/world/version0", 1, false));
 
   advanceClocks(time::milliseconds(1), 10);
-  face->receive(*makeData("/hello/world/version0", 2, true));
+  face->receive(*makeDataSegment("/hello/world/version0", 2, true));
 
   advanceClocks(time::milliseconds(1), 10);
 
@@ -291,6 +334,68 @@
   }
 }
 
+BOOST_FIXTURE_TEST_CASE(MultipleSegmentFetching, Fixture)
+{
+  ValidatorNull nullValidator;
+  SegmentFetcher::fetch(*face, Interest("/hello/world", time::seconds(1000)),
+                        nullValidator,
+                        bind(&Fixture::onComplete, this, _1),
+                        bind(&Fixture::onError, this, _1));
+
+  advanceClocks(time::milliseconds(1), 10);
+
+  for (uint64_t i = 0; i < 400; i++) {
+    advanceClocks(time::milliseconds(1), 10);
+    face->receive(*makeDataSegment("/hello/world/version0", i, false));
+  }
+  advanceClocks(time::milliseconds(1), 10);
+  face->receive(*makeDataSegment("/hello/world/version0", 400, true));
+
+  advanceClocks(time::milliseconds(1), 10);
+
+  BOOST_CHECK_EQUAL(nErrors, 0);
+  BOOST_CHECK_EQUAL(nDatas, 1);
+}
+
+BOOST_FIXTURE_TEST_CASE(SegmentFetcherDuplicateNack, Fixture)
+{
+  SegmentFetcher::fetch(*face, Interest("/hello/world", time::seconds(1000)),
+                        make_shared<ValidatorNull>(),
+                        bind(&Fixture::onComplete, this, _1),
+                        bind(&Fixture::onError, this, _1));
+  advanceClocks(time::milliseconds(1), 10);
+
+  // receive nack for the original interest
+  nackLastInterest(lp::NackReason::DUPLICATE);
+
+  // receive nack due to Duplication for the reexpressed interests
+  for (uint32_t i = 1; i <= SegmentFetcher::MAX_INTEREST_REEXPRESS; ++i) {
+    nackLastInterest(lp::NackReason::DUPLICATE);
+  }
+
+  BOOST_CHECK_EQUAL(face->sentInterests.size(), (SegmentFetcher::MAX_INTEREST_REEXPRESS + 1));
+  BOOST_CHECK_EQUAL(lastError, static_cast<uint32_t>(SegmentFetcher::NACK_ERROR));
+}
+
+BOOST_FIXTURE_TEST_CASE(SegmentFetcherCongestionNack, Fixture)
+{
+  SegmentFetcher::fetch(*face, Interest("/hello/world", time::seconds(1000)),
+                        make_shared<ValidatorNull>(),
+                        bind(&Fixture::onComplete, this, _1),
+                        bind(&Fixture::onError, this, _1));
+  advanceClocks(time::milliseconds(1), 10);
+
+  // receive nack for the original interest
+  nackLastInterest(lp::NackReason::CONGESTION);
+
+  // receive nack due to Congestion for the reexpressed interests
+  for (uint32_t i = 1; i <= SegmentFetcher::MAX_INTEREST_REEXPRESS; ++i) {
+    nackLastInterest(lp::NackReason::CONGESTION);
+  }
+
+  BOOST_CHECK_EQUAL(face->sentInterests.size(), (SegmentFetcher::MAX_INTEREST_REEXPRESS + 1));
+  BOOST_CHECK_EQUAL(lastError, static_cast<uint32_t>(SegmentFetcher::NACK_ERROR));
+}
 
 BOOST_AUTO_TEST_SUITE_END()