face: congestion detection in TCP, UDP, and Unix socket transports
refs #4362
Change-Id: Idaa5d65e1f33663d95bad56de42640183b2cda6d
diff --git a/daemon/face/stream-transport.hpp b/daemon/face/stream-transport.hpp
index 2a57d68..0adbf26 100644
--- a/daemon/face/stream-transport.hpp
+++ b/daemon/face/stream-transport.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -27,6 +27,7 @@
#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
#include "transport.hpp"
+#include "socket-utils.hpp"
#include "core/global-io.hpp"
#include <queue>
@@ -51,6 +52,9 @@
explicit
StreamTransport(typename protocol::socket&& socket);
+ ssize_t
+ getSendQueueLength() override;
+
protected:
void
doClose() override;
@@ -87,6 +91,9 @@
void
resetSendQueue();
+ size_t
+ getSendQueueBytes() const;
+
protected:
typename protocol::socket m_socket;
@@ -96,6 +103,7 @@
uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
size_t m_receiveBufferSize;
std::queue<Block> m_sendQueue;
+ size_t m_sendQueueBytes;
};
@@ -103,11 +111,27 @@
StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
: m_socket(std::move(socket))
, m_receiveBufferSize(0)
+ , m_sendQueueBytes(0)
{
+ // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
+ // Therefore, protecting against send queue overflows is less critical than in other transport
+ // types. Instead, we use the default threshold specified in the GenericLinkService options.
+
startReceive();
}
template<class T>
+ssize_t
+StreamTransport<T>::getSendQueueLength()
+{
+ ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
+ if (queueLength == QUEUE_ERROR) {
+ NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
+ }
+ return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
+}
+
+template<class T>
void
StreamTransport<T>::doClose()
{
@@ -165,6 +189,7 @@
bool wasQueueEmpty = m_sendQueue.empty();
m_sendQueue.push(packet.packet);
+ m_sendQueueBytes += packet.packet.size();
if (wasQueueEmpty)
sendFromQueue();
@@ -191,6 +216,8 @@
NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
BOOST_ASSERT(!m_sendQueue.empty());
+ BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
+ m_sendQueueBytes -= nBytesSent;
m_sendQueue.pop();
if (!m_sendQueue.empty())
@@ -299,6 +326,14 @@
{
std::queue<Block> emptyQueue;
std::swap(emptyQueue, m_sendQueue);
+ m_sendQueueBytes = 0;
+}
+
+template<class T>
+size_t
+StreamTransport<T>::getSendQueueBytes() const
+{
+ return m_sendQueueBytes;
}
} // namespace face