face: congestion detection in TCP, UDP, and Unix socket transports

refs #4362

Change-Id: Idaa5d65e1f33663d95bad56de42640183b2cda6d
diff --git a/daemon/face/generic-link-service.cpp b/daemon/face/generic-link-service.cpp
index bbd8e03..c634484 100644
--- a/daemon/face/generic-link-service.cpp
+++ b/daemon/face/generic-link-service.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /*
- * Copyright (c) 2014-2017,  Regents of the University of California,
+ * Copyright (c) 2014-2018,  Regents of the University of California,
  *                           Arizona Board of Regents,
  *                           Colorado State University,
  *                           University Pierre & Marie Curie, Sorbonne University,
@@ -24,17 +24,25 @@
  */
 
 #include "generic-link-service.hpp"
+
 #include <ndn-cxx/lp/tags.hpp>
 
+#include <cmath>
+
 namespace nfd {
 namespace face {
 
 NFD_LOG_INIT("GenericLinkService");
 
+constexpr uint32_t DEFAULT_CONGESTION_THRESHOLD_DIVISOR = 2; // threshold is capped at capacity / divisor
+
 GenericLinkService::Options::Options()
   : allowLocalFields(false)
   , allowFragmentation(false)
   , allowReassembly(false)
+  , allowCongestionMarking(false)
+  , baseCongestionMarkingInterval(time::milliseconds(100)) // INTERVAL (100 ms) from RFC 8289 (CoDel)
+  , defaultCongestionThreshold(65536) // 64 KiB; this default works well for a queue capacity of 200 KiB
 {
 }
 
@@ -44,6 +52,9 @@
   , m_reassembler(m_options.reassemblerOptions, this)
   , m_reliability(m_options.reliabilityOptions, this)
   , m_lastSeqNo(-2)
+  , m_nextMarkTime(time::steady_clock::TimePoint::max())
+  , m_lastMarkTime(time::steady_clock::TimePoint::min())
+  , m_nMarkedSinceInMarkingState(0)
 {
   m_reassembler.beforeTimeout.connect(bind([this] { ++this->nReassemblyTimeouts; }));
   m_reliability.onDroppedInterest.connect([this] (const Interest& i) { this->notifyDroppedInterest(i); });
@@ -71,10 +82,15 @@
 GenericLinkService::sendLpPacket(lp::Packet&& pkt)
 {
   const ssize_t mtu = this->getTransport()->getMtu();
+
   if (m_options.reliabilityOptions.isEnabled) {
     m_reliability.piggyback(pkt, mtu);
   }
 
+  if (m_options.allowCongestionMarking) {
+    checkCongestionLevel(pkt);
+  }
+
   Transport::Packet tp(pkt.wireEncode());
   if (mtu != MTU_UNLIMITED && tp.packet.size() > static_cast<size_t>(mtu)) {
     ++this->nOutOverMtu;
@@ -140,9 +156,14 @@
   // Make space for feature fields in fragments
   if (m_options.reliabilityOptions.isEnabled && mtu != MTU_UNLIMITED) {
     mtu -= LpReliability::RESERVED_HEADER_SPACE;
-    BOOST_ASSERT(mtu > 0);
   }
 
+  if (m_options.allowCongestionMarking && mtu != MTU_UNLIMITED) {
+    mtu -= CONGESTION_MARK_SIZE;
+  }
+
+  BOOST_ASSERT(mtu == MTU_UNLIMITED || mtu > 0);
+
   if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
     bool isOk = false;
     std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
@@ -196,6 +217,59 @@
 }
 
 void
+GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
+{
+  ssize_t sendQueueLength = getTransport()->getSendQueueLength();
+  // Requires a transport that can report its send queue length; a negative value skips the check
+  if (sendQueueLength < 0) {
+    return;
+  }
+
+  // To avoid overflowing the queue, cap the congestion threshold at half of the send queue
+  // capacity (only when the transport reports a non-negative capacity).
+  size_t congestionThreshold = m_options.defaultCongestionThreshold;
+  if (getTransport()->getSendQueueCapacity() >= 0) {
+    congestionThreshold = std::min(congestionThreshold,
+                                   static_cast<size_t>(getTransport()->getSendQueueCapacity()) /
+                                                       DEFAULT_CONGESTION_THRESHOLD_DIVISOR);
+  }
+
+  if (sendQueueLength > 0) {
+    NFD_LOG_FACE_TRACE("txqlen=" << sendQueueLength << " threshold=" << congestionThreshold <<
+                       " capacity=" << getTransport()->getSendQueueCapacity());
+  }
+
+  if (static_cast<size_t>(sendQueueLength) > congestionThreshold) { // Send queue is congested
+    const auto now = time::steady_clock::now();
+    if (now >= m_nextMarkTime || now >= m_lastMarkTime + m_options.baseCongestionMarkingInterval) {
+      // First mark of an incident (at most one per baseCongestionMarkingInterval): start schedule now
+      if (m_nMarkedSinceInMarkingState == 0) {
+        m_nextMarkTime = now;
+      }
+
+      // Time to mark packet
+      pkt.set<lp::CongestionMarkField>(1);
+      ++nCongestionMarked;
+      NFD_LOG_FACE_DEBUG("LpPacket was marked as congested");
+
+      ++m_nMarkedSinceInMarkingState;
+      // Advance the next mark time by baseCongestionMarkingInterval / sqrt(n), where n is the number
+      // of packets marked in this incident of congestion, so marks get closer together over time
+      m_nextMarkTime += time::nanoseconds(static_cast<time::nanoseconds::rep>(
+                                            m_options.baseCongestionMarkingInterval.count() /
+                                            std::sqrt(m_nMarkedSinceInMarkingState)));
+      m_lastMarkTime = now;
+    }
+  }
+  else if (m_nextMarkTime != time::steady_clock::TimePoint::max()) {
+    // Congestion incident has ended, so reset (m_lastMarkTime is kept to rate-limit the next one)
+    NFD_LOG_FACE_DEBUG("Send queue length dropped below congestion threshold");
+    m_nextMarkTime = time::steady_clock::TimePoint::max();
+    m_nMarkedSinceInMarkingState = 0;
+  }
+}
+
+void
 GenericLinkService::doReceivePacket(Transport::Packet&& packet)
 {
   try {