face: deduplicate received LpPackets when LpReliability is enabled
refs #5079
Change-Id: I19ad959ba736253a750997fa468d419c93425686
diff --git a/daemon/face/generic-link-service.cpp b/daemon/face/generic-link-service.cpp
index 1fecdb6..7e32fc7 100644
--- a/daemon/face/generic-link-service.cpp
+++ b/daemon/face/generic-link-service.cpp
@@ -125,6 +125,14 @@
}
void
+GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
+{
+ for (lp::Packet& pkt : pkts) { // visited front to back, so Sequence numbers follow vector order
+ pkt.set<lp::SequenceField>(++m_lastSeqNo); // pre-increment: each packet gets m_lastSeqNo + 1
+ }
+}
+
+void
GenericLinkService::encodeLpFields(const ndn::PacketBase& netPkt, lp::Packet& lpPacket)
{
if (m_options.allowLocalFields) {
@@ -200,8 +208,8 @@
BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
}
- // Only assign sequences to fragments if packet contains more than 1 fragment
- if (frags.size() > 1) {
+ // Only assign sequences to fragments if reliability enabled or if packet contains >1 fragment
+ if (m_options.reliabilityOptions.isEnabled || frags.size() > 1) {
// Assign sequences to all fragments
this->assignSequences(frags);
}
@@ -216,18 +224,6 @@
}
void
-GenericLinkService::assignSequence(lp::Packet& pkt)
-{
- pkt.set<lp::SequenceField>(++m_lastSeqNo);
-}
-
-void
-GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
-{
- std::for_each(pkts.begin(), pkts.end(), [this] (auto& pkt) { this->assignSequence(pkt); });
-}
-
-void
GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
{
ssize_t sendQueueLength = getTransport()->getSendQueueLength();
@@ -279,7 +275,11 @@
lp::Packet pkt(packet);
if (m_options.reliabilityOptions.isEnabled) {
- m_reliability.processIncomingPacket(pkt);
+ if (!m_reliability.processIncomingPacket(pkt)) {
+ NFD_LOG_FACE_TRACE("received duplicate fragment: DROP");
+ ++this->nDuplicateSequence;
+ return;
+ }
}
if (!pkt.has<lp::FragmentField>()) {
diff --git a/daemon/face/generic-link-service.hpp b/daemon/face/generic-link-service.hpp
index 3137555..e30a3e6 100644
--- a/daemon/face/generic-link-service.hpp
+++ b/daemon/face/generic-link-service.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, Regents of the University of California,
+ * Copyright (c) 2014-2020, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -81,6 +81,10 @@
*/
PacketCounter nRetxExhausted;
+ /** \brief count of LpPackets dropped due to duplicate Sequence numbers
+ */
+ PacketCounter nDuplicateSequence;
+
/** \brief count of outgoing LpPackets that were marked with congestion marks
*/
PacketCounter nCongestionMarked;
@@ -199,6 +203,11 @@
void
doSendNack(const ndn::lp::Nack& nack, const EndpointId& endpointId) OVERRIDE_WITH_TESTS_ELSE_FINAL;
+ /** \brief assign consecutive Sequence numbers to the given LpPackets, in vector order
+ */
+ void
+ assignSequences(std::vector<lp::Packet>& pkts);
+
private: // send path
/** \brief encode link protocol fields from tags onto an outgoing LpPacket
* \param netPkt network-layer packet to extract tags from
@@ -215,16 +224,6 @@
void
sendNetPacket(lp::Packet&& pkt, const EndpointId& endpointId, bool isInterest);
- /** \brief assign a sequence number to an LpPacket
- */
- void
- assignSequence(lp::Packet& pkt);
-
- /** \brief assign consecutive sequence numbers to LpPackets
- */
- void
- assignSequences(std::vector<lp::Packet>& pkts);
-
/** \brief if the send queue is found to be congested, add a congestion mark to the packet
* according to CoDel
* \sa https://tools.ietf.org/html/rfc8289
diff --git a/daemon/face/lp-reliability.cpp b/daemon/face/lp-reliability.cpp
index e9a4914..21189be 100644
--- a/daemon/face/lp-reliability.cpp
+++ b/daemon/face/lp-reliability.cpp
@@ -73,6 +73,9 @@
netPkt->unackedFrags.reserve(frags.size());
for (lp::Packet& frag : frags) {
+ // Non-IDLE packets are required to have assigned Sequence numbers with LpReliability enabled
+ BOOST_ASSERT(frag.has<lp::SequenceField>());
+
// Assign TxSequence number
lp::Sequence txSeq = assignTxSequence(frag);
@@ -83,11 +86,11 @@
std::forward_as_tuple(frag));
unackedFragsIt->second.sendTime = sendTime;
auto rto = m_rttEst.getEstimatedRto();
- NFD_LOG_FACE_TRACE("transmitting txseq=" << txSeq << ", rto=" <<
+ lp::Sequence seq = frag.get<lp::SequenceField>();
+ NFD_LOG_FACE_TRACE("transmitting seq=" << seq << ", txseq=" << txSeq << ", rto=" <<
time::duration_cast<time::milliseconds>(rto).count() << "ms");
unackedFragsIt->second.rtoTimer = getScheduler().schedule(rto, [=] {
- NFD_LOG_FACE_TRACE("rto timer expired for txseq=" << txSeq);
- onLpPacketLost(txSeq);
+ onLpPacketLost(txSeq, true);
});
unackedFragsIt->second.netPkt = netPkt;
@@ -100,19 +103,20 @@
}
}
-void
+bool
LpReliability::processIncomingPacket(const lp::Packet& pkt)
{
BOOST_ASSERT(m_options.isEnabled);
+ bool isDuplicate = false;
auto now = time::steady_clock::now();
// Extract and parse Acks
- for (lp::Sequence ackSeq : pkt.list<lp::AckField>()) {
- auto fragIt = m_unackedFrags.find(ackSeq);
+ for (lp::Sequence ackTxSeq : pkt.list<lp::AckField>()) {
+ auto fragIt = m_unackedFrags.find(ackTxSeq);
if (fragIt == m_unackedFrags.end()) {
// Ignore an Ack for an unknown TxSequence number
- NFD_LOG_FACE_DEBUG("received ack for unknown txseq=" << ackSeq);
+ NFD_LOG_FACE_DEBUG("received ack for unknown txseq=" << ackTxSeq);
continue;
}
auto& frag = fragIt->second;
@@ -121,17 +125,19 @@
frag.rtoTimer.cancel();
if (frag.retxCount == 0) {
- NFD_LOG_FACE_TRACE("received ack for txseq=" << ackSeq << ", retx=0, rtt=" <<
+ NFD_LOG_FACE_TRACE("received ack for seq=" << frag.pkt.get<lp::SequenceField>() << ", txseq=" <<
+ ackTxSeq << ", retx=0, rtt=" <<
time::duration_cast<time::milliseconds>(now - frag.sendTime).count() << "ms");
// This sequence had no retransmissions, so use it to estimate the RTO
m_rttEst.addMeasurement(now - frag.sendTime);
}
else {
- NFD_LOG_FACE_TRACE("received ack for txseq=" << ackSeq << ", retx=" << frag.retxCount);
+ NFD_LOG_FACE_TRACE("received ack for seq=" << frag.pkt.get<lp::SequenceField>() << ", txseq=" <<
+ ackTxSeq << ", retx=" << frag.retxCount);
}
- // Look for frags with TxSequence numbers < ackSeq (allowing for wraparound) and consider them
- // lost if a configurable number of Acks containing greater TxSequence numbers have been
+ // Look for frags with TxSequence numbers < ackTxSeq (allowing for wraparound) and consider
+ // them lost if a configurable number of Acks containing greater TxSequence numbers have been
// received.
auto lostLpPackets = findLostLpPackets(fragIt);
@@ -148,9 +154,8 @@
// Resend or fail fragments considered lost. Potentially increment the start of the window.
for (lp::Sequence txSeq : lostLpPackets) {
if (removedLpPackets.find(txSeq) == removedLpPackets.end()) {
- NFD_LOG_FACE_TRACE("txseq=" << txSeq << " considered lost from acks for more recent txseqs");
- auto removedThisTxSeq = onLpPacketLost(txSeq);
- for (auto removedTxSeq : removedThisTxSeq) {
+ auto removedTxSeqs = onLpPacketLost(txSeq, false);
+ for (auto removedTxSeq : removedTxSeqs) {
removedLpPackets.insert(removedTxSeq);
}
}
@@ -161,8 +166,27 @@
if (pkt.has<lp::FragmentField>() && pkt.has<lp::TxSequenceField>()) {
NFD_LOG_FACE_TRACE("queueing ack for remote txseq=" << pkt.get<lp::TxSequenceField>());
m_ackQueue.push(pkt.get<lp::TxSequenceField>());
+
+ // Check for received frames with duplicate Sequences
+ if (pkt.has<lp::SequenceField>()) {
+ lp::Sequence pktSequence = pkt.get<lp::SequenceField>();
+ isDuplicate = m_recentRecvSeqs.count(pktSequence) > 0;
+ // Forget Sequences recorded more than one estimated RTO ago (uses the function-level `now`)
+ auto rto = m_rttEst.getEstimatedRto();
+ while (!m_recentRecvSeqsQueue.empty() &&
+ now > m_recentRecvSeqs[m_recentRecvSeqsQueue.front()] + rto) {
+ m_recentRecvSeqs.erase(m_recentRecvSeqsQueue.front());
+ m_recentRecvSeqsQueue.pop();
+ }
+ if (m_recentRecvSeqs.emplace(pktSequence, now).second) { // new entry: keep queue and map 1:1
+ m_recentRecvSeqsQueue.push(pktSequence);
+ }
+ }
+
startIdleAckTimer();
}
+
+ return !isDuplicate;
}
void
@@ -179,7 +203,7 @@
remainingSpace -= pktSize;
while (!m_ackQueue.empty()) {
- lp::Sequence ackSeq = m_ackQueue.front();
+ lp::Sequence ackTxSeq = m_ackQueue.front();
// Ack size = Ack TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + lp::Sequence (8 octets)
const ssize_t ackSize = tlv::sizeOfVarNumber(lp::tlv::Ack) +
tlv::sizeOfVarNumber(sizeof(lp::Sequence)) +
@@ -189,9 +213,9 @@
break;
}
- NFD_LOG_FACE_TRACE("piggybacking ack for remote txseq=" << ackSeq);
+ NFD_LOG_FACE_TRACE("piggybacking ack for remote txseq=" << ackTxSeq);
- pkt.add<lp::AckField>(ackSeq);
+ pkt.add<lp::AckField>(ackTxSeq);
m_ackQueue.pop();
remainingSpace -= ackSize;
}
@@ -239,8 +263,8 @@
auto& unackedFrag = it->second;
unackedFrag.nGreaterSeqAcks++;
- NFD_LOG_FACE_TRACE("received ack=" << ackIt->first << ", out-of-order for txseq=" <<
- it->first << ", out-of-order count=" << unackedFrag.nGreaterSeqAcks);
+ NFD_LOG_FACE_TRACE("received ack=" << ackIt->first << " before=" << it->first <<
+ ", before count=" << unackedFrag.nGreaterSeqAcks);
if (unackedFrag.nGreaterSeqAcks >= m_options.seqNumLossThreshold) {
lostLpPackets.push_back(it->first);
@@ -251,7 +275,7 @@
}
std::vector<lp::Sequence>
-LpReliability::onLpPacketLost(lp::Sequence txSeq)
+LpReliability::onLpPacketLost(lp::Sequence txSeq, bool isTimeout)
{
BOOST_ASSERT(m_unackedFrags.count(txSeq) > 0);
auto txSeqIt = m_unackedFrags.find(txSeq);
@@ -260,10 +284,19 @@
txFrag.rtoTimer.cancel();
auto netPkt = txFrag.netPkt;
std::vector<lp::Sequence> removedThisTxSeq;
+ lp::Sequence seq = txFrag.pkt.get<lp::SequenceField>();
+
+ if (isTimeout) {
+ NFD_LOG_FACE_TRACE("rto timer expired for seq=" << seq << ", txseq=" << txSeq);
+ }
+ else { // lost due to out-of-order TxSeqs
+ NFD_LOG_FACE_TRACE("seq=" << seq << ", txseq=" << txSeq <<
+ " considered lost from acks for more recent txseqs");
+ }
// Check if maximum number of retransmissions exceeded
if (txFrag.retxCount >= m_options.maxRetx) {
- NFD_LOG_FACE_DEBUG("txseq=" << txSeq << " exceeded allowed retransmissions: DROP");
+ NFD_LOG_FACE_DEBUG("seq=" << seq << " exceeded allowed retransmissions: DROP");
// Delete all LpPackets of NetPkt from m_unackedFrags (except this one)
for (size_t i = 0; i < netPkt->unackedFrags.size(); i++) {
if (netPkt->unackedFrags[i] != txSeqIt) {
@@ -316,14 +349,13 @@
m_linkService->sendLpPacket(lp::Packet(newTxFrag.pkt), 0);
auto rto = m_rttEst.getEstimatedRto();
- NFD_LOG_FACE_TRACE("retransmitting txseq=" << txSeq << " as " << newTxSeq << ", retx=" <<
+ NFD_LOG_FACE_TRACE("retransmitting seq=" << seq << ", txseq=" << newTxSeq << ", retx=" <<
txFrag.retxCount << ", rto=" <<
time::duration_cast<time::milliseconds>(rto).count() << "ms");
// Start RTO timer for this sequence
newTxFrag.rtoTimer = getScheduler().schedule(rto, [=] {
- NFD_LOG_FACE_TRACE("rto timer expired for txseq=" << newTxSeq);
- onLpPacketLost(newTxSeq);
+ onLpPacketLost(newTxSeq, true);
});
}
diff --git a/daemon/face/lp-reliability.hpp b/daemon/face/lp-reliability.hpp
index ad7c968..1ea2b9a 100644
--- a/daemon/face/lp-reliability.hpp
+++ b/daemon/face/lp-reliability.hpp
@@ -93,8 +93,9 @@
/** \brief extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue
* \param pkt incoming LpPacket
+ * \return false if pkt repeats a recently received Sequence number (duplicate), true otherwise
*/
- void
+ bool
processIncomingPacket(const lp::Packet& pkt);
/** \brief called by GenericLinkService to attach Acks onto an outgoing LpPacket
@@ -139,7 +140,7 @@
* \return vector of the TxSequences of fragments removed due to a network packet being removed
*/
std::vector<lp::Sequence>
- onLpPacketLost(lp::Sequence txSeq);
+ onLpPacketLost(lp::Sequence txSeq, bool isTimeout);
/** \brief remove the fragment with the given sequence number from the map of unacknowledged
* fragments, as well as its associated network packet (if any)
@@ -210,6 +211,8 @@
*/
UnackedFrags::iterator m_firstUnackedFrag;
std::queue<lp::Sequence> m_ackQueue;
+ std::map<lp::Sequence, time::steady_clock::TimePoint> m_recentRecvSeqs;
+ std::queue<lp::Sequence> m_recentRecvSeqsQueue;
lp::Sequence m_lastTxSeqNo;
scheduler::ScopedEventId m_idleAckTimer;
ndn::util::RttEstimator m_rttEst;