fw: various code simplifications in AsfStrategy
Change-Id: Ie006680b2469fa7dc5d9b19665320b2686564f2c
diff --git a/daemon/fw/asf-measurements.cpp b/daemon/fw/asf-measurements.cpp
index cf3a317..0e7ab48 100644
--- a/daemon/fw/asf-measurements.cpp
+++ b/daemon/fw/asf-measurements.cpp
@@ -30,155 +30,54 @@
namespace fw {
namespace asf {
-NFD_LOG_INIT(AsfMeasurements);
+const time::nanoseconds FaceInfo::RTT_NO_MEASUREMENT{-1};
+const time::nanoseconds FaceInfo::RTT_TIMEOUT{-2};
-const time::nanoseconds RttStats::RTT_TIMEOUT(-1);
-const time::nanoseconds RttStats::RTT_NO_MEASUREMENT(0);
-const double RttStats::ALPHA = 0.125;
-
-RttStats::RttStats()
- : m_srtt(RTT_NO_MEASUREMENT)
- , m_rtt(RTT_NO_MEASUREMENT)
+time::nanoseconds
+FaceInfo::scheduleTimeout(const Name& interestName, scheduler::EventCallback cb)
{
+ BOOST_ASSERT(!m_timeoutEvent);
+ m_lastInterestName = interestName;
+ m_timeoutEvent = getScheduler().schedule(m_rttEstimator.getEstimatedRto(), std::move(cb));
+ return m_rttEstimator.getEstimatedRto();
}
void
-RttStats::addRttMeasurement(time::nanoseconds durationRtt)
+FaceInfo::cancelTimeout(const Name& prefix)
{
- m_rtt = durationRtt;
- m_rttEstimator.addMeasurement(durationRtt, 1);
- m_srtt = m_rttEstimator.getSmoothedRtt();
+ if (m_lastInterestName.isPrefixOf(prefix)) {
+ m_timeoutEvent.cancel();
+ }
}
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
-FaceInfo::FaceInfo()
- : m_isTimeoutScheduled(false)
- , m_nSilentTimeouts(0)
-{
-}
-
-FaceInfo::~FaceInfo()
-{
- cancelTimeoutEvent();
- m_measurementExpirationId.cancel();
-}
-
-void
-FaceInfo::setTimeoutEvent(const scheduler::EventId& id, const Name& interestName)
-{
- if (!m_isTimeoutScheduled) {
- m_timeoutEventId = id;
- m_isTimeoutScheduled = true;
- m_lastInterestName = interestName;
- }
- else {
- NDN_THROW(FaceInfo::Error("Tried to schedule a timeout for a face that already has a timeout scheduled"));
- }
-}
-
-void
-FaceInfo::cancelTimeoutEvent()
-{
- m_timeoutEventId.cancel();
- m_isTimeoutScheduled = false;
-}
-
-void
-FaceInfo::cancelTimeoutEvent(const Name& prefix)
-{
- if (isTimeoutScheduled() && doesNameMatchLastInterest(prefix)) {
- cancelTimeoutEvent();
- }
-}
-
-bool
-FaceInfo::doesNameMatchLastInterest(const Name& name)
-{
- return m_lastInterestName.isPrefixOf(name);
-}
-
-void
-FaceInfo::recordRtt(const shared_ptr<pit::Entry>& pitEntry, const Face& inFace)
-{
- // Calculate RTT
- auto outRecord = pitEntry->getOutRecord(inFace, 0);
-
- if (outRecord == pitEntry->out_end()) { // no out-record
- NFD_LOG_TRACE(pitEntry->getInterest() << " dataFrom inFace=" << inFace.getId() << " no-out-record");
- return;
- }
-
- m_rttStats.addRttMeasurement(time::steady_clock::now() - outRecord->getLastRenewed());
- NFD_LOG_TRACE("Recording RTT for FaceId: " << inFace.getId()
- << " RTT: " << m_rttStats.getRtt() << " SRTT: " << m_rttStats.getSrtt());
-}
-
-void
-FaceInfo::recordTimeout(const Name& interestName)
-{
- m_rttStats.recordTimeout();
- cancelTimeoutEvent(interestName);
-}
-
-////////////////////////////////////////////////////////////////////////////////
-////////////////////////////////////////////////////////////////////////////////
-
-NamespaceInfo::NamespaceInfo()
- : m_isProbingDue(false)
- , m_hasFirstProbeBeenScheduled(false)
-{
-}
-
FaceInfo*
-NamespaceInfo::getFaceInfo(const fib::Entry&, FaceId faceId)
+NamespaceInfo::getFaceInfo(FaceId faceId)
{
- auto it = m_fit.find(faceId);
- if (it != m_fit.end()) {
- return &it->second;
- }
- else {
- return nullptr;
- }
+ auto it = m_fiMap.find(faceId);
+ return it != m_fiMap.end() ? &it->second : nullptr;
}
FaceInfo&
-NamespaceInfo::getOrCreateFaceInfo(const fib::Entry&, FaceId faceId)
+NamespaceInfo::getOrCreateFaceInfo(FaceId faceId)
{
- auto it = m_fit.find(faceId);
- FaceInfo* info = nullptr;
-
- if (it == m_fit.end()) {
- const auto& pair = m_fit.emplace(std::piecewise_construct,
- std::forward_as_tuple(faceId),
- std::forward_as_tuple());
- info = &pair.first->second;
- extendFaceInfoLifetime(*info, faceId);
+ auto ret = m_fiMap.emplace(std::piecewise_construct,
+ std::forward_as_tuple(faceId),
+ std::forward_as_tuple(m_rttEstimatorOpts));
+ auto& faceInfo = ret.first->second;
+ if (ret.second) {
+ extendFaceInfoLifetime(faceInfo, faceId);
}
- else {
- info = &it->second;
- }
-
- return *info;
-}
-
-void
-NamespaceInfo::expireFaceInfo(FaceId faceId)
-{
- m_fit.erase(faceId);
+ return faceInfo;
}
void
NamespaceInfo::extendFaceInfoLifetime(FaceInfo& info, FaceId faceId)
{
- // Cancel previous expiration
- info.getMeasurementExpirationEventId().cancel();
-
- // Refresh measurement
- auto id = getScheduler().schedule(AsfMeasurements::MEASUREMENTS_LIFETIME,
- [=] { expireFaceInfo(faceId); });
- info.setMeasurementExpirationEventId(id);
+ info.m_measurementExpiration = getScheduler().schedule(AsfMeasurements::MEASUREMENTS_LIFETIME,
+ [=] { m_fiMap.erase(faceId); });
}
////////////////////////////////////////////////////////////////////////////////
@@ -188,22 +87,21 @@
AsfMeasurements::AsfMeasurements(MeasurementsAccessor& measurements)
: m_measurements(measurements)
+ , m_rttEstimatorOpts(make_shared<ndn::util::RttEstimator::Options>())
{
}
FaceInfo*
AsfMeasurements::getFaceInfo(const fib::Entry& fibEntry, const Interest& interest, FaceId faceId)
{
- NamespaceInfo& info = getOrCreateNamespaceInfo(fibEntry, interest);
- return info.getFaceInfo(fibEntry, faceId);
+ return getOrCreateNamespaceInfo(fibEntry, interest).getFaceInfo(faceId);
}
FaceInfo&
AsfMeasurements::getOrCreateFaceInfo(const fib::Entry& fibEntry, const Interest& interest,
FaceId faceId)
{
- NamespaceInfo& info = getOrCreateNamespaceInfo(fibEntry, interest);
- return info.getOrCreateFaceInfo(fibEntry, faceId);
+ return getOrCreateNamespaceInfo(fibEntry, interest).getOrCreateFaceInfo(faceId);
}
NamespaceInfo*
@@ -217,7 +115,7 @@
// Set or update entry lifetime
extendLifetime(*me);
- NamespaceInfo* info = me->insertStrategyInfo<NamespaceInfo>().first;
+ NamespaceInfo* info = me->insertStrategyInfo<NamespaceInfo>(m_rttEstimatorOpts).first;
BOOST_ASSERT(info != nullptr);
return info;
}
@@ -240,7 +138,7 @@
// Set or update entry lifetime
extendLifetime(*me);
- NamespaceInfo* info = me->insertStrategyInfo<NamespaceInfo>().first;
+ NamespaceInfo* info = me->insertStrategyInfo<NamespaceInfo>(m_rttEstimatorOpts).first;
BOOST_ASSERT(info != nullptr);
return *info;
}
diff --git a/daemon/fw/asf-measurements.hpp b/daemon/fw/asf-measurements.hpp
index 0032f14..14039f6 100644
--- a/daemon/fw/asf-measurements.hpp
+++ b/daemon/fw/asf-measurements.hpp
@@ -35,130 +35,59 @@
namespace fw {
namespace asf {
-class RttStats
-{
-public:
- RttStats();
-
- void
- addRttMeasurement(time::nanoseconds rtt);
-
- void
- recordTimeout()
- {
- m_rtt = RTT_TIMEOUT;
- }
-
- time::nanoseconds
- getRtt() const
- {
- return m_rtt;
- }
-
- time::nanoseconds
- getSrtt() const
- {
- return m_srtt;
- }
-
- time::nanoseconds
- computeRto() const
- {
- return m_rttEstimator.getEstimatedRto();
- }
-
-public:
- static const time::nanoseconds RTT_TIMEOUT;
- static const time::nanoseconds RTT_NO_MEASUREMENT;
-
-private:
- time::nanoseconds m_srtt;
- time::nanoseconds m_rtt;
- ndn::util::RttEstimator m_rttEstimator;
-
- static const double ALPHA;
-};
-
-////////////////////////////////////////////////////////////////////////////////
-////////////////////////////////////////////////////////////////////////////////
-
/** \brief Strategy information for each face in a namespace
*/
class FaceInfo
{
public:
- class Error : public std::runtime_error
+ explicit
+ FaceInfo(shared_ptr<const ndn::util::RttEstimator::Options> opts)
+ : m_rttEstimator(std::move(opts))
{
- public:
- explicit
- Error(const std::string& what)
- : std::runtime_error(what)
- {
- }
- };
-
- FaceInfo();
-
- ~FaceInfo();
-
- void
- setTimeoutEvent(const scheduler::EventId& id, const Name& interestName);
-
- void
- setMeasurementExpirationEventId(const scheduler::EventId& id)
- {
- m_measurementExpirationId = id;
}
- const scheduler::EventId&
- getMeasurementExpirationEventId()
- {
- return m_measurementExpirationId;
- }
-
- void
- cancelTimeoutEvent(const Name& prefix);
-
bool
isTimeoutScheduled() const
{
- return m_isTimeoutScheduled;
+ return !!m_timeoutEvent;
+ }
+
+ time::nanoseconds
+ scheduleTimeout(const Name& interestName, scheduler::EventCallback cb);
+
+ void
+ cancelTimeout(const Name& prefix);
+
+ void
+ recordRtt(time::nanoseconds rtt)
+ {
+ m_lastRtt = rtt;
+ m_rttEstimator.addMeasurement(rtt);
}
void
- recordRtt(const shared_ptr<pit::Entry>& pitEntry, const Face& inFace);
-
- void
- recordTimeout(const Name& interestName);
+ recordTimeout(const Name& interestName)
+ {
+ m_lastRtt = RTT_TIMEOUT;
+ cancelTimeout(interestName);
+ }
bool
- isTimeout() const
+ hasTimeout() const
{
- return getRtt() == RttStats::RTT_TIMEOUT;
+ return getLastRtt() == RTT_TIMEOUT;
}
time::nanoseconds
- computeRto() const
+ getLastRtt() const
{
- return m_rttStats.computeRto();
- }
-
- time::nanoseconds
- getRtt() const
- {
- return m_rttStats.getRtt();
+ return m_lastRtt;
}
time::nanoseconds
getSrtt() const
{
- return m_rttStats.getSrtt();
- }
-
- bool
- hasSrttMeasurement() const
- {
- return getSrtt() != RttStats::RTT_NO_MEASUREMENT;
+ return m_rttEstimator.getSmoothedRtt();
}
size_t
@@ -173,87 +102,53 @@
m_nSilentTimeouts = nSilentTimeouts;
}
-private:
- void
- cancelTimeoutEvent();
-
- bool
- doesNameMatchLastInterest(const Name& name);
+public:
+ static const time::nanoseconds RTT_NO_MEASUREMENT;
+ static const time::nanoseconds RTT_TIMEOUT;
private:
- RttStats m_rttStats;
+ ndn::util::RttEstimator m_rttEstimator;
+ time::nanoseconds m_lastRtt = RTT_NO_MEASUREMENT;
Name m_lastInterestName;
+ size_t m_nSilentTimeouts = 0;
// Timeout associated with measurement
- scheduler::EventId m_measurementExpirationId;
+ scheduler::ScopedEventId m_measurementExpiration;
+ friend class NamespaceInfo;
// RTO associated with Interest
- scheduler::EventId m_timeoutEventId;
- bool m_isTimeoutScheduled;
- size_t m_nSilentTimeouts;
+ scheduler::ScopedEventId m_timeoutEvent;
};
-typedef std::unordered_map<FaceId, FaceInfo> FaceInfoTable;
-
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
-/** \brief stores strategy information about each face in this namespace
+/** \brief Stores strategy information about each face in this namespace
*/
class NamespaceInfo : public StrategyInfo
{
public:
- NamespaceInfo();
-
static constexpr int
getTypeId()
{
return 1030;
}
- FaceInfo&
- getOrCreateFaceInfo(const fib::Entry& fibEntry, FaceId faceId);
+ explicit
+ NamespaceInfo(shared_ptr<const ndn::util::RttEstimator::Options> opts)
+ : m_rttEstimatorOpts(std::move(opts))
+ {
+ }
FaceInfo*
- getFaceInfo(const fib::Entry& fibEntry, FaceId faceId);
+ getFaceInfo(FaceId faceId);
- void
- expireFaceInfo(FaceId faceId);
+ FaceInfo&
+ getOrCreateFaceInfo(FaceId faceId);
void
extendFaceInfoLifetime(FaceInfo& info, FaceId faceId);
- FaceInfo*
- get(FaceId faceId)
- {
- if (m_fit.find(faceId) != m_fit.end()) {
- return &m_fit.at(faceId);
- }
- else {
- return nullptr;
- }
- }
-
- FaceInfoTable::iterator
- find(FaceId faceId)
- {
- return m_fit.find(faceId);
- }
-
- FaceInfoTable::iterator
- end()
- {
- return m_fit.end();
- }
-
- FaceInfoTable::iterator
- insert(FaceId faceId)
- {
- return m_fit.emplace(std::piecewise_construct,
- std::forward_as_tuple(faceId),
- std::forward_as_tuple()).first;
- }
-
bool
isProbingDue() const
{
@@ -269,20 +164,20 @@
bool
isFirstProbeScheduled() const
{
- return m_hasFirstProbeBeenScheduled;
+ return m_isFirstProbeScheduled;
}
void
- setHasFirstProbeBeenScheduled(bool hasBeenScheduled)
+ setIsFirstProbeScheduled(bool isScheduled)
{
- m_hasFirstProbeBeenScheduled = hasBeenScheduled;
+ m_isFirstProbeScheduled = isScheduled;
}
private:
- FaceInfoTable m_fit;
-
- bool m_isProbingDue;
- bool m_hasFirstProbeBeenScheduled;
+ std::unordered_map<FaceId, FaceInfo> m_fiMap;
+ shared_ptr<const ndn::util::RttEstimator::Options> m_rttEstimatorOpts;
+ bool m_isProbingDue = false;
+ bool m_isFirstProbeScheduled = false;
};
////////////////////////////////////////////////////////////////////////////////
@@ -317,6 +212,7 @@
private:
MeasurementsAccessor& m_measurements;
+ shared_ptr<const ndn::util::RttEstimator::Options> m_rttEstimatorOpts;
};
} // namespace asf
diff --git a/daemon/fw/asf-probing-module.cpp b/daemon/fw/asf-probing-module.cpp
index 46f1751..0b77b86 100644
--- a/daemon/fw/asf-probing-module.cpp
+++ b/daemon/fw/asf-probing-module.cpp
@@ -46,18 +46,16 @@
}
void
-ProbingModule::scheduleProbe(const fib::Entry& fibEntry, const time::milliseconds& interval)
+ProbingModule::scheduleProbe(const fib::Entry& fibEntry, time::milliseconds interval)
{
Name prefix = fibEntry.getPrefix();
// Set the probing flag for the namespace to true after passed interval of time
getScheduler().schedule(interval, [this, prefix] {
NamespaceInfo* info = m_measurements.getNamespaceInfo(prefix);
-
if (info == nullptr) {
- // fib::Entry with the passed prefix has been removed or the fib::Entry has
- // a name that is not controlled by the AsfStrategy
- return;
+ // FIB entry with the passed prefix has been removed or
+ // it has a name that is not controlled by the AsfStrategy
}
else {
info->setIsProbingDue(true);
@@ -69,16 +67,7 @@
ProbingModule::getFaceToProbe(const Face& inFace, const Interest& interest,
const fib::Entry& fibEntry, const Face& faceUsed)
{
- FaceInfoFacePairSet rankedFaces(
- [] (const auto& pairLhs, const auto& pairRhs) -> bool {
- // Sort by RTT
- // If a face has timed-out, rank it behind non-timed-out faces
- FaceInfo& lhs = *pairLhs.first;
- FaceInfo& rhs = *pairRhs.first;
-
- return (!lhs.isTimeout() && rhs.isTimeout()) ||
- (lhs.isTimeout() == rhs.isTimeout() && lhs.getSrtt() < rhs.getSrtt());
- });
+ FaceInfoFacePairSet rankedFaces;
// Put eligible faces into rankedFaces. If a face does not have an RTT measurement,
// immediately pick the face for probing
@@ -94,7 +83,7 @@
FaceInfo* info = m_measurements.getFaceInfo(fibEntry, interest, hopFace.getId());
// If no RTT has been recorded, probe this face
- if (info == nullptr || !info->hasSrttMeasurement()) {
+ if (info == nullptr || info->getLastRtt() == FaceInfo::RTT_NO_MEASUREMENT) {
return &hopFace;
}
@@ -107,7 +96,7 @@
return nullptr;
}
- return getFaceBasedOnProbability(rankedFaces);
+ return chooseFace(rankedFaces);
}
bool
@@ -119,10 +108,10 @@
// If a first probe has not been scheduled for a namespace
if (!info.isFirstProbeScheduled()) {
// Schedule first probe between 0 and 5 seconds
- uint64_t interval = getRandomNumber(0, 5000);
+ static std::uniform_int_distribution<> randDist(0, 5000);
+ auto interval = randDist(ndn::random::getRandomNumberEngine());
scheduleProbe(fibEntry, time::milliseconds(interval));
-
- info.setHasFirstProbeBeenScheduled(true);
+ info.setIsFirstProbeScheduled(true);
}
return info.isProbingDue();
@@ -140,10 +129,11 @@
}
Face*
-ProbingModule::getFaceBasedOnProbability(const FaceInfoFacePairSet& rankedFaces)
+ProbingModule::chooseFace(const FaceInfoFacePairSet& rankedFaces)
{
- double randomNumber = getRandomNumber(0, 1);
- uint64_t rankSum = ((rankedFaces.size() + 1) * rankedFaces.size()) / 2;
+ static std::uniform_real_distribution<> randDist;
+ double randomNumber = randDist(ndn::random::getRandomNumberEngine());
+ uint64_t rankSum = (rankedFaces.size() + 1) * rankedFaces.size() / 2;
uint64_t rank = 1;
double offset = 0.0;
@@ -164,13 +154,11 @@
// Found face to probe
return pair.second;
}
-
offset += probability;
}
// Given a set of Faces, this method should always select a Face to probe
- BOOST_ASSERT(false);
- return nullptr;
+ NDN_CXX_UNREACHABLE;
}
double
@@ -182,13 +170,6 @@
return static_cast<double>(nFaces + 1 - rank) / rankSum;
}
-double
-ProbingModule::getRandomNumber(double start, double end)
-{
- std::uniform_real_distribution<double> dist(start, end);
- return dist(ndn::random::getRandomNumberEngine());
-}
-
void
ProbingModule::setProbingInterval(size_t probingInterval)
{
@@ -196,7 +177,7 @@
m_probingInterval = time::milliseconds(probingInterval);
}
else {
- NDN_THROW(std::invalid_argument("Probing interval should be >= " +
+ NDN_THROW(std::invalid_argument("Probing interval must be >= " +
to_string(MIN_PROBING_INTERVAL.count()) + " milliseconds"));
}
}
diff --git a/daemon/fw/asf-probing-module.hpp b/daemon/fw/asf-probing-module.hpp
index 02ef155..7102bcc 100644
--- a/daemon/fw/asf-probing-module.hpp
+++ b/daemon/fw/asf-probing-module.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2018, Regents of the University of California,
+ * Copyright (c) 2014-2019, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -41,13 +41,11 @@
ProbingModule(AsfMeasurements& measurements);
void
- scheduleProbe(const fib::Entry& fibEntry, const time::milliseconds& interval);
+ scheduleProbe(const fib::Entry& fibEntry, time::milliseconds interval);
Face*
- getFaceToProbe(const Face& inFace,
- const Interest& interest,
- const fib::Entry& fibEntry,
- const Face& faceUsed);
+ getFaceToProbe(const Face& inFace, const Interest& interest,
+ const fib::Entry& fibEntry, const Face& faceUsed);
bool
isProbingNeeded(const fib::Entry& fibEntry, const Interest& interest);
@@ -66,19 +64,30 @@
private:
// Used to associate FaceInfo with the face in a NextHop
- typedef std::pair<FaceInfo*, Face*> FaceInfoFacePair;
- typedef std::function<bool(FaceInfoFacePair, FaceInfoFacePair)> FaceInfoPredicate;
- typedef std::set<FaceInfoFacePair, FaceInfoPredicate> FaceInfoFacePairSet;
+ using FaceInfoFacePair = std::pair<FaceInfo*, Face*>;
- Face*
- getFaceBasedOnProbability(const FaceInfoFacePairSet& rankedFaces);
+ struct FaceInfoCompare
+ {
+ bool
+ operator()(const FaceInfoFacePair& leftPair, const FaceInfoFacePair& rightPair) const
+ {
+ const FaceInfo& lhs = *leftPair.first;
+ const FaceInfo& rhs = *rightPair.first;
- double
+ // Sort by RTT: if a face has timed out, rank it behind non-timed-out faces
+ return (!lhs.hasTimeout() && rhs.hasTimeout()) ||
+ (lhs.hasTimeout() == rhs.hasTimeout() && lhs.getSrtt() < rhs.getSrtt());
+ }
+ };
+
+ using FaceInfoFacePairSet = std::set<FaceInfoFacePair, FaceInfoCompare>;
+
+ static Face*
+ chooseFace(const FaceInfoFacePairSet& rankedFaces);
+
+ static double
getProbingProbability(uint64_t rank, uint64_t rankSum, uint64_t nFaces);
- double
- getRandomNumber(double start, double end);
-
public:
static constexpr time::milliseconds DEFAULT_PROBING_INTERVAL = 1_min;
static constexpr time::milliseconds MIN_PROBING_INTERVAL = 1_s;
diff --git a/daemon/fw/asf-strategy.cpp b/daemon/fw/asf-strategy.cpp
index 4e26817..62336eb 100644
--- a/daemon/fw/asf-strategy.cpp
+++ b/daemon/fw/asf-strategy.cpp
@@ -42,7 +42,6 @@
: Strategy(forwarder)
, m_measurements(getMeasurements())
, m_probing(m_measurements)
- , m_maxSilentTimeouts(0)
, m_retxSuppression(RETX_SUPPRESSION_INITIAL,
RetxSuppressionExponential::DEFAULT_MULTIPLIER,
RETX_SUPPRESSION_MAX)
@@ -58,8 +57,8 @@
}
this->setInstanceName(makeInstanceName(name, getStrategyName()));
- NFD_LOG_DEBUG("Probing interval=" << m_probing.getProbingInterval()
- << ", Num silent timeouts=" << m_maxSilentTimeouts);
+ NFD_LOG_DEBUG("probing-interval=" << m_probing.getProbingInterval()
+ << " n-silent-timeouts=" << m_maxSilentTimeouts);
}
const Name&
@@ -69,6 +68,20 @@
return strategyName;
}
+static uint64_t
+getParamValue(const std::string& param, const std::string& value)
+{
+ try {
+ if (!value.empty() && value[0] == '-')
+ NDN_THROW(boost::bad_lexical_cast());
+
+ return boost::lexical_cast<uint64_t>(value);
+ }
+ catch (const boost::bad_lexical_cast&) {
+ NDN_THROW(std::invalid_argument("Value of " + param + " must be a non-negative integer"));
+ }
+}
+
void
AsfStrategy::processParams(const PartialName& parsed)
{
@@ -93,40 +106,14 @@
}
}
-uint64_t
-AsfStrategy::getParamValue(const std::string& param, const std::string& value)
-{
- try {
- if (!value.empty() && value[0] == '-')
- NDN_THROW(boost::bad_lexical_cast());
-
- return boost::lexical_cast<uint64_t>(value);
- }
- catch (const boost::bad_lexical_cast&) {
- NDN_THROW(std::invalid_argument("Value of " + param + " must be a non-negative integer"));
- }
-}
-
-void
-AsfStrategy::sendAsfProbe(const FaceEndpoint& ingress, const Interest& interest,
- const shared_ptr<pit::Entry>& pitEntry, const Face& faceToUse,
- const fib::Entry& fibEntry)
-{
- Face* faceToProbe = m_probing.getFaceToProbe(ingress.face, interest, fibEntry, faceToUse);
- if (faceToProbe != nullptr) {
- forwardInterest(interest, fibEntry, pitEntry, *faceToProbe, true);
- m_probing.afterForwardingProbe(fibEntry, interest);
- }
-}
-
void
AsfStrategy::afterReceiveInterest(const FaceEndpoint& ingress, const Interest& interest,
const shared_ptr<pit::Entry>& pitEntry)
{
// Should the Interest be suppressed?
- RetxSuppressionResult suppressResult = m_retxSuppression.decidePerPitEntry(*pitEntry);
+ auto suppressResult = m_retxSuppression.decidePerPitEntry(*pitEntry);
if (suppressResult == RetxSuppressionResult::SUPPRESS) {
- NFD_LOG_DEBUG(interest << " from=" << ingress << " suppressed");
+ NFD_LOG_DEBUG(interest << " retx-interest from=" << ingress << " suppressed");
return;
}
@@ -135,36 +122,31 @@
if (suppressResult == RetxSuppressionResult::NEW) {
if (nexthops.size() == 0) {
- // send noRouteNack if nexthop is not available
- sendNoRouteNack(ingress, interest, pitEntry);
- this->rejectPendingInterest(pitEntry);
+ NFD_LOG_DEBUG(interest << " new-interest from=" << ingress << " no-nexthop");
+ sendNoRouteNack(ingress, pitEntry);
return;
}
- Face* faceToUse = getBestFaceForForwarding(fibEntry, interest, ingress.face, pitEntry);
+ Face* faceToUse = getBestFaceForForwarding(interest, ingress.face, fibEntry, pitEntry);
+ if (faceToUse != nullptr) {
+ NFD_LOG_DEBUG(interest << " new-interest from=" << ingress << " forward-to=" << faceToUse->getId());
+ forwardInterest(interest, *faceToUse, fibEntry, pitEntry);
- if (faceToUse == nullptr) {
- sendNoRouteNack(ingress, interest, pitEntry);
- this->rejectPendingInterest(pitEntry);
- return;
+ // If necessary, send probe
+ sendProbe(interest, ingress, *faceToUse, fibEntry, pitEntry);
}
-
- NFD_LOG_TRACE("Forwarding interest to face: " << faceToUse->getId());
- forwardInterest(interest, fibEntry, pitEntry, *faceToUse);
-
- // If necessary, send probe
- if (m_probing.isProbingNeeded(fibEntry, interest)) {
- sendAsfProbe(ingress, interest, pitEntry, *faceToUse, fibEntry);
+ else {
+ NFD_LOG_DEBUG(interest << " new-interest from=" << ingress << " no-nexthop");
+ sendNoRouteNack(ingress, pitEntry);
}
return;
}
- Face* faceToUse = getBestFaceForForwarding(fibEntry, interest, ingress.face, pitEntry, false);
+ Face* faceToUse = getBestFaceForForwarding(interest, ingress.face, fibEntry, pitEntry, false);
// if unused face not found, select nexthop with earliest out record
if (faceToUse != nullptr) {
-
- NFD_LOG_TRACE("Forwarding interest to face: " << faceToUse->getId());
- forwardInterest(interest, fibEntry, pitEntry, *faceToUse);
+ NFD_LOG_DEBUG(interest << " retx-interest from=" << ingress << " forward-to=" << faceToUse->getId());
+ forwardInterest(interest, *faceToUse, fibEntry, pitEntry);
// avoid probing in case of forwarding
return;
}
@@ -173,11 +155,11 @@
auto it = nexthops.end();
it = findEligibleNextHopWithEarliestOutRecord(ingress.face, interest, nexthops, pitEntry);
if (it == nexthops.end()) {
- NFD_LOG_DEBUG(interest << " from=" << ingress << " retransmitNoNextHop");
+ NFD_LOG_DEBUG(interest << " retx-interest from=" << ingress << " no-nexthop");
}
else {
auto egress = FaceEndpoint(it->getFace(), 0);
- NFD_LOG_DEBUG(interest << " from=" << ingress << " retransmit-retry-to=" << egress);
+ NFD_LOG_DEBUG(interest << " retx-interest from=" << ingress << " retry-to=" << egress);
this->sendInterest(pitEntry, egress, interest);
}
}
@@ -187,52 +169,52 @@
const FaceEndpoint& ingress, const Data& data)
{
NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(pitEntry->getName());
-
if (namespaceInfo == nullptr) {
- NFD_LOG_TRACE("Could not find measurements entry for " << pitEntry->getName());
+ NFD_LOG_DEBUG(pitEntry->getName() << " data from=" << ingress << " no-measurements");
return;
}
// Record the RTT between the Interest out to Data in
- FaceInfo* faceInfo = namespaceInfo->get(ingress.face.getId());
+ FaceInfo* faceInfo = namespaceInfo->getFaceInfo(ingress.face.getId());
if (faceInfo == nullptr) {
+ NFD_LOG_DEBUG(pitEntry->getName() << " data from=" << ingress << " no-face-info");
return;
}
- faceInfo->recordRtt(pitEntry, ingress.face);
+
+ auto outRecord = pitEntry->getOutRecord(ingress.face, 0);
+ if (outRecord == pitEntry->out_end()) {
+ NFD_LOG_DEBUG(pitEntry->getName() << " data from=" << ingress << " no-out-record");
+ }
+ else {
+ faceInfo->recordRtt(time::steady_clock::now() - outRecord->getLastRenewed());
+ NFD_LOG_DEBUG(pitEntry->getName() << " data from=" << ingress
+ << " rtt=" << faceInfo->getLastRtt() << " srtt=" << faceInfo->getSrtt());
+ }
// Extend lifetime for measurements associated with Face
namespaceInfo->extendFaceInfoLifetime(*faceInfo, ingress.face.getId());
- if (faceInfo->isTimeoutScheduled()) {
- faceInfo->cancelTimeoutEvent(data.getName());
- }
+ faceInfo->cancelTimeout(data.getName());
}
void
AsfStrategy::afterReceiveNack(const FaceEndpoint& ingress, const lp::Nack& nack,
const shared_ptr<pit::Entry>& pitEntry)
{
- NFD_LOG_DEBUG("Nack for " << nack.getInterest() << " from=" << ingress << ": reason=" << nack.getReason());
+ NFD_LOG_DEBUG(nack.getInterest() << " nack from=" << ingress << " reason=" << nack.getReason());
onTimeout(pitEntry->getName(), ingress.face.getId());
}
-////////////////////////////////////////////////////////////////////////////////
-////////////////////////////////////////////////////////////////////////////////
-
void
-AsfStrategy::forwardInterest(const Interest& interest,
- const fib::Entry& fibEntry,
- const shared_ptr<pit::Entry>& pitEntry,
- Face& outFace,
- bool wantNewNonce)
+AsfStrategy::forwardInterest(const Interest& interest, Face& outFace, const fib::Entry& fibEntry,
+ const shared_ptr<pit::Entry>& pitEntry, bool wantNewNonce)
{
auto egress = FaceEndpoint(outFace, 0);
if (wantNewNonce) {
- //Send probe: interest with new Nonce
+ // Send probe: interest with new Nonce
Interest probeInterest(interest);
probeInterest.refreshNonce();
- NFD_LOG_TRACE("Sending probe for " << probeInterest << probeInterest.getNonce()
- << " to: " << egress);
+ NFD_LOG_TRACE("Sending probe for " << probeInterest << " to=" << egress);
this->sendInterest(pitEntry, egress, probeInterest);
}
else {
@@ -246,18 +228,30 @@
namespaceInfo.extendFaceInfoLifetime(faceInfo, egress.face.getId());
if (!faceInfo.isTimeoutScheduled()) {
- // Estimate and schedule timeout
- auto timeout = faceInfo.computeRto();
-
- NFD_LOG_TRACE("Scheduling timeout for " << fibEntry.getPrefix() << " to: " << egress
+ auto timeout = faceInfo.scheduleTimeout(interest.getName(),
+ [this, name = interest.getName(), faceId = egress.face.getId()] {
+ onTimeout(name, faceId);
+ });
+ NFD_LOG_TRACE("Scheduled timeout for " << fibEntry.getPrefix() << " to=" << egress
<< " in " << time::duration_cast<time::milliseconds>(timeout) << " ms");
-
- auto id = getScheduler().schedule(timeout, bind(&AsfStrategy::onTimeout, this,
- interest.getName(), egress.face.getId()));
- faceInfo.setTimeoutEvent(id, interest.getName());
}
}
+void
+AsfStrategy::sendProbe(const Interest& interest, const FaceEndpoint& ingress, const Face& faceToUse,
+ const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry)
+{
+ if (!m_probing.isProbingNeeded(fibEntry, interest))
+ return;
+
+ Face* faceToProbe = m_probing.getFaceToProbe(ingress.face, interest, fibEntry, faceToUse);
+ if (faceToProbe == nullptr)
+ return;
+
+ forwardInterest(interest, *faceToProbe, fibEntry, pitEntry, true);
+ m_probing.afterForwardingProbe(fibEntry, interest);
+}
+
struct FaceStats
{
Face* face;
@@ -266,131 +260,101 @@
uint64_t cost;
};
-time::nanoseconds
-getValueForSorting(const FaceStats& stats)
+struct FaceStatsCompare
{
- // These values allow faces with no measurements to be ranked better than timeouts
- // srtt < RTT_NO_MEASUREMENT < RTT_TIMEOUT
- static const time::nanoseconds SORTING_RTT_TIMEOUT = time::nanoseconds::max();
- static const time::nanoseconds SORTING_RTT_NO_MEASUREMENT = SORTING_RTT_TIMEOUT / 2;
+ bool
+ operator()(const FaceStats& lhs, const FaceStats& rhs) const
+ {
+ time::nanoseconds lhsValue = getValueForSorting(lhs);
+ time::nanoseconds rhsValue = getValueForSorting(rhs);
- if (stats.rtt == RttStats::RTT_TIMEOUT) {
- return SORTING_RTT_TIMEOUT;
+ // Sort by RTT and then by cost
+ return std::tie(lhsValue, lhs.cost) < std::tie(rhsValue, rhs.cost);
}
- else if (stats.rtt == RttStats::RTT_NO_MEASUREMENT) {
- return SORTING_RTT_NO_MEASUREMENT;
+
+private:
+ static time::nanoseconds
+ getValueForSorting(const FaceStats& stats)
+ {
+ // These values allow faces with no measurements to be ranked better than timeouts
+ // srtt < RTT_NO_MEASUREMENT < RTT_TIMEOUT
+ if (stats.rtt == FaceInfo::RTT_TIMEOUT) {
+ return time::nanoseconds::max();
+ }
+ else if (stats.rtt == FaceInfo::RTT_NO_MEASUREMENT) {
+ return time::nanoseconds::max() / 2;
+ }
+ else {
+ return stats.srtt;
+ }
}
- else {
- return stats.srtt;
- }
-}
+};
Face*
-AsfStrategy::getBestFaceForForwarding(const fib::Entry& fibEntry, const Interest& interest,
- const Face& inFace, const shared_ptr<pit::Entry>& pitEntry,
+AsfStrategy::getBestFaceForForwarding(const Interest& interest, const Face& inFace,
+ const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry,
bool isInterestNew)
{
- NFD_LOG_TRACE("Looking for best face for " << fibEntry.getPrefix());
-
- typedef std::function<bool(const FaceStats&, const FaceStats&)> FaceStatsPredicate;
- typedef std::set<FaceStats, FaceStatsPredicate> FaceStatsSet;
-
- FaceStatsSet rankedFaces(
- [] (const FaceStats& lhs, const FaceStats& rhs) -> bool {
- // Sort by RTT and then by cost
- time::nanoseconds lhsValue = getValueForSorting(lhs);
- time::nanoseconds rhsValue = getValueForSorting(rhs);
-
- if (lhsValue < rhsValue) {
- return true;
- }
- else if (lhsValue == rhsValue) {
- return lhs.cost < rhs.cost;
- }
- else {
- return false;
- }
- });
+ std::set<FaceStats, FaceStatsCompare> rankedFaces;
auto now = time::steady_clock::now();
- for (const fib::NextHop& hop : fibEntry.getNextHops()) {
- Face& hopFace = hop.getFace();
-
- if (!isNextHopEligible(inFace, interest, hop, pitEntry, !isInterestNew, now)) {
+ for (const auto& nh : fibEntry.getNextHops()) {
+ if (!isNextHopEligible(inFace, interest, nh, pitEntry, !isInterestNew, now)) {
continue;
}
- FaceInfo* info = m_measurements.getFaceInfo(fibEntry, interest, hopFace.getId());
-
+ FaceInfo* info = m_measurements.getFaceInfo(fibEntry, interest, nh.getFace().getId());
if (info == nullptr) {
- FaceStats stats = {&hopFace,
- RttStats::RTT_NO_MEASUREMENT,
- RttStats::RTT_NO_MEASUREMENT,
- hop.getCost()};
-
- rankedFaces.insert(stats);
+ rankedFaces.insert({&nh.getFace(), FaceInfo::RTT_NO_MEASUREMENT,
+ FaceInfo::RTT_NO_MEASUREMENT, nh.getCost()});
}
else {
- FaceStats stats = {&hopFace, info->getRtt(), info->getSrtt(), hop.getCost()};
- rankedFaces.insert(stats);
+ rankedFaces.insert({&nh.getFace(), info->getLastRtt(), info->getSrtt(), nh.getCost()});
}
}
- FaceStatsSet::iterator it = rankedFaces.begin();
-
- if (it != rankedFaces.end()) {
- return it->face;
- }
- else {
- return nullptr;
- }
+ auto it = rankedFaces.begin();
+ return it != rankedFaces.end() ? it->face : nullptr;
}
void
-AsfStrategy::onTimeout(const Name& interestName, const face::FaceId faceId)
+AsfStrategy::onTimeout(const Name& interestName, FaceId faceId) // bumps the silent-timeout count kept for faceId under interestName, then either ignores or records the timeout
{
NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(interestName);
-
if (namespaceInfo == nullptr) {
- NFD_LOG_TRACE("FibEntry for " << interestName << " has been removed since timeout scheduling");
+ NFD_LOG_TRACE(interestName << " FibEntry has been removed since timeout scheduling");
return;
}
- FaceInfoTable::iterator it = namespaceInfo->find(faceId);
-
- if (it == namespaceInfo->end()) {
- it = namespaceInfo->insert(faceId);
+ FaceInfo* fiPtr = namespaceInfo->getFaceInfo(faceId); // a nullptr result is logged and ignored below; the removed code inserted a new entry instead
+ if (fiPtr == nullptr) {
+ NFD_LOG_TRACE(interestName << " FaceInfo id=" << faceId << " has been removed since timeout scheduling");
+ return;
}
- FaceInfo& faceInfo = it->second;
+ auto& faceInfo = *fiPtr;
+ size_t nTimeouts = faceInfo.getNSilentTimeouts() + 1; // computed once so the stored value, the comparison and both log lines agree
+ faceInfo.setNSilentTimeouts(nTimeouts);
- faceInfo.setNSilentTimeouts(faceInfo.getNSilentTimeouts() + 1);
-
- if (faceInfo.getNSilentTimeouts() <= m_maxSilentTimeouts) {
- NFD_LOG_TRACE("FaceId " << faceId << " for " << interestName << " has timed-out "
- << faceInfo.getNSilentTimeouts() << " time(s), ignoring");
+ if (nTimeouts <= m_maxSilentTimeouts) { // still within the silent-timeout allowance: recordTimeout() is not called on this path
+ NFD_LOG_TRACE(interestName << " face=" << faceId << " timeout-count=" << nTimeouts << " ignoring");
// Extend lifetime for measurements associated with Face
namespaceInfo->extendFaceInfoLifetime(faceInfo, faceId);
-
- if (faceInfo.isTimeoutScheduled()) {
- faceInfo.cancelTimeoutEvent(interestName);
- }
+ faceInfo.cancelTimeout(interestName); // FaceInfo::cancelTimeout() only cancels when the last scheduled Interest name is a prefix of interestName
}
else {
- NFD_LOG_TRACE("FaceId " << faceId << " for " << interestName << " has timed-out");
+ NFD_LOG_TRACE(interestName << " face=" << faceId << " timeout-count=" << nTimeouts); // allowance exceeded: the timeout is recorded on the next line
faceInfo.recordTimeout(interestName);
}
}
void
-AsfStrategy::sendNoRouteNack(const FaceEndpoint& ingress, const Interest& interest,
- const shared_ptr<pit::Entry>& pitEntry)
+AsfStrategy::sendNoRouteNack(const FaceEndpoint& ingress, const shared_ptr<pit::Entry>& pitEntry) // sends a NO_ROUTE Nack for pitEntry to ingress; the Interest parameter was only used by the removed debug log
{
- NFD_LOG_DEBUG(interest << " from=" << ingress << " noNextHop");
-
lp::NackHeader nackHeader;
nackHeader.setReason(lp::NackReason::NO_ROUTE);
this->sendNack(pitEntry, ingress, nackHeader);
+ this->rejectPendingInterest(pitEntry); // added to this helper: the PIT entry is rejected right after the Nack is sent
}
} // namespace asf
diff --git a/daemon/fw/asf-strategy.hpp b/daemon/fw/asf-strategy.hpp
index a181676..cba941e 100644
--- a/daemon/fw/asf-strategy.hpp
+++ b/daemon/fw/asf-strategy.hpp
@@ -67,41 +67,32 @@
private:
void
- forwardInterest(const Interest& interest,
- const fib::Entry& fibEntry,
- const shared_ptr<pit::Entry>& pitEntry,
- Face& outFace,
- bool wantNewNonce = false);
+ processParams(const PartialName& parsed); // declaration moved up from the end of this list; signature unchanged
void
- sendAsfProbe(const FaceEndpoint& ingress, const Interest& interest,
- const shared_ptr<pit::Entry>& pitEntry, const Face& faceToUse,
- const fib::Entry& fibEntry);
+ forwardInterest(const Interest& interest, Face& outFace, const fib::Entry& fibEntry,
+ const shared_ptr<pit::Entry>& pitEntry, bool wantNewNonce = false); // parameter order changed: outFace now comes right after interest
+
+ void
+ sendProbe(const Interest& interest, const FaceEndpoint& ingress, const Face& faceToUse,
+ const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry); // renamed from sendAsfProbe; interest is now first and pitEntry last
Face*
- getBestFaceForForwarding(const fib::Entry& fibEntry, const Interest& interest,
- const Face& inFace, const shared_ptr<pit::Entry>& pitEntry,
+ getBestFaceForForwarding(const Interest& interest, const Face& inFace,
+ const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry, // reordered to interest, inFace, fibEntry, pitEntry
bool isNewInterest = true);
void
- onTimeout(const Name& interestName, const FaceId faceId);
+ onTimeout(const Name& interestName, FaceId faceId); // top-level const dropped from the by-value parameter; the function type is unchanged
void
- sendNoRouteNack(const FaceEndpoint& ingress, const Interest& interest, const shared_ptr<pit::Entry>& pitEntry);
-
- void
- processParams(const PartialName& parsed);
-
- static uint64_t
- getParamValue(const std::string& param, const std::string& value);
+ sendNoRouteNack(const FaceEndpoint& ingress, const shared_ptr<pit::Entry>& pitEntry); // Interest parameter removed; callers pass only ingress and pitEntry
private:
AsfMeasurements m_measurements;
ProbingModule m_probing;
- size_t m_maxSilentTimeouts;
-
-private:
RetxSuppressionExponential m_retxSuppression;
+ size_t m_maxSilentTimeouts = 0; // upper bound compared against the per-face silent-timeout count in onTimeout(); now default-initialized and declared after m_retxSuppression (NOTE(review): confirm the constructor's initializer order still matches)
static const time::milliseconds RETX_SUPPRESSION_INITIAL;
static const time::milliseconds RETX_SUPPRESSION_MAX;