util: Asynchronous data validation in SegmentFetcher
Change-Id: I407d3c1de49c86a6b4a0174baae6ed2e8be5c554
Refs: #2734
diff --git a/src/util/segment-fetcher.cpp b/src/util/segment-fetcher.cpp
index e7703c5..a9c0418 100644
--- a/src/util/segment-fetcher.cpp
+++ b/src/util/segment-fetcher.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2014 Regents of the University of California.
+ * Copyright (c) 2013-2015 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -20,18 +20,23 @@
*/
#include "segment-fetcher.hpp"
-
#include "../encoding/buffer-stream.hpp"
+#include "../name-component.hpp"
+#include "../lp/nack.hpp"
+#include "../lp/nack-header.hpp"
namespace ndn {
namespace util {
+const uint32_t SegmentFetcher::MAX_INTEREST_REEXPRESS = 3;
+
SegmentFetcher::SegmentFetcher(Face& face,
- const VerifySegment& verifySegment,
+ shared_ptr<Validator> validator,
const CompleteCallback& completeCallback,
const ErrorCallback& errorCallback)
: m_face(face)
- , m_verifySegment(verifySegment)
+ , m_scheduler(m_face.getIoService())
+ , m_validator(validator)
, m_completeCallback(completeCallback)
, m_errorCallback(errorCallback)
, m_buffer(make_shared<OBufferStream>())
@@ -41,79 +46,143 @@
void
SegmentFetcher::fetch(Face& face,
const Interest& baseInterest,
- const VerifySegment& verifySegment,
+ Validator& validator,
const CompleteCallback& completeCallback,
const ErrorCallback& errorCallback)
{
- shared_ptr<SegmentFetcher> fetcher =
- shared_ptr<SegmentFetcher>(new SegmentFetcher(face, verifySegment,
- completeCallback, errorCallback));
+ shared_ptr<Validator> sharedValidator = shared_ptr<Validator>(&validator, [] (Validator*) {});
+
+ fetch(face, baseInterest, sharedValidator, completeCallback, errorCallback);
+}
+
+void
+SegmentFetcher::fetch(Face& face,
+ const Interest& baseInterest,
+ shared_ptr<Validator> validator,
+ const CompleteCallback& completeCallback,
+ const ErrorCallback& errorCallback)
+{
+ shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, completeCallback,
+ errorCallback));
fetcher->fetchFirstSegment(baseInterest, fetcher);
}
void
SegmentFetcher::fetchFirstSegment(const Interest& baseInterest,
- const shared_ptr<SegmentFetcher>& self)
+ shared_ptr<SegmentFetcher> self)
{
Interest interest(baseInterest);
interest.setChildSelector(1);
interest.setMustBeFresh(true);
m_face.expressInterest(interest,
- bind(&SegmentFetcher::onSegmentReceived, this, _1, _2, true, self),
+ bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, true, self),
+ bind(&SegmentFetcher::afterNackReceived, this, _1, _2, 0, self),
bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
}
void
SegmentFetcher::fetchNextSegment(const Interest& origInterest, const Name& dataName,
uint64_t segmentNo,
- const shared_ptr<SegmentFetcher>& self)
+ shared_ptr<SegmentFetcher> self)
{
- Interest interest(origInterest); // to preserve any special selectors
+ Interest interest(origInterest); // to preserve any selectors
interest.refreshNonce();
interest.setChildSelector(0);
interest.setMustBeFresh(false);
interest.setName(dataName.getPrefix(-1).appendSegment(segmentNo));
m_face.expressInterest(interest,
- bind(&SegmentFetcher::onSegmentReceived, this, _1, _2, false, self),
+ bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, false, self),
+ bind(&SegmentFetcher::afterNackReceived, this, _1, _2, 0, self),
bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
}
void
-SegmentFetcher::onSegmentReceived(const Interest& origInterest,
- const Data& data, bool isSegmentZeroExpected,
- const shared_ptr<SegmentFetcher>& self)
+SegmentFetcher::afterSegmentReceived(const Interest& origInterest,
+ const Data& data, bool isSegmentZeroExpected,
+ shared_ptr<SegmentFetcher> self)
{
- if (!m_verifySegment(data)) {
- return m_errorCallback(SEGMENT_VERIFICATION_FAIL, "Segment validation fail");
- }
+ m_validator->validate(data,
+ bind(&SegmentFetcher::afterValidationSuccess, this, _1,
+ isSegmentZeroExpected, origInterest, self),
+ bind(&SegmentFetcher::afterValidationFailure, this, _1));
- try {
- uint64_t currentSegment = data.getName().get(-1).toSegment();
+}
- if (isSegmentZeroExpected && currentSegment != 0) {
- fetchNextSegment(origInterest, data.getName(), 0, self);
+void
+SegmentFetcher::afterValidationSuccess(const shared_ptr<const Data> data,
+ bool isSegmentZeroExpected,
+ const Interest& origInterest,
+ shared_ptr<SegmentFetcher> self)
+{
+ name::Component currentSegment = data->getName().get(-1);
+
+ if (currentSegment.isSegment()) {
+ if (isSegmentZeroExpected && currentSegment.toSegment() != 0) {
+ fetchNextSegment(origInterest, data->getName(), 0, self);
}
else {
- m_buffer->write(reinterpret_cast<const char*>(data.getContent().value()),
- data.getContent().value_size());
+ m_buffer->write(reinterpret_cast<const char*>(data->getContent().value()),
+ data->getContent().value_size());
- const name::Component& finalBlockId = data.getMetaInfo().getFinalBlockId();
- if (finalBlockId.empty() ||
- finalBlockId.toSegment() > currentSegment)
- {
- fetchNextSegment(origInterest, data.getName(), currentSegment + 1, self);
- }
+ const name::Component& finalBlockId = data->getMetaInfo().getFinalBlockId();
+ if (finalBlockId.empty() || (finalBlockId > currentSegment)) {
+ fetchNextSegment(origInterest, data->getName(), currentSegment.toSegment() + 1, self);
+ }
else {
return m_completeCallback(m_buffer->buf());
}
}
}
- catch (const tlv::Error& e) {
- m_errorCallback(DATA_HAS_NO_SEGMENT, std::string("Error while decoding segment: ") + e.what());
+ else {
+ m_errorCallback(DATA_HAS_NO_SEGMENT, "Data Name has no segment number.");
}
}
-} // util
-} // ndn
+void
+SegmentFetcher::afterValidationFailure(const shared_ptr<const Data> data)
+{
+ return m_errorCallback(SEGMENT_VALIDATION_FAIL, "Segment validation fail");
+}
+
+
+void
+SegmentFetcher::afterNackReceived(const Interest& origInterest, const lp::Nack& nack,
+ uint32_t reExpressCount, shared_ptr<SegmentFetcher> self)
+{
+ if (reExpressCount >= MAX_INTEREST_REEXPRESS) {
+ m_errorCallback(NACK_ERROR, "Nack Error");
+ }
+ else {
+ switch (nack.getReason()) {
+ case lp::NackReason::DUPLICATE:
+ reExpressInterest(origInterest, reExpressCount, self);
+ break;
+ case lp::NackReason::CONGESTION:
+ m_scheduler.scheduleEvent(time::milliseconds(static_cast<uint32_t>(pow(2, reExpressCount + 1))),
+ bind(&SegmentFetcher::reExpressInterest, this,
+ origInterest, reExpressCount, self));
+ break;
+ default:
+ m_errorCallback(NACK_ERROR, "Nack Error");
+ break;
+ }
+ }
+}
+
+void
+SegmentFetcher::reExpressInterest(Interest interest, uint32_t reExpressCount,
+ shared_ptr<SegmentFetcher> self)
+{
+ interest.refreshNonce();
+ BOOST_ASSERT(interest.hasNonce());
+
+ m_face.expressInterest(interest,
+ bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, true, self),
+ bind(&SegmentFetcher::afterNackReceived, this, _1, _2, ++reExpressCount, self),
+ bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
+}
+
+} // namespace util
+} // namespace ndn
diff --git a/src/util/segment-fetcher.hpp b/src/util/segment-fetcher.hpp
index fb8f6d7..0c2e908 100644
--- a/src/util/segment-fetcher.hpp
+++ b/src/util/segment-fetcher.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2014 Regents of the University of California.
+ * Copyright (c) 2013-2015 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -22,8 +22,10 @@
#ifndef NDN_UTIL_SEGMENT_FETCHER_HPP
#define NDN_UTIL_SEGMENT_FETCHER_HPP
+#include "scheduler.hpp"
#include "../common.hpp"
#include "../face.hpp"
+#include "../security/validator.hpp"
namespace ndn {
@@ -32,19 +34,6 @@
namespace util {
/**
- * @brief Functor to skip validation of individual packets by SegmentFetcher
- */
-class DontVerifySegment
-{
-public:
- bool
- operator()(const Data& data) const
- {
- return true;
- }
-};
-
-/**
* @brief Utility class to fetch latest version of the segmented data
*
* SegmentFetcher assumes that the data is named `/<prefix>/<version>/<segment>`,
@@ -82,38 +71,42 @@
* - `INTEREST_TIMEOUT`: if any of the Interests times out
* - `DATA_HAS_NO_SEGMENT`: if any of the retrieved Data packets don't have segment
* as a last component of the name (not counting implicit digest)
- * - `SEGMENT_VERIFICATION_FAIL`: if any retrieved segment fails user-provided validation
+ * - `SEGMENT_VALIDATION_FAIL`: if any retrieved segment fails user-provided validation
*
- * In order to validate individual segments, an VerifySegment callback needs to be specified.
- * If the callback returns false, fetching process is aborted with SEGMENT_VERIFICATION_FAIL.
- * If data validation is not required, provided DontVerifySegment() functor can be used.
+ * In order to validate individual segments, a Validator instance needs to be specified.
+ * If the segment validation is successful, afterValidationSuccess callback is fired, otherwise
+ * afterValidationFailure callback.
*
* Examples:
*
* void
- * onComplete(const ConstBufferPtr& data)
+ * afterFetchComplete(const ConstBufferPtr& data)
* {
* ...
* }
*
* void
- * onError(uint32_t errorCode, const std::string& errorMsg)
+ * afterFetchError(uint32_t errorCode, const std::string& errorMsg)
* {
* ...
* }
*
* ...
* SegmentFetcher::fetch(face, Interest("/data/prefix", time::seconds(1000)),
- * DontVerifySegment(),
- * bind(&onComplete, this, _1),
- * bind(&onError, this, _1, _2));
+ * validator,
+ * bind(&afterFetchComplete, this, _1),
+ * bind(&afterFetchError, this, _1, _2));
*
*/
class SegmentFetcher : noncopyable
{
public:
+ /**
+ * @brief Maximum number of times an interest will be reexpressed incase of NackCallback
+ */
+ static const uint32_t MAX_INTEREST_REEXPRESS;
+
typedef function<void (const ConstBufferPtr& data)> CompleteCallback;
- typedef function<bool (const Data& data)> VerifySegment;
typedef function<void (uint32_t code, const std::string& msg)> ErrorCallback;
/**
@@ -122,7 +115,8 @@
enum ErrorCode {
INTEREST_TIMEOUT = 1,
DATA_HAS_NO_SEGMENT = 2,
- SEGMENT_VERIFICATION_FAIL = 3
+ SEGMENT_VALIDATION_FAIL = 3,
+ NACK_ERROR = 4
};
/**
@@ -132,12 +126,36 @@
* @param baseInterest An Interest for the initial segment of requested data.
* This interest may include custom InterestLifetime and selectors that
* will propagate to all subsequent Interests. The only exception is that
+ * the initial Interest will be forced to include "ChildSelector=rightmost" and
+ * "MustBeFresh=true" selectors, which will be turned off in subsequent
+ * Interests.
+ * @param validator Reference to the Validator that should be used to validate data. Caller
+ * must ensure validator is valid until either completeCallback or errorCallback
+ * is invoked.
+ *
+ * @param completeCallback Callback to be fired when all segments are fetched
+ * @param errorCallback Callback to be fired when an error occurs (@see Errors)
+ */
+ static
+ void
+ fetch(Face& face,
+ const Interest& baseInterest,
+ Validator& validator,
+ const CompleteCallback& completeCallback,
+ const ErrorCallback& errorCallback);
+
+ /**
+ * @brief Initiate segment fetching
+ *
+ * @param face Reference to the Face that should be used to fetch data
+ * @param baseInterest An Interest for the initial segment of requested data.
+ * This interest may include custom InterestLifetime and selectors that
+ * will propagate to all subsequent Interests. The only exception is that
* the initial Interest will be forced to include "ChildSelector=1" and
* "MustBeFresh=true" selectors, which will be turned off in subsequent
* Interests.
- * @param verifySegment Functor to be called when Data segment is received. If
- * functor return false, fetching will be aborted with
- * SEGMENT_VERIFICATION_FAIL error
+ * @param validator A shared_ptr to the Validator that should be used to validate data.
+ *
* @param completeCallback Callback to be fired when all segments are fetched
* @param errorCallback Callback to be fired when an error occurs (@see Errors)
*/
@@ -145,38 +163,55 @@
void
fetch(Face& face,
const Interest& baseInterest,
- const VerifySegment& verifySegment,
+ shared_ptr<Validator> validator,
const CompleteCallback& completeCallback,
const ErrorCallback& errorCallback);
private:
SegmentFetcher(Face& face,
- const VerifySegment& verifySegment,
+ shared_ptr<Validator> validator,
const CompleteCallback& completeCallback,
const ErrorCallback& errorCallback);
void
- fetchFirstSegment(const Interest& baseInterest, const shared_ptr<SegmentFetcher>& self);
+ fetchFirstSegment(const Interest& baseInterest, shared_ptr<SegmentFetcher> self);
void
fetchNextSegment(const Interest& origInterest, const Name& dataName, uint64_t segmentNo,
- const shared_ptr<SegmentFetcher>& self);
+ shared_ptr<SegmentFetcher> self);
void
- onSegmentReceived(const Interest& origInterest,
- const Data& data, bool isSegmentZeroExpected,
- const shared_ptr<SegmentFetcher>& self);
+ afterSegmentReceived(const Interest& origInterest,
+ const Data& data, bool isSegmentZeroExpected,
+ shared_ptr<SegmentFetcher> self);
+ void
+ afterValidationSuccess(const shared_ptr<const Data> data,
+ bool isSegmentZeroExpected,
+ const Interest& origInterest,
+ shared_ptr<SegmentFetcher> self);
+
+ void
+ afterValidationFailure(const shared_ptr<const Data> data);
+
+ void
+ afterNackReceived(const Interest& origInterest, const lp::Nack& nack,
+ uint32_t reExpressCount, shared_ptr<SegmentFetcher> self);
+
+ void
+ reExpressInterest(Interest interest, uint32_t reExpressCount,
+ shared_ptr<SegmentFetcher> self);
private:
Face& m_face;
- VerifySegment m_verifySegment;
+ Scheduler m_scheduler;
+ shared_ptr<Validator> m_validator;
CompleteCallback m_completeCallback;
ErrorCallback m_errorCallback;
shared_ptr<OBufferStream> m_buffer;
};
-} // util
-} // ndn
+} // namespace util
+} // namespace ndn
#endif // NDN_UTIL_SEGMENT_FETCHER_HPP