face: congestion detection in TCP, UDP, and Unix socket transports

refs #4362

Change-Id: Idaa5d65e1f33663d95bad56de42640183b2cda6d
diff --git a/daemon/face/stream-transport.hpp b/daemon/face/stream-transport.hpp
index 2a57d68..0adbf26 100644
--- a/daemon/face/stream-transport.hpp
+++ b/daemon/face/stream-transport.hpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017,  Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018,  Regents of the University of California,
  *                           Arizona Board of Regents,
  *                           Colorado State University,
  *                           University Pierre & Marie Curie, Sorbonne University,
@@ -27,6 +27,7 @@
 #define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
 
 #include "transport.hpp"
+#include "socket-utils.hpp"
 #include "core/global-io.hpp"
 
 #include <queue>
@@ -51,6 +52,9 @@
   explicit
   StreamTransport(typename protocol::socket&& socket);
 
+  ssize_t
+  getSendQueueLength() override; // bytes held in m_sendQueue plus the socket-level send queue length
+
 protected:
   void
   doClose() override;
@@ -87,6 +91,9 @@
   void
   resetSendQueue();
 
+  size_t
+  getSendQueueBytes() const; // returns m_sendQueueBytes
+
 protected:
   typename protocol::socket m_socket;
 
@@ -96,6 +103,7 @@
   uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
   size_t m_receiveBufferSize;
   std::queue<Block> m_sendQueue;
+  size_t m_sendQueueBytes; // sum of the sizes of the Blocks in m_sendQueue: grows on push, shrinks on pop, zeroed on reset
 };
 
 
@@ -103,11 +111,27 @@
 StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
   : m_socket(std::move(socket))
   , m_receiveBufferSize(0)
+  , m_sendQueueBytes(0)
 {
+  // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
+  // Therefore, protecting against send queue overflows is less critical than in other transport
+  // types. Instead, we use the default threshold specified in the GenericLinkService options.
+
   startReceive();
 }
 
 template<class T>
+ssize_t
+StreamTransport<T>::getSendQueueLength()
+{
+  ssize_t queueLength = getTxQueueLength(m_socket.native_handle()); // socket-level queue; QUEUE_ERROR on failure
+  if (queueLength == QUEUE_ERROR) {
+    NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
+  }
+  return getSendQueueBytes() + std::max<ssize_t>(0, queueLength); // negative readings (presumably QUEUE_ERROR) count as 0
+}
+
+template<class T>
 void
 StreamTransport<T>::doClose()
 {
@@ -165,6 +189,7 @@
 
   bool wasQueueEmpty = m_sendQueue.empty();
   m_sendQueue.push(packet.packet);
+  m_sendQueueBytes += packet.packet.size();
 
   if (wasQueueEmpty)
     sendFromQueue();
@@ -191,6 +216,8 @@
   NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
 
   BOOST_ASSERT(!m_sendQueue.empty());
+  BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
+  m_sendQueueBytes -= nBytesSent;
   m_sendQueue.pop();
 
   if (!m_sendQueue.empty())
@@ -299,6 +326,14 @@
 {
   std::queue<Block> emptyQueue;
   std::swap(emptyQueue, m_sendQueue);
+  m_sendQueueBytes = 0;
+}
+
+template<class T>
+size_t
+StreamTransport<T>::getSendQueueBytes() const
+{
+  return m_sendQueueBytes; // running byte total kept in step with m_sendQueue by the push, pop and reset paths
 }
 
 } // namespace face