blob: 4e556d40900d97a92ac6c615b4f070925f067ec1 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2013-2019 Regents of the University of California,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
* ndn-cxx library is free software: you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later version.
*
* ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
*
* You should have received copies of the GNU General Public License and GNU Lesser
* General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
* <http://www.gnu.org/licenses/>.
*
* See AUTHORS.md for complete list of ndn-cxx authors and contributors.
*
* @author Shuo Yang
* @author Weiwei Liu
* @author Chavoosh Ghasemi
*/
#ifndef NDN_UTIL_SEGMENT_FETCHER_HPP
#define NDN_UTIL_SEGMENT_FETCHER_HPP
#include "ndn-cxx/face.hpp"
#include "ndn-cxx/security/v2/validator.hpp"
#include "ndn-cxx/util/rtt-estimator.hpp"
#include "ndn-cxx/util/scheduler.hpp"
#include "ndn-cxx/util/signal.hpp"
#include <queue>
namespace ndn {
namespace util {
/**
* @brief Utility class to fetch the latest version of a segmented object.
*
* SegmentFetcher assumes that segments in the object are named `/<prefix>/<version>/<segment>`,
* where:
* - `<prefix>` is the specified prefix,
* - `<version>` is an unknown version that needs to be discovered, and
* - `<segment>` is a segment number (the number of segments in the object is unknown until a Data
* packet containing the `FinalBlockId` field is received).
*
* SegmentFetcher implements the following logic:
*
* 1. Express an Interest to discover the latest version of the object:
*
* Interest: `/<prefix>?ndn.CanBePrefix=true&ndn.MustBeFresh=true`
*
* 2. Infer the latest version of the object: `<version> = Data.getName().get(-2)`
*
* 3. Keep sending Interests for future segments until an error occurs or the number of segments
* indicated by the FinalBlockId in a received Data packet is reached. This retrieval will start
* at segment 1 if segment 0 was received in response to the Interest expressed in step 2;
* otherwise, retrieval will start at segment 0. By default, congestion control will be used to
* manage the Interest window size. Interests expressed in this step will follow this Name
* format:
*
* Interest: `/<prefix>/<version>/<segment=(N)>`
*
* 4. Signal #onComplete passing a memory buffer that combines the content of all segments in the object.
*
* If an error occurs during the fetching process, #onError is signaled with one of the error codes
* from SegmentFetcher::ErrorCode.
*
* A Validator instance must be specified to validate individual segments. Every time a segment has
* been successfully validated, #afterSegmentValidated will be signaled.
*
* Example:
* @code
* void
* afterFetchComplete(ConstBufferPtr data)
* {
* ...
* }
*
* void
* afterFetchError(uint32_t errorCode, const std::string& errorMsg)
* {
* ...
* }
*
* ...
* auto fetcher = SegmentFetcher::start(face, Interest("/data/prefix"), validator);
* fetcher->onComplete.connect(bind(&afterFetchComplete, this, _1));
* fetcher->onError.connect(bind(&afterFetchError, this, _1, _2));
* @endcode
*/
class SegmentFetcher : noncopyable
{
public:
/**
* @brief Error codes passed to #onError.
*/
enum ErrorCode {
/// Retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded
INTEREST_TIMEOUT = 1,
/// One of the retrieved Data packets lacked a segment number in the last Name component (excl. implicit digest)
DATA_HAS_NO_SEGMENT = 2,
/// One of the retrieved segments failed user-provided validation
SEGMENT_VALIDATION_FAIL = 3,
/// An unrecoverable Nack was received during retrieval
NACK_ERROR = 4,
/// A received FinalBlockId did not contain a segment component
FINALBLOCKID_NOT_SEGMENT = 5,
};
class Options
{
public:
Options()
{
}
void
validate();
public:
bool useConstantCwnd = false; ///< if true, window size is kept at `initCwnd`
bool useConstantInterestTimeout = false; ///< if true, Interest timeout is kept at `maxTimeout`
time::milliseconds maxTimeout = 60_s; ///< maximum allowed time between successful receipt of segments
time::milliseconds interestLifetime = 4_s; ///< lifetime of sent Interests - independent of Interest timeout
double initCwnd = 1.0; ///< initial congestion window size
double initSsthresh = std::numeric_limits<double>::max(); ///< initial slow start threshold
double aiStep = 1.0; ///< additive increase step (in segments)
double mdCoef = 0.5; ///< multiplicative decrease coefficient
bool disableCwa = false; ///< disable Conservative Window Adaptation
bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
bool ignoreCongMarks = false; ///< disable window decrease after congestion mark received
RttEstimator::Options rttOptions; ///< options for RTT estimator
};
/**
* @brief Initiates segment fetching.
*
* Transfer completion, failure, and progress are indicated via signals.
*
* @param face Reference to the Face that should be used to fetch data.
* @param baseInterest Interest for the initial segment of requested data.
* This interest may include a custom InterestLifetime and parameters that
* will propagate to all subsequent Interests. The only exception is that the
* initial Interest will be forced to include the "CanBePrefix=true" and
* "MustBeFresh=true" parameters, which will not be included in subsequent
* Interests.
* @param validator Reference to the Validator the fetcher will use to validate data.
* The caller must ensure the validator remains valid until either #onComplete
* or #onError has been signaled.
* @param options Options controlling the transfer.
*
* @return A shared_ptr to the constructed SegmentFetcher.
* This shared_ptr is kept internally for the lifetime of the transfer.
* Therefore, it does not need to be saved and is provided here so that the
* SegmentFetcher's signals can be connected to.
*/
static shared_ptr<SegmentFetcher>
start(Face& face,
const Interest& baseInterest,
security::v2::Validator& validator,
const Options& options = Options());
/**
* @brief Stops fetching.
*
* This cancels all interests that are still pending.
*/
void
stop();
private:
class PendingSegment;
SegmentFetcher(Face& face, security::v2::Validator& validator, const Options& options);
static bool
shouldStop(const weak_ptr<SegmentFetcher>& weakSelf);
void
fetchFirstSegment(const Interest& baseInterest, bool isRetransmission);
void
fetchSegmentsInWindow(const Interest& origInterest);
void
sendInterest(uint64_t segNum, const Interest& interest, bool isRetransmission);
void
afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
const weak_ptr<SegmentFetcher>& weakSelf);
void
afterValidationSuccess(const Data& data, const Interest& origInterest,
std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
const weak_ptr<SegmentFetcher>& weakSelf);
void
afterValidationFailure(const Data& data,
const security::v2::ValidationError& error,
const weak_ptr<SegmentFetcher>& weakSelf);
void
afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
const weak_ptr<SegmentFetcher>& weakSelf);
void
afterTimeoutCb(const Interest& origInterest,
const weak_ptr<SegmentFetcher>& weakSelf);
void
afterNackOrTimeout(const Interest& origInterest);
void
finalizeFetch();
void
windowIncrease();
void
windowDecrease();
void
signalError(uint32_t code, const std::string& msg);
void
updateRetransmittedSegment(uint64_t segmentNum,
const PendingInterestHandle& pendingInterest,
scheduler::EventId timeoutEvent);
void
cancelExcessInFlightSegments();
bool
checkAllSegmentsReceived();
time::milliseconds
getEstimatedRto();
public:
/**
* @brief Emits upon successful retrieval of the complete data.
*/
Signal<SegmentFetcher, ConstBufferPtr> onComplete;
/**
* @brief Emits when the retrieval could not be completed due to an error.
*
* Handlers are provided with an error code and a string error message.
*/
Signal<SegmentFetcher, uint32_t, std::string> onError;
/**
* @brief Emits whenever a data segment received.
*/
Signal<SegmentFetcher, Data> afterSegmentReceived;
/**
* @brief Emits whenever a received data segment has been successfully validated.
*/
Signal<SegmentFetcher, Data> afterSegmentValidated;
/**
* @brief Emits whenever an Interest for a data segment is nacked.
*/
Signal<SegmentFetcher> afterSegmentNacked;
/**
* @brief Emits whenever an Interest for a data segment times out.
*/
Signal<SegmentFetcher> afterSegmentTimedOut;
private:
enum class SegmentState {
FirstInterest, ///< the first Interest for this segment has been sent
InRetxQueue, ///< the segment is awaiting Interest retransmission
Retransmitted, ///< one or more retransmitted Interests have been sent for this segment
};
class PendingSegment
{
public:
SegmentState state;
time::steady_clock::TimePoint sendTime;
ScopedPendingInterestHandle hdl;
scheduler::ScopedEventId timeoutEvent;
};
NDN_CXX_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
static constexpr double MIN_SSTHRESH = 2.0;
shared_ptr<SegmentFetcher> m_this;
Options m_options;
Face& m_face;
Scheduler m_scheduler;
security::v2::Validator& m_validator;
RttEstimator m_rttEstimator;
time::milliseconds m_timeout;
time::steady_clock::TimePoint m_timeLastSegmentReceived;
std::queue<uint64_t> m_retxQueue;
Name m_versionedDataName;
uint64_t m_nextSegmentNum;
double m_cwnd;
double m_ssthresh;
int64_t m_nSegmentsInFlight;
int64_t m_nSegments;
uint64_t m_highInterest;
uint64_t m_highData;
uint64_t m_recPoint;
int64_t m_nReceived;
int64_t m_nBytesReceived;
std::map<uint64_t, Buffer> m_receivedSegments;
std::map<uint64_t, PendingSegment> m_pendingSegments;
};
} // namespace util
} // namespace ndn
#endif // NDN_UTIL_SEGMENT_FETCHER_HPP