face: congestion detection in TCP, UDP, and Unix socket transports
refs #4362
Change-Id: Idaa5d65e1f33663d95bad56de42640183b2cda6d
diff --git a/daemon/face/datagram-transport.hpp b/daemon/face/datagram-transport.hpp
index 67ad0a7..2b2cc29 100644
--- a/daemon/face/datagram-transport.hpp
+++ b/daemon/face/datagram-transport.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -27,6 +27,7 @@
#define NFD_DAEMON_FACE_DATAGRAM_TRANSPORT_HPP
#include "transport.hpp"
+#include "socket-utils.hpp"
#include "core/global-io.hpp"
#include <array>
@@ -54,6 +55,9 @@
explicit
DatagramTransport(typename protocol::socket&& socket);
+ ssize_t
+ getSendQueueLength() override;
+
/** \brief Receive datagram, translate buffer into packet, deliver to parent class.
*/
void
@@ -104,6 +108,17 @@
: m_socket(std::move(socket))
, m_hasRecentlyReceived(false)
{
+ boost::asio::socket_base::send_buffer_size sendBufferSizeOption;
+ boost::system::error_code error;
+ m_socket.get_option(sendBufferSizeOption, error);
+ if (error) {
+ NFD_LOG_FACE_WARN("Failed to obtain send queue capacity from socket: " << error.message());
+ this->setSendQueueCapacity(QUEUE_ERROR);
+ }
+ else {
+ this->setSendQueueCapacity(sendBufferSizeOption.value());
+ }
+
m_socket.async_receive_from(boost::asio::buffer(m_receiveBuffer), m_sender,
bind(&DatagramTransport<T, U>::handleReceive, this,
boost::asio::placeholders::error,
@@ -111,6 +126,17 @@
}
template<class T, class U>
+ssize_t
+DatagramTransport<T, U>::getSendQueueLength()
+{
+ ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
+ if (queueLength == QUEUE_ERROR) {
+ NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
+ }
+ return queueLength;
+}
+
+template<class T, class U>
void
DatagramTransport<T, U>::doClose()
{
diff --git a/daemon/face/generic-link-service.cpp b/daemon/face/generic-link-service.cpp
index bbd8e03..c634484 100644
--- a/daemon/face/generic-link-service.cpp
+++ b/daemon/face/generic-link-service.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -24,17 +24,25 @@
*/
#include "generic-link-service.hpp"
+
#include <ndn-cxx/lp/tags.hpp>
+#include <cmath>
+
namespace nfd {
namespace face {
NFD_LOG_INIT("GenericLinkService");
+constexpr uint32_t DEFAULT_CONGESTION_THRESHOLD_DIVISOR = 2;
+
GenericLinkService::Options::Options()
: allowLocalFields(false)
, allowFragmentation(false)
, allowReassembly(false)
+ , allowCongestionMarking(false)
+ , baseCongestionMarkingInterval(time::milliseconds(100)) // Interval from RFC 8289 (CoDel)
+ , defaultCongestionThreshold(65536) // This default value works well for a queue capacity of 200KiB
{
}
@@ -44,6 +52,9 @@
, m_reassembler(m_options.reassemblerOptions, this)
, m_reliability(m_options.reliabilityOptions, this)
, m_lastSeqNo(-2)
+ , m_nextMarkTime(time::steady_clock::TimePoint::max())
+ , m_lastMarkTime(time::steady_clock::TimePoint::min())
+ , m_nMarkedSinceInMarkingState(0)
{
m_reassembler.beforeTimeout.connect(bind([this] { ++this->nReassemblyTimeouts; }));
m_reliability.onDroppedInterest.connect([this] (const Interest& i) { this->notifyDroppedInterest(i); });
@@ -71,10 +82,15 @@
GenericLinkService::sendLpPacket(lp::Packet&& pkt)
{
const ssize_t mtu = this->getTransport()->getMtu();
+
if (m_options.reliabilityOptions.isEnabled) {
m_reliability.piggyback(pkt, mtu);
}
+ if (m_options.allowCongestionMarking) {
+ checkCongestionLevel(pkt);
+ }
+
Transport::Packet tp(pkt.wireEncode());
if (mtu != MTU_UNLIMITED && tp.packet.size() > static_cast<size_t>(mtu)) {
++this->nOutOverMtu;
@@ -140,9 +156,14 @@
// Make space for feature fields in fragments
if (m_options.reliabilityOptions.isEnabled && mtu != MTU_UNLIMITED) {
mtu -= LpReliability::RESERVED_HEADER_SPACE;
- BOOST_ASSERT(mtu > 0);
}
+ if (m_options.allowCongestionMarking && mtu != MTU_UNLIMITED) {
+ mtu -= CONGESTION_MARK_SIZE;
+ }
+
+ BOOST_ASSERT(mtu == MTU_UNLIMITED || mtu > 0);
+
if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
bool isOk = false;
std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
@@ -196,6 +217,59 @@
}
void
+GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
+{
+ ssize_t sendQueueLength = getTransport()->getSendQueueLength();
+ // This operation requires that the transport supports retrieving current send queue length
+ if (sendQueueLength < 0) {
+ return;
+ }
+
+ // To avoid overflowing the queue, set the congestion threshold to at least half of the send
+ // queue capacity.
+ size_t congestionThreshold = m_options.defaultCongestionThreshold;
+ if (getTransport()->getSendQueueCapacity() >= 0) {
+ congestionThreshold = std::min(congestionThreshold,
+ static_cast<size_t>(getTransport()->getSendQueueCapacity()) /
+ DEFAULT_CONGESTION_THRESHOLD_DIVISOR);
+ }
+
+ if (sendQueueLength > 0) {
+ NFD_LOG_FACE_TRACE("txqlen=" << sendQueueLength << " threshold=" << congestionThreshold <<
+ " capacity=" << getTransport()->getSendQueueCapacity());
+ }
+
+ if (static_cast<size_t>(sendQueueLength) > congestionThreshold) { // Send queue is congested
+ const auto now = time::steady_clock::now();
+ if (now >= m_nextMarkTime || now >= m_lastMarkTime + m_options.baseCongestionMarkingInterval) {
+ // Mark at most one initial packet per baseCongestionMarkingInterval
+ if (m_nMarkedSinceInMarkingState == 0) {
+ m_nextMarkTime = now;
+ }
+
+ // Time to mark packet
+ pkt.set<lp::CongestionMarkField>(1);
+ ++nCongestionMarked;
+ NFD_LOG_FACE_DEBUG("LpPacket was marked as congested");
+
+ ++m_nMarkedSinceInMarkingState;
+ // Decrease the marking interval by the inverse of the square root of the number of packets
+ // marked in this incident of congestion
+ m_nextMarkTime += time::nanoseconds(static_cast<time::nanoseconds::rep>(
+ m_options.baseCongestionMarkingInterval.count() /
+ std::sqrt(m_nMarkedSinceInMarkingState)));
+ m_lastMarkTime = now;
+ }
+ }
+ else if (m_nextMarkTime != time::steady_clock::TimePoint::max()) {
+ // Congestion incident has ended, so reset
+ NFD_LOG_FACE_DEBUG("Send queue length dropped below congestion threshold");
+ m_nextMarkTime = time::steady_clock::TimePoint::max();
+ m_nMarkedSinceInMarkingState = 0;
+ }
+}
+
+void
GenericLinkService::doReceivePacket(Transport::Packet&& packet)
{
try {
diff --git a/daemon/face/generic-link-service.hpp b/daemon/face/generic-link-service.hpp
index 1823b37..d74b1f2 100644
--- a/daemon/face/generic-link-service.hpp
+++ b/daemon/face/generic-link-service.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -80,6 +80,10 @@
* of retransmissions
*/
PacketCounter nRetxExhausted;
+
+ /** \brief count of outgoing LpPackets that were marked with congestion marks
+ */
+ PacketCounter nCongestionMarked;
};
/** \brief GenericLinkService is a LinkService that implements the NDNLPv2 protocol
@@ -120,6 +124,18 @@
/** \brief options for reliability
*/
LpReliability::Options reliabilityOptions;
+
+ /** \brief enables send queue congestion detection and marking
+ */
+ bool allowCongestionMarking;
+
+ /** \brief starting value for congestion marking interval
+ */
+ time::nanoseconds baseCongestionMarkingInterval;
+
+ /** \brief default congestion threshold in bytes
+ */
+ size_t defaultCongestionThreshold;
};
/** \brief counters provided by GenericLinkService
@@ -194,6 +210,13 @@
void
assignSequences(std::vector<lp::Packet>& pkts);
+ /** \brief if the send queue is found to be congested, add a congestion mark to the packet
+ * according to CoDel
+ * \sa https://tools.ietf.org/html/rfc8289
+ */
+ void
+ checkCongestionLevel(lp::Packet& pkt);
+
private: // receive path
/** \brief receive Packet from Transport
*/
@@ -253,6 +276,16 @@
LpReliability m_reliability;
lp::Sequence m_lastSeqNo;
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ /// CongestionMark TLV-TYPE (3 octets) + CongestionMark TLV-LENGTH (1 octet) + sizeof(uint64_t)
+ static constexpr size_t CONGESTION_MARK_SIZE = 3 + 1 + sizeof(uint64_t);
+ /// Time to mark next packet due to send queue congestion
+ time::steady_clock::TimePoint m_nextMarkTime;
+ /// Time last packet was marked
+ time::steady_clock::TimePoint m_lastMarkTime;
+ /// number of marked packets in the current incident of congestion
+ size_t m_nMarkedSinceInMarkingState;
+
friend class LpReliability;
};
diff --git a/daemon/face/multicast-udp-transport.cpp b/daemon/face/multicast-udp-transport.cpp
index f8ee1a9..86324d3 100644
--- a/daemon/face/multicast-udp-transport.cpp
+++ b/daemon/face/multicast-udp-transport.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -24,6 +24,7 @@
*/
#include "multicast-udp-transport.hpp"
+#include "socket-utils.hpp"
#include "udp-protocol.hpp"
#ifdef __linux__
@@ -54,9 +55,30 @@
this->setLinkType(linkType);
this->setMtu(udp::computeMtu(localEndpoint));
+ protocol::socket::send_buffer_size sendBufferSizeOption;
+ boost::system::error_code error;
+ m_sendSocket.get_option(sendBufferSizeOption);
+ if (error) {
+ NFD_LOG_FACE_WARN("Failed to obtain send queue capacity from socket: " << error.message());
+ this->setSendQueueCapacity(QUEUE_ERROR);
+ }
+ else {
+ this->setSendQueueCapacity(sendBufferSizeOption.value());
+ }
+
NFD_LOG_FACE_INFO("Creating transport");
}
+ssize_t
+MulticastUdpTransport::getSendQueueLength()
+{
+ ssize_t queueLength = getTxQueueLength(m_sendSocket.native_handle());
+ if (queueLength == QUEUE_ERROR) {
+ NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
+ }
+ return queueLength;
+}
+
void
MulticastUdpTransport::doSend(Transport::Packet&& packet)
{
diff --git a/daemon/face/multicast-udp-transport.hpp b/daemon/face/multicast-udp-transport.hpp
index b0fb4d4..e655f4a 100644
--- a/daemon/face/multicast-udp-transport.hpp
+++ b/daemon/face/multicast-udp-transport.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -71,6 +71,9 @@
protocol::socket&& sendSocket,
ndn::nfd::LinkType linkType);
+ ssize_t
+ getSendQueueLength() final;
+
static void
openRxSocket(protocol::socket& sock,
const protocol::endpoint& multicastGroup,
diff --git a/daemon/face/socket-utils.cpp b/daemon/face/socket-utils.cpp
new file mode 100644
index 0000000..5b38e40
--- /dev/null
+++ b/daemon/face/socket-utils.cpp
@@ -0,0 +1,57 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "socket-utils.hpp"
+#include "transport.hpp"
+
+#if defined(__linux__)
+#include <linux/sockios.h>
+#include <sys/ioctl.h>
+#elif defined(__APPLE__)
+#include <sys/socket.h>
+#endif
+
+namespace nfd {
+namespace face {
+
+ssize_t
+getTxQueueLength(int fd)
+{
+ int queueLength = QUEUE_UNSUPPORTED;
+#if defined(__linux__)
+ if (ioctl(fd, SIOCOUTQ, &queueLength) < 0) {
+ queueLength = QUEUE_ERROR;
+ }
+#elif defined(__APPLE__)
+ socklen_t queueLengthSize = sizeof(queueLength);
+ if (getsockopt(fd, SOL_SOCKET, SO_NWRITE, &queueLength, &queueLengthSize) < 0) {
+ queueLength = QUEUE_ERROR;
+ }
+#endif
+ return queueLength;
+}
+
+} // namespace face
+} // namespace nfd
diff --git a/daemon/face/socket-utils.hpp b/daemon/face/socket-utils.hpp
new file mode 100644
index 0000000..3929d8a
--- /dev/null
+++ b/daemon/face/socket-utils.hpp
@@ -0,0 +1,48 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef NFD_DAEMON_FACE_SOCKET_UTILS_HPP
+#define NFD_DAEMON_FACE_SOCKET_UTILS_HPP
+
+#include "core/common.hpp"
+
+namespace nfd {
+namespace face {
+
+/** \brief obtain send queue length from a specified system socket
+ * \param fd file descriptor of the socket
+ * \retval QUEUE_UNSUPPORTED this operation is unsupported on the current platform
+ * \retval QUEUE_ERROR there was an error retrieving the send queue length
+ *
+ * On Linux, ioctl() is used to obtain the value of SIOCOUTQ from the socket.
+ * On macOS, getsockopt() is used to obtain the value of SO_NWRITE from the socket.
+ */
+ssize_t
+getTxQueueLength(int fd);
+
+} // namespace face
+} // namespace nfd
+
+#endif // NFD_DAEMON_FACE_SOCKET_UTILS_HPP
diff --git a/daemon/face/stream-transport.hpp b/daemon/face/stream-transport.hpp
index 2a57d68..0adbf26 100644
--- a/daemon/face/stream-transport.hpp
+++ b/daemon/face/stream-transport.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -27,6 +27,7 @@
#define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
#include "transport.hpp"
+#include "socket-utils.hpp"
#include "core/global-io.hpp"
#include <queue>
@@ -51,6 +52,9 @@
explicit
StreamTransport(typename protocol::socket&& socket);
+ ssize_t
+ getSendQueueLength() override;
+
protected:
void
doClose() override;
@@ -87,6 +91,9 @@
void
resetSendQueue();
+ size_t
+ getSendQueueBytes() const;
+
protected:
typename protocol::socket m_socket;
@@ -96,6 +103,7 @@
uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
size_t m_receiveBufferSize;
std::queue<Block> m_sendQueue;
+ size_t m_sendQueueBytes;
};
@@ -103,11 +111,27 @@
StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
: m_socket(std::move(socket))
, m_receiveBufferSize(0)
+ , m_sendQueueBytes(0)
{
+ // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
+ // Therefore, protecting against send queue overflows is less critical than in other transport
+ // types. Instead, we use the default threshold specified in the GenericLinkService options.
+
startReceive();
}
template<class T>
+ssize_t
+StreamTransport<T>::getSendQueueLength()
+{
+ ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
+ if (queueLength == QUEUE_ERROR) {
+ NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
+ }
+ return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
+}
+
+template<class T>
void
StreamTransport<T>::doClose()
{
@@ -165,6 +189,7 @@
bool wasQueueEmpty = m_sendQueue.empty();
m_sendQueue.push(packet.packet);
+ m_sendQueueBytes += packet.packet.size();
if (wasQueueEmpty)
sendFromQueue();
@@ -191,6 +216,8 @@
NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
BOOST_ASSERT(!m_sendQueue.empty());
+ BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
+ m_sendQueueBytes -= nBytesSent;
m_sendQueue.pop();
if (!m_sendQueue.empty())
@@ -299,6 +326,14 @@
{
std::queue<Block> emptyQueue;
std::swap(emptyQueue, m_sendQueue);
+ m_sendQueueBytes = 0;
+}
+
+template<class T>
+size_t
+StreamTransport<T>::getSendQueueBytes() const
+{
+ return m_sendQueueBytes;
}
} // namespace face
diff --git a/daemon/face/tcp-transport.cpp b/daemon/face/tcp-transport.cpp
index c14cab2..c95f693 100644
--- a/daemon/face/tcp-transport.cpp
+++ b/daemon/face/tcp-transport.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -25,6 +25,11 @@
#include "tcp-transport.hpp"
+#if defined(__linux__)
+#include <linux/sockios.h>
+#include <sys/ioctl.h>
+#endif
+
namespace nfd {
namespace face {
@@ -55,6 +60,28 @@
NFD_LOG_FACE_INFO("Creating transport");
}
+ssize_t
+TcpTransport::getSendQueueLength()
+{
+ int queueLength = getSendQueueBytes();
+
+ // We want to obtain the amount of "not sent" bytes instead of the amount of "not sent" + "not
+ // acked" bytes. On Linux, we use SIOCOUTQNSD for this reason. However, macOS does not provide an
+ // efficient mechanism to obtain this value (SO_NWRITE includes both "not sent" and "not acked").
+#if defined(__linux__)
+ int nsd;
+ if (ioctl(m_socket.native_handle(), SIOCOUTQNSD, &nsd) < 0) {
+ NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
+ }
+ else if (nsd > 0) {
+ NFD_LOG_FACE_TRACE("SIOCOUTQNSD=" << nsd);
+ queueLength += nsd;
+ }
+#endif
+
+ return queueLength;
+}
+
bool
TcpTransport::canChangePersistencyToImpl(ndn::nfd::FacePersistency newPersistency) const
{
diff --git a/daemon/face/tcp-transport.hpp b/daemon/face/tcp-transport.hpp
index ffd05a8..d6b0890 100644
--- a/daemon/face/tcp-transport.hpp
+++ b/daemon/face/tcp-transport.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -44,6 +44,9 @@
public:
TcpTransport(protocol::socket&& socket, ndn::nfd::FacePersistency persistency);
+ ssize_t
+ getSendQueueLength() final;
+
protected:
bool
canChangePersistencyToImpl(ndn::nfd::FacePersistency newPersistency) const final;
diff --git a/daemon/face/transport.cpp b/daemon/face/transport.cpp
index 9c054fa..863b2b3 100644
--- a/daemon/face/transport.cpp
+++ b/daemon/face/transport.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -63,6 +63,7 @@
, m_persistency(ndn::nfd::FACE_PERSISTENCY_NONE)
, m_linkType(ndn::nfd::LINK_TYPE_NONE)
, m_mtu(MTU_INVALID)
+ , m_sendQueueCapacity(QUEUE_UNSUPPORTED)
, m_state(TransportState::UP)
, m_expirationTime(time::steady_clock::TimePoint::max())
{
@@ -125,6 +126,12 @@
m_service->receivePacket(std::move(packet));
}
+ssize_t
+Transport::getSendQueueLength()
+{
+ return QUEUE_UNSUPPORTED;
+}
+
bool
Transport::canChangePersistencyTo(ndn::nfd::FacePersistency newPersistency) const
{
diff --git a/daemon/face/transport.hpp b/daemon/face/transport.hpp
index 85a0fd7..a9768e5 100644
--- a/daemon/face/transport.hpp
+++ b/daemon/face/transport.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -28,6 +28,7 @@
#include "core/counter.hpp"
#include "face-log.hpp"
+
#include <ndn-cxx/encoding/nfd-constants.hpp>
namespace nfd {
@@ -98,6 +99,14 @@
*/
const ssize_t MTU_INVALID = -2;
+/** \brief indicates that the transport does not support reading the queue capacity/length
+ */
+const ssize_t QUEUE_UNSUPPORTED = -1;
+
+/** \brief indicates that the transport was unable to retrieve the queue capacity/length
+ */
+const ssize_t QUEUE_ERROR = -2;
+
/** \brief the lower part of a Face
* \sa Face
*/
@@ -254,6 +263,13 @@
ssize_t
getMtu() const;
+ /** \return capacity of the send queue (in bytes)
+ * \retval QUEUE_UNSUPPORTED transport does not support queue capacity retrieval
+ * \retval QUEUE_ERROR transport was unable to retrieve the queue capacity
+ */
+ ssize_t
+ getSendQueueCapacity() const;
+
public: // dynamic properties
/** \return transport state
*/
@@ -270,6 +286,13 @@
time::steady_clock::TimePoint
getExpirationTime() const;
+ /** \return current send queue length of the transport (in octets)
+ * \retval QUEUE_UNSUPPORTED transport does not support queue length retrieval
+ * \retval QUEUE_ERROR transport was unable to retrieve the queue length
+ */
+ virtual ssize_t
+ getSendQueueLength();
+
protected: // properties to be set by subclass
void
setLocalUri(const FaceUri& uri);
@@ -286,6 +309,9 @@
void
setMtu(ssize_t mtu);
+ void
+ setSendQueueCapacity(ssize_t sendQueueCapacity);
+
/** \brief set transport state
*
* Only the following transitions are valid:
@@ -346,6 +372,7 @@
ndn::nfd::FacePersistency m_persistency;
ndn::nfd::LinkType m_linkType;
ssize_t m_mtu;
+ size_t m_sendQueueCapacity;
TransportState m_state;
time::steady_clock::TimePoint m_expirationTime;
};
@@ -441,6 +468,18 @@
m_mtu = mtu;
}
+inline ssize_t
+Transport::getSendQueueCapacity() const
+{
+ return m_sendQueueCapacity;
+}
+
+inline void
+Transport::setSendQueueCapacity(ssize_t sendQueueCapacity)
+{
+ m_sendQueueCapacity = sendQueueCapacity;
+}
+
inline TransportState
Transport::getState() const
{
diff --git a/tests/daemon/face/datagram-transport.t.cpp b/tests/daemon/face/datagram-transport.t.cpp
index 6121152..8cfb622 100644
--- a/tests/daemon/face/datagram-transport.t.cpp
+++ b/tests/daemon/face/datagram-transport.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -150,6 +150,13 @@
BOOST_REQUIRE_EQUAL(this->limitedIo.run(1, time::seconds(1)), LimitedIo::EXCEED_OPS);
}
+BOOST_FIXTURE_TEST_CASE_TEMPLATE(SendQueueLength, T, DatagramTransportFixtures, T)
+{
+ TRANSPORT_TEST_INIT();
+
+ BOOST_CHECK_EQUAL(this->transport->getSendQueueLength(), 0);
+}
+
BOOST_AUTO_TEST_SUITE_END() // TestDatagramTransport
BOOST_AUTO_TEST_SUITE_END() // Face
diff --git a/tests/daemon/face/dummy-transport.hpp b/tests/daemon/face/dummy-transport.hpp
index 07444aa..08a7141 100644
--- a/tests/daemon/face/dummy-transport.hpp
+++ b/tests/daemon/face/dummy-transport.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -44,8 +44,10 @@
ndn::nfd::FaceScope scope = ndn::nfd::FACE_SCOPE_NON_LOCAL,
ndn::nfd::FacePersistency persistency = ndn::nfd::FACE_PERSISTENCY_PERSISTENT,
ndn::nfd::LinkType linkType = ndn::nfd::LINK_TYPE_POINT_TO_POINT,
- ssize_t mtu = MTU_UNLIMITED)
+ ssize_t mtu = MTU_UNLIMITED,
+ ssize_t sendQueueCapacity = QUEUE_UNSUPPORTED)
: isClosed(false)
+ , m_sendQueueLength(0)
{
this->setLocalUri(FaceUri(localUri));
this->setRemoteUri(FaceUri(remoteUri));
@@ -53,6 +55,7 @@
this->setPersistency(persistency);
this->setLinkType(linkType);
this->setMtu(mtu);
+ this->setSendQueueCapacity(sendQueueCapacity);
}
void
@@ -67,6 +70,18 @@
this->Transport::setState(state);
}
+ ssize_t
+ getSendQueueLength() override
+ {
+ return m_sendQueueLength;
+ }
+
+ void
+ setSendQueueLength(ssize_t sendQueueLength)
+ {
+ m_sendQueueLength = sendQueueLength;
+ }
+
void
receivePacket(Packet&& packet)
{
@@ -87,14 +102,14 @@
}
private:
- virtual void
+ void
doClose() override
{
isClosed = true;
this->setState(TransportState::CLOSED);
}
- virtual void
+ void
doSend(Packet&& packet) override
{
sentPackets.push_back(std::move(packet));
@@ -103,6 +118,9 @@
public:
bool isClosed;
std::vector<Packet> sentPackets;
+
+private:
+ ssize_t m_sendQueueLength;
};
} // namespace tests
diff --git a/tests/daemon/face/generic-link-service.t.cpp b/tests/daemon/face/generic-link-service.t.cpp
index e4e82f7..98801c6 100644
--- a/tests/daemon/face/generic-link-service.t.cpp
+++ b/tests/daemon/face/generic-link-service.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -25,10 +25,11 @@
#include "face/generic-link-service.hpp"
#include "face/face.hpp"
-#include "dummy-transport.hpp"
-#include <ndn-cxx/lp/tags.hpp>
-#include "tests/test-common.hpp"
+#include "test-common.hpp"
+#include "dummy-transport.hpp"
+
+#include <ndn-cxx/lp/tags.hpp>
namespace nfd {
namespace face {
@@ -40,7 +41,7 @@
using nfd::Face;
-class GenericLinkServiceFixture : public BaseFixture
+class GenericLinkServiceFixture : public UnitTestTimeFixture
{
protected:
GenericLinkServiceFixture()
@@ -53,7 +54,9 @@
}
void
- initialize(const GenericLinkService::Options& options, ssize_t mtu = MTU_UNLIMITED)
+ initialize(const GenericLinkService::Options& options,
+ ssize_t mtu = MTU_UNLIMITED,
+ ssize_t sendQueueCapacity = QUEUE_UNSUPPORTED)
{
face.reset(new Face(make_unique<GenericLinkService>(options),
make_unique<DummyTransport>("dummy://",
@@ -61,7 +64,8 @@
ndn::nfd::FACE_SCOPE_NON_LOCAL,
ndn::nfd::FACE_PERSISTENCY_PERSISTENT,
ndn::nfd::LINK_TYPE_POINT_TO_POINT,
- mtu)));
+ mtu,
+ sendQueueCapacity)));
service = static_cast<GenericLinkService*>(face->getLinkService());
transport = static_cast<DummyTransport*>(face->getTransport());
@@ -462,6 +466,357 @@
BOOST_AUTO_TEST_SUITE_END() // Reliability
+// congestion detection and marking
+BOOST_AUTO_TEST_SUITE(CongestionMark)
+
+BOOST_AUTO_TEST_CASE(NoCongestion)
+{
+ GenericLinkService::Options options;
+ options.allowCongestionMarking = true;
+ options.baseCongestionMarkingInterval = time::milliseconds(100);
+ initialize(options, MTU_UNLIMITED, 65536);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, time::steady_clock::TimePoint::max());
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 0);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 0);
+
+ shared_ptr<Interest> interest = makeInterest("/12345678");
+
+ // congestion threshold will be 32768 bytes, since min(65536, 65536 / 2) = 32768 bytes
+
+ // no congestion
+ transport->setSendQueueLength(0);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 1);
+ lp::Packet pkt1;
+ BOOST_REQUIRE_NO_THROW(pkt1.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt1.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, time::steady_clock::TimePoint::max());
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 0);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 0);
+
+ // no congestion
+ transport->setSendQueueLength(32768);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 2);
+ lp::Packet pkt2;
+ BOOST_REQUIRE_NO_THROW(pkt2.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt2.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, time::steady_clock::TimePoint::max());
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 0);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 0);
+}
+
+BOOST_AUTO_TEST_CASE(CongestionCoDel)
+{
+ GenericLinkService::Options options;
+ options.allowCongestionMarking = true;
+ options.baseCongestionMarkingInterval = time::milliseconds(100);
+ initialize(options, MTU_UNLIMITED, 65536);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, time::steady_clock::TimePoint::max());
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, time::steady_clock::TimePoint::min());
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 0);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 0);
+
+ shared_ptr<Interest> interest = makeInterest("/12345678");
+
+ // congestion threshold will be 32768 bytes, since min(65536, 65536 / 2) = 32768 bytes
+
+ // first congested packet, will be marked
+ transport->setSendQueueLength(32769);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 1);
+ lp::Packet pkt1;
+ BOOST_REQUIRE_NO_THROW(pkt1.wireDecode(transport->sentPackets.back().packet));
+ BOOST_REQUIRE_EQUAL(pkt1.count<lp::CongestionMarkField>(), 1);
+ BOOST_CHECK_EQUAL(pkt1.get<lp::CongestionMarkField>(), 1);
+ time::steady_clock::TimePoint nextMarkTime = time::steady_clock::now() + time::milliseconds(100);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ time::steady_clock::TimePoint lastMarkTime = time::steady_clock::now();
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 1);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 1);
+
+ // advance clock to half of marking interval cycle
+ advanceClocks(time::milliseconds(50));
+
+ // second congested packet, but within marking interval, will not be marked
+ transport->setSendQueueLength(33000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 2);
+ lp::Packet pkt2;
+ BOOST_REQUIRE_NO_THROW(pkt2.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt2.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 1);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 1);
+
+ // advance clocks past end of initial interval cycle
+ this->advanceClocks(time::milliseconds(51));
+
+ // first congested packet after waiting marking interval, will be marked
+ transport->setSendQueueLength(40000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 3);
+ lp::Packet pkt3;
+ BOOST_REQUIRE_NO_THROW(pkt3.wireDecode(transport->sentPackets.back().packet));
+ BOOST_REQUIRE_EQUAL(pkt3.count<lp::CongestionMarkField>(), 1);
+ BOOST_CHECK_EQUAL(pkt3.get<lp::CongestionMarkField>(), 1);
+ time::nanoseconds markingInterval(
+ static_cast<time::nanoseconds::rep>(options.baseCongestionMarkingInterval.count() /
+ std::sqrt(service->m_nMarkedSinceInMarkingState)));
+ nextMarkTime += markingInterval;
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ lastMarkTime = time::steady_clock::now();
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 2);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 2);
+
+ // advance clock partway through current marking interval
+ this->advanceClocks(markingInterval - time::milliseconds(10));
+
+ // still congested, but within marking interval cycle
+ transport->setSendQueueLength(38000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 4);
+ lp::Packet pkt4;
+ BOOST_REQUIRE_NO_THROW(pkt4.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt4.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 2);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 2);
+
+ // advance clocks past end of current marking interval cycle
+ this->advanceClocks(time::milliseconds(11));
+
+ // still congested, after marking interval cycle
+ transport->setSendQueueLength(39000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 5);
+ lp::Packet pkt5;
+ BOOST_REQUIRE_NO_THROW(pkt5.wireDecode(transport->sentPackets.back().packet));
+ BOOST_REQUIRE_EQUAL(pkt5.count<lp::CongestionMarkField>(), 1);
+ BOOST_CHECK_EQUAL(pkt5.get<lp::CongestionMarkField>(), 1);
+ markingInterval = time::nanoseconds(
+ static_cast<time::nanoseconds::rep>(options.baseCongestionMarkingInterval.count() /
+ std::sqrt(service->m_nMarkedSinceInMarkingState)));
+ nextMarkTime += markingInterval;
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ lastMarkTime = time::steady_clock::now();
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 3);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 3);
+
+ this->advanceClocks(time::milliseconds(1));
+
+ // still congested, but within marking interval cycle
+ transport->setSendQueueLength(38000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 6);
+ lp::Packet pkt6;
+ BOOST_REQUIRE_NO_THROW(pkt6.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt6.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 3);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 3);
+
+ this->advanceClocks(markingInterval);
+
+ // still congested, after marking interval cycle
+ transport->setSendQueueLength(34000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 7);
+ lp::Packet pkt7;
+ BOOST_REQUIRE_NO_THROW(pkt7.wireDecode(transport->sentPackets.back().packet));
+ BOOST_REQUIRE_EQUAL(pkt7.count<lp::CongestionMarkField>(), 1);
+ BOOST_CHECK_EQUAL(pkt7.get<lp::CongestionMarkField>(), 1);
+ markingInterval = time::nanoseconds(
+ static_cast<time::nanoseconds::rep>(options.baseCongestionMarkingInterval.count() /
+ std::sqrt(service->m_nMarkedSinceInMarkingState)));
+ nextMarkTime += markingInterval;
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ lastMarkTime = time::steady_clock::now();
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 4);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 4);
+
+ // no more congestion
+ transport->setSendQueueLength(30000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 8);
+ lp::Packet pkt8;
+ BOOST_REQUIRE_NO_THROW(pkt8.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt8.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, time::steady_clock::TimePoint::max());
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 0);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 4);
+
+ this->advanceClocks(time::milliseconds(50));
+
+ // send queue congested again, but can't mark packet because within one full interval of last mark
+ transport->setSendQueueLength(50000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 9);
+ lp::Packet pkt9;
+ BOOST_REQUIRE_NO_THROW(pkt9.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt9.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, time::steady_clock::TimePoint::max());
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 0);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 4);
+
+ // advance clock past full 100ms interval since last mark
+ this->advanceClocks(time::milliseconds(51));
+
+ transport->setSendQueueLength(40000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 10);
+ lp::Packet pkt10;
+ BOOST_REQUIRE_NO_THROW(pkt10.wireDecode(transport->sentPackets.back().packet));
+ BOOST_REQUIRE_EQUAL(pkt10.count<lp::CongestionMarkField>(), 1);
+ BOOST_CHECK_EQUAL(pkt10.get<lp::CongestionMarkField>(), 1);
+ nextMarkTime = time::steady_clock::now() + time::milliseconds(100);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ lastMarkTime = time::steady_clock::now();
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 1);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 5);
+
+ // advance clock partway through 100ms marking interval
+ this->advanceClocks(time::milliseconds(50));
+
+ // not marked since within 100ms window before can mark again
+ transport->setSendQueueLength(50000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 11);
+ lp::Packet pkt11;
+ BOOST_REQUIRE_NO_THROW(pkt11.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt11.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 1);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 5);
+
+ // advance clocks past m_nextMarkTime
+ this->advanceClocks(time::milliseconds(51));
+
+ // markable packet, queue length still above threshold
+ transport->setSendQueueLength(33000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 12);
+ lp::Packet pkt12;
+ BOOST_REQUIRE_NO_THROW(pkt12.wireDecode(transport->sentPackets.back().packet));
+ BOOST_REQUIRE_EQUAL(pkt12.count<lp::CongestionMarkField>(), 1);
+ BOOST_CHECK_EQUAL(pkt12.get<lp::CongestionMarkField>(), 1);
+ markingInterval = time::nanoseconds(
+ static_cast<time::nanoseconds::rep>(options.baseCongestionMarkingInterval.count() /
+ std::sqrt(service->m_nMarkedSinceInMarkingState)));
+ nextMarkTime += markingInterval;
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ lastMarkTime = time::steady_clock::now();
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 2);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 6);
+
+ // no more congestion
+ transport->setSendQueueLength(10000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 13);
+ lp::Packet pkt13;
+ BOOST_REQUIRE_NO_THROW(pkt13.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt13.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, time::steady_clock::TimePoint::max());
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 0);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 6);
+
+ // advance clocks past one full interval since last mark
+ this->advanceClocks(time::milliseconds(101));
+
+ // start congestion again
+ transport->setSendQueueLength(50000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 14);
+ lp::Packet pkt14;
+ BOOST_REQUIRE_NO_THROW(pkt14.wireDecode(transport->sentPackets.back().packet));
+ BOOST_REQUIRE_EQUAL(pkt14.count<lp::CongestionMarkField>(), 1);
+ BOOST_CHECK_EQUAL(pkt14.get<lp::CongestionMarkField>(), 1);
+ nextMarkTime = time::steady_clock::now() + time::milliseconds(100);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ lastMarkTime = time::steady_clock::now();
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 1);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 7);
+
+ // no more congestion, cancel marking interval
+ transport->setSendQueueLength(5000);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 15);
+ lp::Packet pkt15;
+ BOOST_REQUIRE_NO_THROW(pkt15.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt15.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, time::steady_clock::TimePoint::max());
+ BOOST_CHECK_EQUAL(service->m_lastMarkTime, lastMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 0);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 7);
+}
+
+BOOST_AUTO_TEST_CASE(DefaultThreshold)
+{
+ GenericLinkService::Options options;
+ options.allowCongestionMarking = true;
+ options.baseCongestionMarkingInterval = time::milliseconds(100);
+ initialize(options, MTU_UNLIMITED, QUEUE_UNSUPPORTED);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, time::steady_clock::TimePoint::max());
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 0);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 0);
+
+ shared_ptr<Interest> interest = makeInterest("/12345678");
+
+ // congestion threshold will be 65536 bytes, since the transport reports that it cannot measure
+ // the queue capacity
+
+ // no congestion
+ transport->setSendQueueLength(0);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 1);
+ lp::Packet pkt1;
+ BOOST_REQUIRE_NO_THROW(pkt1.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt1.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, time::steady_clock::TimePoint::max());
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 0);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 0);
+
+ // no congestion
+ transport->setSendQueueLength(65536);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 2);
+ lp::Packet pkt2;
+ BOOST_REQUIRE_NO_THROW(pkt2.wireDecode(transport->sentPackets.back().packet));
+ BOOST_CHECK_EQUAL(pkt2.count<lp::CongestionMarkField>(), 0);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, time::steady_clock::TimePoint::max());
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 0);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 0);
+
+ // first congested (and marked) packet
+ transport->setSendQueueLength(65537);
+ face->sendInterest(*interest);
+ BOOST_REQUIRE_EQUAL(transport->sentPackets.size(), 3);
+ lp::Packet pkt3;
+ BOOST_REQUIRE_NO_THROW(pkt3.wireDecode(transport->sentPackets.back().packet));
+ BOOST_REQUIRE_EQUAL(pkt3.count<lp::CongestionMarkField>(), 1);
+ BOOST_CHECK_EQUAL(pkt3.get<lp::CongestionMarkField>(), 1);
+ time::steady_clock::TimePoint nextMarkTime = time::steady_clock::now() + time::milliseconds(100);
+ BOOST_CHECK_EQUAL(service->m_nextMarkTime, nextMarkTime);
+ BOOST_CHECK_EQUAL(service->m_nMarkedSinceInMarkingState, 1);
+ BOOST_CHECK_EQUAL(service->getCounters().nCongestionMarked, 1);
+}
+
+BOOST_AUTO_TEST_SUITE_END() // CongestionMark
+
BOOST_AUTO_TEST_SUITE(LpFields)
BOOST_AUTO_TEST_CASE(ReceiveNextHopFaceId)
@@ -772,7 +1127,6 @@
BOOST_AUTO_TEST_SUITE_END() // LpFields
-
BOOST_AUTO_TEST_SUITE(Malformed) // receive malformed packets
BOOST_AUTO_TEST_CASE(WrongTlvType)
diff --git a/tests/daemon/face/multicast-ethernet-transport.t.cpp b/tests/daemon/face/multicast-ethernet-transport.t.cpp
index 7544adc..511f57a 100644
--- a/tests/daemon/face/multicast-ethernet-transport.t.cpp
+++ b/tests/daemon/face/multicast-ethernet-transport.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -46,6 +46,7 @@
BOOST_CHECK_EQUAL(transport->getScope(), ndn::nfd::FACE_SCOPE_NON_LOCAL);
BOOST_CHECK_EQUAL(transport->getPersistency(), ndn::nfd::FACE_PERSISTENCY_PERMANENT);
BOOST_CHECK_EQUAL(transport->getLinkType(), ndn::nfd::LINK_TYPE_MULTI_ACCESS);
+ BOOST_CHECK_EQUAL(transport->getSendQueueCapacity(), QUEUE_UNSUPPORTED);
}
BOOST_AUTO_TEST_CASE(PersistencyChange)
@@ -79,6 +80,14 @@
BOOST_REQUIRE_EQUAL(limitedIo.run(1, time::seconds(1)), LimitedIo::EXCEED_OPS);
}
+BOOST_AUTO_TEST_CASE(SendQueueLength)
+{
+ SKIP_IF_ETHERNET_NETIF_COUNT_LT(1);
+ initializeMulticast();
+
+ BOOST_CHECK_EQUAL(transport->getSendQueueLength(), QUEUE_UNSUPPORTED);
+}
+
BOOST_AUTO_TEST_SUITE_END() // TestMulticastEthernetTransport
BOOST_AUTO_TEST_SUITE_END() // Face
diff --git a/tests/daemon/face/multicast-udp-transport.t.cpp b/tests/daemon/face/multicast-udp-transport.t.cpp
index 51acb08..de6d30f 100644
--- a/tests/daemon/face/multicast-udp-transport.t.cpp
+++ b/tests/daemon/face/multicast-udp-transport.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -62,6 +62,7 @@
BOOST_CHECK_EQUAL(this->transport->getLinkType(), ndn::nfd::LINK_TYPE_MULTI_ACCESS);
BOOST_CHECK_EQUAL(this->transport->getMtu(),
this->addressFamily == AddressFamily::V4 ? (65535 - 60 - 8) : (65535 - 8));
+ BOOST_CHECK_GT(this->transport->getSendQueueCapacity(), 0);
}
BOOST_AUTO_TEST_CASE(PersistencyChange)
diff --git a/tests/daemon/face/stream-transport.t.cpp b/tests/daemon/face/stream-transport.t.cpp
index 33baec1..6628b92 100644
--- a/tests/daemon/face/stream-transport.t.cpp
+++ b/tests/daemon/face/stream-transport.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -214,6 +214,13 @@
BOOST_REQUIRE_EQUAL(this->limitedIo.run(1, time::seconds(1)), LimitedIo::EXCEED_OPS);
}
+BOOST_FIXTURE_TEST_CASE_TEMPLATE(SendQueueLength, T, StreamTransportFixtures, T)
+{
+ TRANSPORT_TEST_INIT();
+
+ BOOST_CHECK_EQUAL(this->transport->getSendQueueLength(), 0);
+}
+
BOOST_AUTO_TEST_SUITE_END() // TestStreamTransport
BOOST_AUTO_TEST_SUITE_END() // Face
diff --git a/tests/daemon/face/tcp-transport.t.cpp b/tests/daemon/face/tcp-transport.t.cpp
index 2f4171b..ef23ca6 100644
--- a/tests/daemon/face/tcp-transport.t.cpp
+++ b/tests/daemon/face/tcp-transport.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -54,6 +54,7 @@
BOOST_CHECK_EQUAL(this->transport->getPersistency(), ndn::nfd::FACE_PERSISTENCY_PERSISTENT);
BOOST_CHECK_EQUAL(this->transport->getLinkType(), ndn::nfd::LINK_TYPE_POINT_TO_POINT);
BOOST_CHECK_EQUAL(this->transport->getMtu(), MTU_UNLIMITED);
+ BOOST_CHECK_EQUAL(this->transport->getSendQueueCapacity(), QUEUE_UNSUPPORTED);
}
BOOST_AUTO_TEST_CASE(PersistencyChange)
diff --git a/tests/daemon/face/unicast-ethernet-transport.t.cpp b/tests/daemon/face/unicast-ethernet-transport.t.cpp
index 49a0fbb..a95c3fb 100644
--- a/tests/daemon/face/unicast-ethernet-transport.t.cpp
+++ b/tests/daemon/face/unicast-ethernet-transport.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -120,6 +120,14 @@
BOOST_CHECK_EQUAL(nStateChanges, 2);
}
+BOOST_AUTO_TEST_CASE(SendQueueLength)
+{
+ SKIP_IF_ETHERNET_NETIF_COUNT_LT(1);
+ initializeUnicast();
+
+ BOOST_CHECK_EQUAL(transport->getSendQueueLength(), QUEUE_UNSUPPORTED);
+}
+
BOOST_AUTO_TEST_SUITE_END() // TestUnicastEthernetTransport
BOOST_AUTO_TEST_SUITE_END() // Face
diff --git a/tests/daemon/face/unicast-udp-transport.t.cpp b/tests/daemon/face/unicast-udp-transport.t.cpp
index 075194b..a2a3e79 100644
--- a/tests/daemon/face/unicast-udp-transport.t.cpp
+++ b/tests/daemon/face/unicast-udp-transport.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -54,6 +54,7 @@
BOOST_CHECK_EQUAL(this->transport->getLinkType(), ndn::nfd::LINK_TYPE_POINT_TO_POINT);
BOOST_CHECK_EQUAL(this->transport->getMtu(),
this->addressFamily == AddressFamily::V4 ? (65535 - 60 - 8) : (65535 - 8));
+ BOOST_CHECK_GT(this->transport->getSendQueueCapacity(), 0);
}
BOOST_AUTO_TEST_CASE(PersistencyChange)
diff --git a/tests/daemon/face/unix-stream-transport.t.cpp b/tests/daemon/face/unix-stream-transport.t.cpp
index de2e03a..8eebb60 100644
--- a/tests/daemon/face/unix-stream-transport.t.cpp
+++ b/tests/daemon/face/unix-stream-transport.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2017, Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -48,6 +48,7 @@
BOOST_CHECK_EQUAL(transport->getPersistency(), ndn::nfd::FACE_PERSISTENCY_ON_DEMAND);
BOOST_CHECK_EQUAL(transport->getLinkType(), ndn::nfd::LINK_TYPE_POINT_TO_POINT);
BOOST_CHECK_EQUAL(transport->getMtu(), MTU_UNLIMITED);
+ BOOST_CHECK_EQUAL(transport->getSendQueueCapacity(), QUEUE_UNSUPPORTED);
}
BOOST_AUTO_TEST_CASE(PersistencyChange)
diff --git a/tests/daemon/face/websocket-transport.t.cpp b/tests/daemon/face/websocket-transport.t.cpp
index f13e065..66a3d43 100644
--- a/tests/daemon/face/websocket-transport.t.cpp
+++ b/tests/daemon/face/websocket-transport.t.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2017, Regents of the University of California,
+ * Copyright (c) 2014-2018, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -54,6 +54,7 @@
BOOST_CHECK_EQUAL(this->transport->getPersistency(), ndn::nfd::FACE_PERSISTENCY_ON_DEMAND);
BOOST_CHECK_EQUAL(this->transport->getLinkType(), ndn::nfd::LINK_TYPE_POINT_TO_POINT);
BOOST_CHECK_EQUAL(this->transport->getMtu(), MTU_UNLIMITED);
+ BOOST_CHECK_EQUAL(this->transport->getSendQueueCapacity(), QUEUE_UNSUPPORTED);
}
using StaticPropertiesV4MappedFixtures = boost::mpl::vector<
@@ -246,6 +247,13 @@
BOOST_CHECK_EQUAL(nStateChanges, 2);
}
+BOOST_FIXTURE_TEST_CASE_TEMPLATE(SendQueueLength, T, WebSocketTransportFixtures, T)
+{
+ TRANSPORT_TEST_INIT();
+
+ BOOST_CHECK_EQUAL(this->transport->getSendQueueLength(), QUEUE_UNSUPPORTED);
+}
+
BOOST_AUTO_TEST_SUITE_END() // TestWebSocketTransport
BOOST_AUTO_TEST_SUITE_END() // Face