face: congestion detection in TCP, UDP, and Unix socket transports
refs #4362
Change-Id: Idaa5d65e1f33663d95bad56de42640183b2cda6d
diff --git a/daemon/face/generic-link-service.cpp b/daemon/face/generic-link-service.cpp
index bbd8e03..c634484 100644
--- a/daemon/face/generic-link-service.cpp
+++ b/daemon/face/generic-link-service.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -24,17 +24,25 @@
*/
#include "generic-link-service.hpp"
+
#include <ndn-cxx/lp/tags.hpp>
+#include <cmath>
+
namespace nfd {
namespace face {
NFD_LOG_INIT("GenericLinkService");
+constexpr uint32_t DEFAULT_CONGESTION_THRESHOLD_DIVISOR = 2;
+
GenericLinkService::Options::Options()
: allowLocalFields(false)
, allowFragmentation(false)
, allowReassembly(false)
+ , allowCongestionMarking(false)
+ , baseCongestionMarkingInterval(time::milliseconds(100)) // Interval from RFC 8289 (CoDel)
+ , defaultCongestionThreshold(65536) // This default value works well for a queue capacity of 200KiB
{
}
@@ -44,6 +52,9 @@
, m_reassembler(m_options.reassemblerOptions, this)
, m_reliability(m_options.reliabilityOptions, this)
, m_lastSeqNo(-2)
+ , m_nextMarkTime(time::steady_clock::TimePoint::max())
+ , m_lastMarkTime(time::steady_clock::TimePoint::min())
+ , m_nMarkedSinceInMarkingState(0)
{
m_reassembler.beforeTimeout.connect(bind([this] { ++this->nReassemblyTimeouts; }));
m_reliability.onDroppedInterest.connect([this] (const Interest& i) { this->notifyDroppedInterest(i); });
@@ -71,10 +82,15 @@
GenericLinkService::sendLpPacket(lp::Packet&& pkt)
{
const ssize_t mtu = this->getTransport()->getMtu();
+
if (m_options.reliabilityOptions.isEnabled) {
m_reliability.piggyback(pkt, mtu);
}
+ if (m_options.allowCongestionMarking) {
+ checkCongestionLevel(pkt);
+ }
+
Transport::Packet tp(pkt.wireEncode());
if (mtu != MTU_UNLIMITED && tp.packet.size() > static_cast<size_t>(mtu)) {
++this->nOutOverMtu;
@@ -140,9 +156,14 @@
// Make space for feature fields in fragments
if (m_options.reliabilityOptions.isEnabled && mtu != MTU_UNLIMITED) {
mtu -= LpReliability::RESERVED_HEADER_SPACE;
- BOOST_ASSERT(mtu > 0);
}
+ if (m_options.allowCongestionMarking && mtu != MTU_UNLIMITED) {
+ mtu -= CONGESTION_MARK_SIZE;
+ }
+
+ BOOST_ASSERT(mtu == MTU_UNLIMITED || mtu > 0);
+
if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
bool isOk = false;
std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
@@ -196,6 +217,59 @@
}
void
+GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
+{
+ ssize_t sendQueueLength = getTransport()->getSendQueueLength();
+ // This operation requires that the transport supports retrieving the current send queue length; a negative value means it does not
+ if (sendQueueLength < 0) {
+ return;
+ }
+
+ // To avoid overflowing the queue, cap the congestion threshold at half of the send queue
+ // capacity whenever the transport reports a non-negative capacity.
+ size_t congestionThreshold = m_options.defaultCongestionThreshold;
+ if (getTransport()->getSendQueueCapacity() >= 0) {
+ congestionThreshold = std::min(congestionThreshold,
+ static_cast<size_t>(getTransport()->getSendQueueCapacity()) /
+ DEFAULT_CONGESTION_THRESHOLD_DIVISOR);
+ }
+
+ if (sendQueueLength > 0) {
+ NFD_LOG_FACE_TRACE("txqlen=" << sendQueueLength << " threshold=" << congestionThreshold <<
+ " capacity=" << getTransport()->getSendQueueCapacity());
+ }
+
+ if (static_cast<size_t>(sendQueueLength) > congestionThreshold) { // Send queue is congested
+ const auto now = time::steady_clock::now();
+ if (now >= m_nextMarkTime || now >= m_lastMarkTime + m_options.baseCongestionMarkingInterval) {
+ // Mark at most one initial packet per baseCongestionMarkingInterval
+ if (m_nMarkedSinceInMarkingState == 0) {
+ m_nextMarkTime = now;
+ }
+
+ // Time to mark packet
+ pkt.set<lp::CongestionMarkField>(1);
+ ++nCongestionMarked;
+ NFD_LOG_FACE_DEBUG("LpPacket was marked as congested");
+
+ ++m_nMarkedSinceInMarkingState;
+ // Advance the next mark time by baseCongestionMarkingInterval / sqrt(N), where N is the number
+ // of packets marked in this incident of congestion, so successive marking intervals shrink
+ m_nextMarkTime += time::nanoseconds(static_cast<time::nanoseconds::rep>(
+ m_options.baseCongestionMarkingInterval.count() /
+ std::sqrt(m_nMarkedSinceInMarkingState)));
+ m_lastMarkTime = now;
+ }
+ }
+ else if (m_nextMarkTime != time::steady_clock::TimePoint::max()) {
+ // Congestion incident has ended, so reset
+ NFD_LOG_FACE_DEBUG("Send queue length dropped below congestion threshold");
+ m_nextMarkTime = time::steady_clock::TimePoint::max();
+ m_nMarkedSinceInMarkingState = 0;
+ }
+}
+
+void
GenericLinkService::doReceivePacket(Transport::Packet&& packet)
{
try {