blob: 680ff5960d89c0c4d6ee08e71cd91eb484586dd9 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2013-2019 Regents of the University of California,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
* ndn-cxx library is free software: you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later version.
*
* ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
*
* You should have received copies of the GNU General Public License and GNU Lesser
* General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
* <http://www.gnu.org/licenses/>.
*
* See AUTHORS.md for complete list of ndn-cxx authors and contributors.
*
* @author Shuo Yang
* @author Weiwei Liu
* @author Chavoosh Ghasemi
*/
#include "ndn-cxx/util/segment-fetcher.hpp"
#include "ndn-cxx/name-component.hpp"
#include "ndn-cxx/encoding/buffer-stream.hpp"
#include "ndn-cxx/lp/nack.hpp"
#include "ndn-cxx/lp/nack-header.hpp"
#include <boost/asio/io_service.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/range/adaptor/map.hpp>
#include <cmath>
namespace ndn {
namespace util {
// Namespace-scope definition of the static constexpr data member. The member is
// odr-used in this file (std::max() in windowDecrease() takes its arguments by
// const reference), which needs this definition when building as pre-C++17.
constexpr double SegmentFetcher::MIN_SSTHRESH;
// Checks that the option values can be used by the fetcher.
//
// Throws std::invalid_argument (through NDN_THROW) when
//  - maxTimeout is shorter than 1 millisecond,
//  - initCwnd is smaller than 1,
//  - aiStep is negative,
//  - mdCoef lies outside [0, 1].
//
// The numeric checks are written as "not inside the valid range" on purpose:
// every ordered comparison with NaN is false, so a check of the form
// "value < lowerBound" would silently accept NaN and let it reach the window
// arithmetic (where m_cwnd is later converted to an integer).
void
SegmentFetcher::Options::validate()
{
  if (maxTimeout < 1_ms) {
    NDN_THROW(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
  }

  if (!(initCwnd >= 1.0)) {
    NDN_THROW(std::invalid_argument("initCwnd must be greater than or equal to 1"));
  }

  if (!(aiStep >= 0.0)) {
    NDN_THROW(std::invalid_argument("aiStep must be greater than or equal to 0"));
  }

  if (!(mdCoef >= 0.0 && mdCoef <= 1.0)) {
    NDN_THROW(std::invalid_argument("mdCoef must be in range [0, 1]"));
  }
}
// Builds a fetcher that sends Interests through `face` and checks every received
// Data packet with `validator`. `options` is copied into m_options.
//
// The scheduler used for the per-Interest timeout events runs on the face's
// io_service. The congestion window and slow-start threshold start from
// options.initCwnd / options.initSsthresh; all counters start at zero.
// m_timeLastSegmentReceived starts at construction time, so the maxTimeout check
// in afterNackOrTimeout() is measured from here until the first segment validates.
//
// Throws std::invalid_argument if Options::validate() rejects the options.
SegmentFetcher::SegmentFetcher(Face& face,
security::v2::Validator& validator,
const SegmentFetcher::Options& options)
: m_options(options)
, m_face(face)
, m_scheduler(m_face.getIoService())
, m_validator(validator)
, m_rttEstimator(options.rttOptions)
, m_timeLastSegmentReceived(time::steady_clock::now())
, m_nextSegmentNum(0)
, m_cwnd(options.initCwnd)
, m_ssthresh(options.initSsthresh)
, m_nSegmentsInFlight(0)
, m_nSegments(0)
, m_highInterest(0)
, m_highData(0)
, m_recPoint(0)
, m_nReceived(0)
, m_nBytesReceived(0)
{
// Validate the stored copy; an exception here aborts construction.
m_options.validate();
}
// Creates a fetcher and immediately sends the initial Interest derived from
// baseInterest. The returned fetcher also stores a shared_ptr to itself in
// m_this, which stop() later hands over to the io_service.
shared_ptr<SegmentFetcher>
SegmentFetcher::start(Face& face,
                      const Interest& baseInterest,
                      security::v2::Validator& validator,
                      const SegmentFetcher::Options& options)
{
  auto self = shared_ptr<SegmentFetcher>(new SegmentFetcher(face, validator, options));

  // Self-reference: the callbacks registered by sendInterest() observe it via weak_ptr
  self->m_this = self;

  // Not a retransmission, so the nonce of baseInterest is left untouched
  self->fetchFirstSegment(baseInterest, false);

  return self;
}
// Stops the fetch. Does nothing when no self-reference is held (m_this is empty).
void
SegmentFetcher::stop()
{
  if (m_this == nullptr) {
    return;
  }

  // Dropping the entries cancels pending Interests and timeout events
  m_pendingSegments.clear();

  // Move the self-reference into an empty handler posted to the io_service:
  // m_this becomes empty right away (so shouldStop() reports true), while the
  // reference itself is released only once the handler is run or discarded.
  auto keepAlive = std::move(m_this);
  m_face.getIoService().post([keepAlive = std::move(keepAlive)] {});
}
// Returns true when callbacks must not touch the fetcher any more: either the
// fetcher object is gone (weakSelf expired) or stop() has cleared m_this.
bool
SegmentFetcher::shouldStop(const weak_ptr<SegmentFetcher>& weakSelf)
{
  const auto fetcher = weakSelf.lock();
  if (fetcher == nullptr) {
    return true;
  }
  return fetcher->m_this == nullptr;
}
void
SegmentFetcher::fetchFirstSegment(const Interest& baseInterest, bool isRetransmission)
{
Interest interest(baseInterest);
interest.setCanBePrefix(true);
interest.setMustBeFresh(true);
interest.setInterestLifetime(m_options.interestLifetime);
if (isRetransmission) {
interest.refreshNonce();
}
sendInterest(0, interest, isRetransmission);
}
// Fills the free part of the congestion window with Interests, or finishes the
// fetch when checkAllSegmentsReceived() reports that everything has arrived.
//
// Segments are chosen in two steps: queued retransmissions first, then new
// segment numbers starting at m_nextSegmentNum. The selection is collected in a
// local list and the Interests are sent in a second pass.
//
// origInterest is used as the template for every Interest sent here.
void
SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest)
{
if (checkAllSegmentsReceived()) {
// All segments have been retrieved
return finalizeFetch();
}
// The (fractional) window is truncated to an integer; segments already in flight use up slots
int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // (segment number, is retransmission)
while (availableWindowSize > 0) {
if (!m_retxQueue.empty()) {
// Retransmissions take priority over new segments
auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
m_retxQueue.pop();
if (pendingSegmentIt == m_pendingSegments.end()) {
// Skip re-requesting this segment, since it is no longer pending (e.g. it was
// received after the RTO timeout); this does not consume a window slot
continue;
}
BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
}
// m_nSegments stays 0 until a FinalBlockId has been seen, i.e. the total is still unknown
else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
// Don't request a segment a second time if received in response to first "discovery" Interest
m_nextSegmentNum++;
continue;
}
segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
}
else {
// Nothing to retransmit and no new segment numbers left
break;
}
availableWindowSize--;
}
for (const auto& segment : segmentsToRequest) {
Interest interest(origInterest); // to preserve Interest elements
// Request the exact segment under the versioned name learned from the first validated segment
interest.setName(Name(m_versionedDataName).appendSegment(segment.first));
interest.setCanBePrefix(false);
interest.setMustBeFresh(false);
interest.setInterestLifetime(m_options.interestLifetime);
interest.refreshNonce();
sendInterest(segment.first, interest, segment.second);
}
}
// Expresses `interest` for segment `segNum` and arms a timeout event for it.
//
// For a first transmission a new entry is added to m_pendingSegments and
// m_highInterest is updated; for a retransmission the existing entry is updated
// through updateRetransmittedSegment() and m_highInterest is left unchanged.
void
SegmentFetcher::sendInterest(uint64_t segNum, const Interest& interest, bool isRetransmission)
{
// The callbacks capture a raw `this` plus a weak reference; each callback starts
// with shouldStop(weakSelf) before doing any work.
weak_ptr<SegmentFetcher> weakSelf = m_this;
++m_nSegmentsInFlight;
auto pendingInterest = m_face.expressInterest(interest,
[this, weakSelf] (const Interest& interest, const Data& data) {
afterSegmentReceivedCb(interest, data, weakSelf);
},
[this, weakSelf] (const Interest& interest, const lp::Nack& nack) {
afterNackReceivedCb(interest, nack, weakSelf);
},
// No timeout callback is passed to the face: timeouts are driven by the
// scheduler event created below
nullptr);
// Either the fixed maximum timeout or the estimated RTO (itself capped at maxTimeout)
auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
// The lambda holds its own copy of the Interest, which is handed to afterTimeoutCb()
auto timeoutEvent = m_scheduler.schedule(timeout, [this, interest, weakSelf] {
afterTimeoutCb(interest, weakSelf);
});
if (isRetransmission) {
updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
return;
}
PendingSegment pendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
pendingInterest, timeoutEvent};
bool isNew = m_pendingSegments.emplace(segNum, std::move(pendingSegment)).second;
// A first transmission must not collide with an existing pending entry
BOOST_VERIFY(isNew);
m_highInterest = segNum;
}
// Face callback for a Data packet answering one of our Interests.
//
// Takes the segment out of the in-flight count, checks that the last name
// component is a segment number, cancels the timeout event of the matching
// pending entry, emits afterSegmentReceived and hands the Data to the validator.
// Data without a matching pending entry is ignored.
void
SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
const weak_ptr<SegmentFetcher>& weakSelf)
{
if (shouldStop(weakSelf))
return;
BOOST_ASSERT(m_nSegmentsInFlight > 0);
m_nSegmentsInFlight--;
name::Component currentSegmentComponent = data.getName().get(-1);
if (!currentSegmentComponent.isSegment()) {
return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
}
uint64_t currentSegment = currentSegmentComponent.toSegment();
// The Data answering the initial Interest may carry any segment number, while that
// Interest is registered under segment 0. Until a segment has been stored, the
// first pending entry is therefore used instead of a lookup by segment number.
std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
if (m_receivedSegments.size() > 0) {
pendingSegmentIt = m_pendingSegments.find(currentSegment);
}
else {
pendingSegmentIt = m_pendingSegments.begin();
}
if (pendingSegmentIt == m_pendingSegments.end()) {
// No pending entry for this Data: nothing to do
return;
}
pendingSegmentIt->second.timeoutEvent.cancel();
afterSegmentReceived(data);
// The pending entry stays in the map until validation succeeds; the iterator is
// bound into the success callback.
m_validator.validate(data,
bind(&SegmentFetcher::afterValidationSuccess, this, _1, origInterest,
pendingSegmentIt, weakSelf),
bind(&SegmentFetcher::afterValidationFailure, this, _1, _2, weakSelf));
}
// Validator success callback: stores the segment payload, updates the RTT
// estimate, the segment count and the congestion window, then requests more
// segments (or finishes the fetch) through fetchSegmentsInWindow().
//
// pendingSegmentIt is the pending entry selected in afterSegmentReceivedCb().
// NOTE(review): the iterator was obtained before validation started. If the
// validator can complete asynchronously, the entry might be erased in between
// (e.g. by cancelExcessInFlightSegments()) - confirm this cannot happen.
void
SegmentFetcher::afterValidationSuccess(const Data& data, const Interest& origInterest,
std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
const weak_ptr<SegmentFetcher>& weakSelf)
{
if (shouldStop(weakSelf))
return;
// We update the last receive time here instead of in the segment received callback so that the
// transfer will not fail to terminate if we only received invalid Data packets.
m_timeLastSegmentReceived = time::steady_clock::now();
m_nReceived++;
// It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
uint64_t currentSegment = data.getName().get(-1).toSegment();
// Add measurement to RTO estimator (if not retransmission)
if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
// "+ 1" re-adds this segment, which afterSegmentReceivedCb already removed from the in-flight count
m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
std::max<int64_t>(m_nSegmentsInFlight + 1, 1));
}
// Remove from pending segments map
m_pendingSegments.erase(pendingSegmentIt);
// Copy data in segment to temporary buffer
// NOTE(review): if currentSegment were already stored, emplace would not insert and the
// copy below would write into the existing buffer - presumably unreachable; verify.
auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
std::forward_as_tuple(currentSegment),
std::forward_as_tuple(data.getContent().value_size()));
std::copy(data.getContent().value_begin(), data.getContent().value_end(),
receivedSegmentIt.first->second.begin());
m_nBytesReceived += data.getContent().value_size();
afterSegmentValidated(data);
if (data.getFinalBlock()) {
if (!data.getFinalBlock()->isSegment()) {
return signalError(FINALBLOCKID_NOT_SEGMENT,
"Received FinalBlockId did not contain a segment component");
}
// FinalBlockId holds the last segment number, so the segment count is one more.
// When the count changes, pending segments beyond the new end are dropped.
if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
m_nSegments = data.getFinalBlock()->toSegment() + 1;
cancelExcessInFlightSegments();
}
}
// First stored segment: remember the name prefix (Data name without the segment
// component) used for all later segment Interests
if (m_receivedSegments.size() == 1) {
m_versionedDataName = data.getName().getPrefix(-1);
if (currentSegment == 0) {
// We received the first segment in response, so we can increment the next segment number
m_nextSegmentNum++;
}
}
// Track the highest segment number validated so far (read by windowDecrease())
if (m_highData < currentSegment) {
m_highData = currentSegment;
}
// A congestion mark shrinks the window unless marks are ignored; otherwise the window grows
if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
windowDecrease();
}
else {
windowIncrease();
}
fetchSegmentsInWindow(origInterest);
}
// Validator failure callback: reports SEGMENT_VALIDATION_FAIL with the textual
// form of the validation error and stops the fetch (via signalError()).
// The rejected Data packet itself is not inspected.
void
SegmentFetcher::afterValidationFailure(const Data& data,
                                       const security::v2::ValidationError& error,
                                       const weak_ptr<SegmentFetcher>& weakSelf)
{
  if (shouldStop(weakSelf)) {
    return;
  }

  const std::string reason = boost::lexical_cast<std::string>(error);
  signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " + reason);
}
// Face callback for a Nack. Emits afterSegmentNacked, takes the segment out of
// the in-flight count, and then either schedules a retry (Duplicate and
// Congestion Nacks) or aborts the fetch with NACK_ERROR (any other reason).
void
SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
                                    const weak_ptr<SegmentFetcher>& weakSelf)
{
  if (shouldStop(weakSelf)) {
    return;
  }

  afterSegmentNacked();

  BOOST_ASSERT(m_nSegmentsInFlight > 0);
  --m_nSegmentsInFlight;

  const auto reason = nack.getReason();
  const bool isRetriable = reason == lp::NackReason::DUPLICATE ||
                           reason == lp::NackReason::CONGESTION;
  if (isRetriable) {
    afterNackOrTimeout(origInterest);
  }
  else {
    signalError(NACK_ERROR, "Nack Error");
  }
}
// Scheduler callback fired when the timeout armed in sendInterest() expires.
// Emits afterSegmentTimedOut, takes the segment out of the in-flight count and
// continues with the common Nack/timeout handling.
void
SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
                               const weak_ptr<SegmentFetcher>& weakSelf)
{
  if (shouldStop(weakSelf)) {
    return;
  }

  afterSegmentTimedOut();

  BOOST_ASSERT(m_nSegmentsInFlight > 0);
  --m_nSegmentsInFlight;

  afterNackOrTimeout(origInterest);
}
void
SegmentFetcher::afterNackOrTimeout(const Interest& origInterest)
{
if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
// Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
}
name::Component lastNameComponent = origInterest.getName().get(-1);
std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
BOOST_ASSERT(m_pendingSegments.size() > 0);
if (lastNameComponent.isSegment()) {
BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
}
else { // First Interest
BOOST_ASSERT(m_pendingSegments.size() > 0);
pendingSegmentIt = m_pendingSegments.begin();
}
// Cancel timeout event and set status to InRetxQueue
pendingSegmentIt->second.timeoutEvent.cancel();
pendingSegmentIt->second.state = SegmentState::InRetxQueue;
m_rttEstimator.backoffRto();
if (m_receivedSegments.size() == 0) {
// Resend first Interest (until maximum receive timeout exceeded)
fetchFirstSegment(origInterest, true);
}
else {
windowDecrease();
m_retxQueue.push(pendingSegmentIt->first);
fetchSegmentsInWindow(origInterest);
}
}
void
SegmentFetcher::finalizeFetch()
{
// Combine segments into final buffer
OBufferStream buf;
// We may have received more segments than exist in the object.
BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
for (int64_t i = 0; i < m_nSegments; i++) {
buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
}
onComplete(buf.buf());
stop();
}
// Grows the congestion window after a validated segment, unless the window is
// configured to be constant.
void
SegmentFetcher::windowIncrease()
{
  if (m_options.useConstantCwnd) {
    BOOST_ASSERT(m_cwnd == m_options.initCwnd);
    return;
  }

  if (m_cwnd < m_ssthresh) {
    // Below the threshold: add a full aiStep per call
    m_cwnd += m_options.aiStep;
    return;
  }

  // At or above the threshold: add only aiStep divided by the whole-number window size
  m_cwnd += m_options.aiStep / std::floor(m_cwnd);
}
// Shrinks the congestion window after a congestion signal (congestion mark,
// retriable Nack or timeout).
//
// Unless disableCwa is set, the decrease is applied only when a segment beyond
// the recorded recovery point has been validated (m_highData > m_recPoint); the
// recovery point is then moved to the highest segment number requested so far.
void
SegmentFetcher::windowDecrease()
{
  const bool isPastRecoveryPoint = m_highData > m_recPoint;
  if (!m_options.disableCwa && !isPastRecoveryPoint) {
    return;
  }

  m_recPoint = m_highInterest;

  if (m_options.useConstantCwnd) {
    BOOST_ASSERT(m_cwnd == m_options.initCwnd);
    return;
  }

  // Refer to RFC 5681, Section 3.1 for the rationale behind the code below:
  // multiplicative decrease of the threshold, bounded below by MIN_SSTHRESH
  m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef);

  if (m_options.resetCwndToInit) {
    m_cwnd = m_options.initCwnd;
  }
  else {
    m_cwnd = m_ssthresh;
  }
}
// Emits onError with the given error code and message, then stops the fetcher.
// onError is emitted first, while m_this is still set; stop() clears it afterwards.
void
SegmentFetcher::signalError(uint32_t code, const std::string& msg)
{
onError(code, msg);
stop();
}
void
SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
const PendingInterestHandle& pendingInterest,
scheduler::EventId timeoutEvent)
{
auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
pendingSegmentIt->second.state = SegmentState::Retransmitted;
pendingSegmentIt->second.hdl = pendingInterest; // cancels previous pending Interest via scoped handle
pendingSegmentIt->second.timeoutEvent = timeoutEvent;
}
void
SegmentFetcher::cancelExcessInFlightSegments()
{
for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
if (it->first >= static_cast<uint64_t>(m_nSegments)) {
it = m_pendingSegments.erase(it); // cancels pending Interest and timeout event
BOOST_ASSERT(m_nSegmentsInFlight > 0);
m_nSegmentsInFlight--;
}
else {
++it;
}
}
}
bool
SegmentFetcher::checkAllSegmentsReceived()
{
bool haveReceivedAllSegments = false;
if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
haveReceivedAllSegments = true;
// Verify that all segments in window have been received. If not, send Interests for missing segments.
for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
if (m_receivedSegments.count(i) == 0) {
m_retxQueue.push(i);
haveReceivedAllSegments = false;
}
}
}
return haveReceivedAllSegments;
}
// Returns the RTO reported by the RTT estimator, converted to milliseconds and
// capped at maxTimeout.
time::milliseconds
SegmentFetcher::getEstimatedRto()
{
  const auto estimatedRto =
    time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto());

  // We don't want an Interest timeout greater than the maximum allowed timeout between the
  // successful receipt of segments
  return std::min(m_options.maxTimeout, estimatedRto);
}
} // namespace util
} // namespace ndn