util: Asynchronous data validation in SegmentFetcher

Change-Id: I407d3c1de49c86a6b4a0174baae6ed2e8be5c554
Refs: #2734
diff --git a/src/util/segment-fetcher.cpp b/src/util/segment-fetcher.cpp
index e7703c5..a9c0418 100644
--- a/src/util/segment-fetcher.cpp
+++ b/src/util/segment-fetcher.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /**
- * Copyright (c) 2013-2014 Regents of the University of California.
+ * Copyright (c) 2013-2015 Regents of the University of California.
  *
  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
  *
@@ -20,18 +20,23 @@
  */
 
 #include "segment-fetcher.hpp"
-
 #include "../encoding/buffer-stream.hpp"
+#include "../name-component.hpp"
+#include "../lp/nack.hpp"
+#include "../lp/nack-header.hpp"
 
 namespace ndn {
 namespace util {
 
+const uint32_t SegmentFetcher::MAX_INTEREST_REEXPRESS = 3;
+
 SegmentFetcher::SegmentFetcher(Face& face,
-                               const VerifySegment& verifySegment,
+                               shared_ptr<Validator> validator,
                                const CompleteCallback& completeCallback,
                                const ErrorCallback& errorCallback)
   : m_face(face)
-  , m_verifySegment(verifySegment)
+  , m_scheduler(m_face.getIoService())
+  , m_validator(validator)
   , m_completeCallback(completeCallback)
   , m_errorCallback(errorCallback)
   , m_buffer(make_shared<OBufferStream>())
@@ -41,79 +46,143 @@
 void
 SegmentFetcher::fetch(Face& face,
                       const Interest& baseInterest,
-                      const VerifySegment& verifySegment,
+                      Validator& validator,
                       const CompleteCallback& completeCallback,
                       const ErrorCallback& errorCallback)
 {
-  shared_ptr<SegmentFetcher> fetcher =
-    shared_ptr<SegmentFetcher>(new SegmentFetcher(face, verifySegment,
-                                                  completeCallback, errorCallback));
+  shared_ptr<Validator> sharedValidator = shared_ptr<Validator>(&validator, [] (Validator*) {});
+
+  fetch(face, baseInterest, sharedValidator, completeCallback, errorCallback);
+}
+
+void
+SegmentFetcher::fetch(Face& face,
+                      const Interest& baseInterest,
+                      shared_ptr<Validator> validator,
+                      const CompleteCallback& completeCallback,
+                      const ErrorCallback& errorCallback)
+{
+  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, completeCallback,
+                                                        errorCallback));
 
   fetcher->fetchFirstSegment(baseInterest, fetcher);
 }
 
 void
 SegmentFetcher::fetchFirstSegment(const Interest& baseInterest,
-                                  const shared_ptr<SegmentFetcher>& self)
+                                  shared_ptr<SegmentFetcher> self)
 {
   Interest interest(baseInterest);
   interest.setChildSelector(1);
   interest.setMustBeFresh(true);
 
   m_face.expressInterest(interest,
-                         bind(&SegmentFetcher::onSegmentReceived, this, _1, _2, true, self),
+                         bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, true, self),
+                         bind(&SegmentFetcher::afterNackReceived, this, _1, _2, 0, self),
                          bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
 }
 
 void
 SegmentFetcher::fetchNextSegment(const Interest& origInterest, const Name& dataName,
                                  uint64_t segmentNo,
-                                 const shared_ptr<SegmentFetcher>& self)
+                                 shared_ptr<SegmentFetcher> self)
 {
-  Interest interest(origInterest); // to preserve any special selectors
+  Interest interest(origInterest); // to preserve any selectors
   interest.refreshNonce();
   interest.setChildSelector(0);
   interest.setMustBeFresh(false);
   interest.setName(dataName.getPrefix(-1).appendSegment(segmentNo));
   m_face.expressInterest(interest,
-                         bind(&SegmentFetcher::onSegmentReceived, this, _1, _2, false, self),
+                         bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, false, self),
+                         bind(&SegmentFetcher::afterNackReceived, this, _1, _2, 0, self),
                          bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
 }
 
 void
-SegmentFetcher::onSegmentReceived(const Interest& origInterest,
-                                  const Data& data, bool isSegmentZeroExpected,
-                                  const shared_ptr<SegmentFetcher>& self)
+SegmentFetcher::afterSegmentReceived(const Interest& origInterest,
+                                     const Data& data, bool isSegmentZeroExpected,
+                                     shared_ptr<SegmentFetcher> self)
 {
-  if (!m_verifySegment(data)) {
-    return m_errorCallback(SEGMENT_VERIFICATION_FAIL, "Segment validation fail");
-  }
+  m_validator->validate(data,
+                        bind(&SegmentFetcher::afterValidationSuccess, this, _1,
+                             isSegmentZeroExpected, origInterest, self),
+                        bind(&SegmentFetcher::afterValidationFailure, this, _1));
 
-  try {
-    uint64_t currentSegment = data.getName().get(-1).toSegment();
+}
 
-    if (isSegmentZeroExpected && currentSegment != 0) {
-      fetchNextSegment(origInterest, data.getName(), 0, self);
+void
+SegmentFetcher::afterValidationSuccess(const shared_ptr<const Data> data,
+                                       bool isSegmentZeroExpected,
+                                       const Interest& origInterest,
+                                       shared_ptr<SegmentFetcher> self)
+{
+  name::Component currentSegment = data->getName().get(-1);
+
+  if (currentSegment.isSegment()) {
+    if (isSegmentZeroExpected && currentSegment.toSegment() != 0) {
+      fetchNextSegment(origInterest, data->getName(), 0, self);
     }
     else {
-      m_buffer->write(reinterpret_cast<const char*>(data.getContent().value()),
-                      data.getContent().value_size());
+      m_buffer->write(reinterpret_cast<const char*>(data->getContent().value()),
+                      data->getContent().value_size());
 
-      const name::Component& finalBlockId = data.getMetaInfo().getFinalBlockId();
-      if (finalBlockId.empty() ||
-          finalBlockId.toSegment() > currentSegment)
-        {
-          fetchNextSegment(origInterest, data.getName(), currentSegment + 1, self);
-        }
+      const name::Component& finalBlockId = data->getMetaInfo().getFinalBlockId();
+      if (finalBlockId.empty() || (finalBlockId > currentSegment)) {
+        fetchNextSegment(origInterest, data->getName(), currentSegment.toSegment() + 1, self);
+      }
       else {
         return m_completeCallback(m_buffer->buf());
       }
     }
   }
-  catch (const tlv::Error& e) {
-    m_errorCallback(DATA_HAS_NO_SEGMENT, std::string("Error while decoding segment: ") + e.what());
+  else {
+    m_errorCallback(DATA_HAS_NO_SEGMENT, "Data Name has no segment number.");
   }
 }
 
-} // util
-} // ndn
+void
+SegmentFetcher::afterValidationFailure(const shared_ptr<const Data> data)
+{
+  return m_errorCallback(SEGMENT_VALIDATION_FAIL, "Segment validation fail");
+}
+
+
+void
+SegmentFetcher::afterNackReceived(const Interest& origInterest, const lp::Nack& nack,
+                                  uint32_t reExpressCount, shared_ptr<SegmentFetcher> self)
+{
+  if (reExpressCount >= MAX_INTEREST_REEXPRESS) {
+    m_errorCallback(NACK_ERROR, "Nack Error");
+  }
+  else {
+    switch (nack.getReason()) {
+      case lp::NackReason::DUPLICATE:
+        reExpressInterest(origInterest, reExpressCount, self);
+        break;
+      case lp::NackReason::CONGESTION:
+        m_scheduler.scheduleEvent(time::milliseconds(static_cast<uint32_t>(pow(2, reExpressCount + 1))),
+                                  bind(&SegmentFetcher::reExpressInterest, this,
+                                       origInterest, reExpressCount, self));
+        break;
+      default:
+        m_errorCallback(NACK_ERROR, "Nack Error");
+        break;
+    }
+  }
+}
+
+void
+SegmentFetcher::reExpressInterest(Interest interest, uint32_t reExpressCount,
+                                  shared_ptr<SegmentFetcher> self)
+{
+  interest.refreshNonce();
+  BOOST_ASSERT(interest.hasNonce());
+
+  m_face.expressInterest(interest,
+                         bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, true, self),
+                         bind(&SegmentFetcher::afterNackReceived, this, _1, _2, ++reExpressCount, self),
+                         bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
+}
+
+} // namespace util
+} // namespace ndn