util: Asynchronous data validation in SegmentFetcher
Change-Id: I407d3c1de49c86a6b4a0174baae6ed2e8be5c554
Refs: #2734
diff --git a/src/util/segment-fetcher.cpp b/src/util/segment-fetcher.cpp
index e7703c5..a9c0418 100644
--- a/src/util/segment-fetcher.cpp
+++ b/src/util/segment-fetcher.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2014 Regents of the University of California.
+ * Copyright (c) 2013-2015 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -20,18 +20,23 @@
*/
#include "segment-fetcher.hpp"
-
#include "../encoding/buffer-stream.hpp"
+#include "../name-component.hpp"
+#include "../lp/nack.hpp"
+#include "../lp/nack-header.hpp"
namespace ndn {
namespace util {
+const uint32_t SegmentFetcher::MAX_INTEREST_REEXPRESS = 3;
+
SegmentFetcher::SegmentFetcher(Face& face,
- const VerifySegment& verifySegment,
+ shared_ptr<Validator> validator,
const CompleteCallback& completeCallback,
const ErrorCallback& errorCallback)
: m_face(face)
- , m_verifySegment(verifySegment)
+ , m_scheduler(m_face.getIoService())
+ , m_validator(validator)
, m_completeCallback(completeCallback)
, m_errorCallback(errorCallback)
, m_buffer(make_shared<OBufferStream>())
@@ -41,79 +46,143 @@
void
SegmentFetcher::fetch(Face& face,
const Interest& baseInterest,
- const VerifySegment& verifySegment,
+ Validator& validator,
const CompleteCallback& completeCallback,
const ErrorCallback& errorCallback)
{
- shared_ptr<SegmentFetcher> fetcher =
- shared_ptr<SegmentFetcher>(new SegmentFetcher(face, verifySegment,
- completeCallback, errorCallback));
+ shared_ptr<Validator> sharedValidator = shared_ptr<Validator>(&validator, [] (Validator*) {});
+
+ fetch(face, baseInterest, sharedValidator, completeCallback, errorCallback);
+}
+
+void
+SegmentFetcher::fetch(Face& face,
+ const Interest& baseInterest,
+ shared_ptr<Validator> validator,
+ const CompleteCallback& completeCallback,
+ const ErrorCallback& errorCallback)
+{
+ shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, completeCallback,
+ errorCallback));
fetcher->fetchFirstSegment(baseInterest, fetcher);
}
void
SegmentFetcher::fetchFirstSegment(const Interest& baseInterest,
- const shared_ptr<SegmentFetcher>& self)
+ shared_ptr<SegmentFetcher> self)
{
Interest interest(baseInterest);
interest.setChildSelector(1);
interest.setMustBeFresh(true);
m_face.expressInterest(interest,
- bind(&SegmentFetcher::onSegmentReceived, this, _1, _2, true, self),
+ bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, true, self),
+ bind(&SegmentFetcher::afterNackReceived, this, _1, _2, 0, self),
bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
}
void
SegmentFetcher::fetchNextSegment(const Interest& origInterest, const Name& dataName,
uint64_t segmentNo,
- const shared_ptr<SegmentFetcher>& self)
+ shared_ptr<SegmentFetcher> self)
{
- Interest interest(origInterest); // to preserve any special selectors
+ Interest interest(origInterest); // to preserve any selectors
interest.refreshNonce();
interest.setChildSelector(0);
interest.setMustBeFresh(false);
interest.setName(dataName.getPrefix(-1).appendSegment(segmentNo));
m_face.expressInterest(interest,
- bind(&SegmentFetcher::onSegmentReceived, this, _1, _2, false, self),
+ bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, false, self),
+ bind(&SegmentFetcher::afterNackReceived, this, _1, _2, 0, self),
bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
}
void
-SegmentFetcher::onSegmentReceived(const Interest& origInterest,
- const Data& data, bool isSegmentZeroExpected,
- const shared_ptr<SegmentFetcher>& self)
+SegmentFetcher::afterSegmentReceived(const Interest& origInterest,
+ const Data& data, bool isSegmentZeroExpected,
+ shared_ptr<SegmentFetcher> self)
{
- if (!m_verifySegment(data)) {
- return m_errorCallback(SEGMENT_VERIFICATION_FAIL, "Segment validation fail");
- }
+ m_validator->validate(data,
+ bind(&SegmentFetcher::afterValidationSuccess, this, _1,
+ isSegmentZeroExpected, origInterest, self),
+ bind(&SegmentFetcher::afterValidationFailure, this, _1));
- try {
- uint64_t currentSegment = data.getName().get(-1).toSegment();
+}
- if (isSegmentZeroExpected && currentSegment != 0) {
- fetchNextSegment(origInterest, data.getName(), 0, self);
+void
+SegmentFetcher::afterValidationSuccess(const shared_ptr<const Data> data,
+ bool isSegmentZeroExpected,
+ const Interest& origInterest,
+ shared_ptr<SegmentFetcher> self)
+{
+ name::Component currentSegment = data->getName().get(-1);
+
+ if (currentSegment.isSegment()) {
+ if (isSegmentZeroExpected && currentSegment.toSegment() != 0) {
+ fetchNextSegment(origInterest, data->getName(), 0, self);
}
else {
- m_buffer->write(reinterpret_cast<const char*>(data.getContent().value()),
- data.getContent().value_size());
+ m_buffer->write(reinterpret_cast<const char*>(data->getContent().value()),
+ data->getContent().value_size());
- const name::Component& finalBlockId = data.getMetaInfo().getFinalBlockId();
- if (finalBlockId.empty() ||
- finalBlockId.toSegment() > currentSegment)
- {
- fetchNextSegment(origInterest, data.getName(), currentSegment + 1, self);
- }
+ const name::Component& finalBlockId = data->getMetaInfo().getFinalBlockId();
+ if (finalBlockId.empty() || (finalBlockId > currentSegment)) {
+ fetchNextSegment(origInterest, data->getName(), currentSegment.toSegment() + 1, self);
+ }
else {
return m_completeCallback(m_buffer->buf());
}
}
}
- catch (const tlv::Error& e) {
- m_errorCallback(DATA_HAS_NO_SEGMENT, std::string("Error while decoding segment: ") + e.what());
+ else {
+ m_errorCallback(DATA_HAS_NO_SEGMENT, "Data Name has no segment number.");
}
}
-} // util
-} // ndn
+void
+SegmentFetcher::afterValidationFailure(const shared_ptr<const Data> data)
+{
+ return m_errorCallback(SEGMENT_VALIDATION_FAIL, "Segment validation fail");
+}
+
+
+void
+SegmentFetcher::afterNackReceived(const Interest& origInterest, const lp::Nack& nack,
+ uint32_t reExpressCount, shared_ptr<SegmentFetcher> self)
+{
+ if (reExpressCount >= MAX_INTEREST_REEXPRESS) {
+ m_errorCallback(NACK_ERROR, "Nack Error");
+ }
+ else {
+ switch (nack.getReason()) {
+ case lp::NackReason::DUPLICATE:
+ reExpressInterest(origInterest, reExpressCount, self);
+ break;
+ case lp::NackReason::CONGESTION:
+ m_scheduler.scheduleEvent(time::milliseconds(static_cast<uint32_t>(pow(2, reExpressCount + 1))),
+ bind(&SegmentFetcher::reExpressInterest, this,
+ origInterest, reExpressCount, self));
+ break;
+ default:
+ m_errorCallback(NACK_ERROR, "Nack Error");
+ break;
+ }
+ }
+}
+
+void
+SegmentFetcher::reExpressInterest(Interest interest, uint32_t reExpressCount,
+ shared_ptr<SegmentFetcher> self)
+{
+ interest.refreshNonce();
+ BOOST_ASSERT(interest.hasNonce());
+
+ m_face.expressInterest(interest,
+ bind(&SegmentFetcher::afterSegmentReceived, this, _1, _2, true, self),
+ bind(&SegmentFetcher::afterNackReceived, this, _1, _2, ++reExpressCount, self),
+ bind(m_errorCallback, INTEREST_TIMEOUT, "Timeout"));
+}
+
+} // namespace util
+} // namespace ndn