face: deduplicate received LpPackets with reliability

refs #5079

Change-Id: I19ad959ba736253a750997fa468d419c93425686
diff --git a/daemon/face/lp-reliability.cpp b/daemon/face/lp-reliability.cpp
index e9a4914..21189be 100644
--- a/daemon/face/lp-reliability.cpp
+++ b/daemon/face/lp-reliability.cpp
@@ -73,6 +73,9 @@
   netPkt->unackedFrags.reserve(frags.size());
 
   for (lp::Packet& frag : frags) {
+    // Non-IDLE packets are required to have assigned Sequence numbers with LpReliability enabled
+    BOOST_ASSERT(frag.has<lp::SequenceField>());
+
     // Assign TxSequence number
     lp::Sequence txSeq = assignTxSequence(frag);
 
@@ -83,11 +86,11 @@
                                                  std::forward_as_tuple(frag));
     unackedFragsIt->second.sendTime = sendTime;
     auto rto = m_rttEst.getEstimatedRto();
-    NFD_LOG_FACE_TRACE("transmitting txseq=" << txSeq << ", rto=" <<
+    lp::Sequence seq = frag.get<lp::SequenceField>();
+    NFD_LOG_FACE_TRACE("transmitting seq=" << seq << ", txseq=" << txSeq << ", rto=" <<
                        time::duration_cast<time::milliseconds>(rto).count() << "ms");
     unackedFragsIt->second.rtoTimer = getScheduler().schedule(rto, [=] {
-      NFD_LOG_FACE_TRACE("rto timer expired for txseq=" << txSeq);
-      onLpPacketLost(txSeq);
+      onLpPacketLost(txSeq, true);
     });
     unackedFragsIt->second.netPkt = netPkt;
 
@@ -100,19 +103,20 @@
   }
 }
 
-void
+bool
 LpReliability::processIncomingPacket(const lp::Packet& pkt)
 {
   BOOST_ASSERT(m_options.isEnabled);
 
+  bool isDuplicate = false;
   auto now = time::steady_clock::now();
 
   // Extract and parse Acks
-  for (lp::Sequence ackSeq : pkt.list<lp::AckField>()) {
-    auto fragIt = m_unackedFrags.find(ackSeq);
+  for (lp::Sequence ackTxSeq : pkt.list<lp::AckField>()) {
+    auto fragIt = m_unackedFrags.find(ackTxSeq);
     if (fragIt == m_unackedFrags.end()) {
       // Ignore an Ack for an unknown TxSequence number
-      NFD_LOG_FACE_DEBUG("received ack for unknown txseq=" << ackSeq);
+      NFD_LOG_FACE_DEBUG("received ack for unknown txseq=" << ackTxSeq);
       continue;
     }
     auto& frag = fragIt->second;
@@ -121,17 +125,19 @@
     frag.rtoTimer.cancel();
 
     if (frag.retxCount == 0) {
-      NFD_LOG_FACE_TRACE("received ack for txseq=" << ackSeq << ", retx=0, rtt=" <<
+      NFD_LOG_FACE_TRACE("received ack for seq=" << frag.pkt.get<lp::SequenceField>() << ", txseq=" <<
+                         ackTxSeq << ", retx=0, rtt=" <<
                          time::duration_cast<time::milliseconds>(now - frag.sendTime).count() << "ms");
       // This sequence had no retransmissions, so use it to estimate the RTO
       m_rttEst.addMeasurement(now - frag.sendTime);
     }
     else {
-      NFD_LOG_FACE_TRACE("received ack for txseq=" << ackSeq << ", retx=" << frag.retxCount);
+      NFD_LOG_FACE_TRACE("received ack for seq=" << frag.pkt.get<lp::SequenceField>() << ", txseq=" <<
+                         ackTxSeq << ", retx=" << frag.retxCount);
     }
 
-    // Look for frags with TxSequence numbers < ackSeq (allowing for wraparound) and consider them
-    // lost if a configurable number of Acks containing greater TxSequence numbers have been
+    // Look for frags with TxSequence numbers < ackTxSeq (allowing for wraparound) and consider
+    // them lost if a configurable number of Acks containing greater TxSequence numbers have been
     // received.
     auto lostLpPackets = findLostLpPackets(fragIt);
 
@@ -148,9 +154,8 @@
     // Resend or fail fragments considered lost. Potentially increment the start of the window.
     for (lp::Sequence txSeq : lostLpPackets) {
       if (removedLpPackets.find(txSeq) == removedLpPackets.end()) {
-        NFD_LOG_FACE_TRACE("txseq=" << txSeq << " considered lost from acks for more recent txseqs");
-        auto removedThisTxSeq = onLpPacketLost(txSeq);
-        for (auto removedTxSeq : removedThisTxSeq) {
+        auto removedTxSeqs = onLpPacketLost(txSeq, false);
+        for (auto removedTxSeq : removedTxSeqs) {
           removedLpPackets.insert(removedTxSeq);
         }
       }
@@ -161,8 +166,27 @@
   if (pkt.has<lp::FragmentField>() && pkt.has<lp::TxSequenceField>()) {
     NFD_LOG_FACE_TRACE("queueing ack for remote txseq=" << pkt.get<lp::TxSequenceField>());
     m_ackQueue.push(pkt.get<lp::TxSequenceField>());
+
+    // Check for received frames with duplicate Sequences
+    if (pkt.has<lp::SequenceField>()) {
+      lp::Sequence pktSequence = pkt.get<lp::SequenceField>();
+      isDuplicate = m_recentRecvSeqs.count(pktSequence) > 0;
+      // Check for recent received Sequences to remove
+      auto now = time::steady_clock::now();
+      auto rto = m_rttEst.getEstimatedRto();
+      while (m_recentRecvSeqsQueue.size() > 0 &&
+             now > m_recentRecvSeqs[m_recentRecvSeqsQueue.front()] + rto) {
+        m_recentRecvSeqs.erase(m_recentRecvSeqsQueue.front());
+        m_recentRecvSeqsQueue.pop();
+      }
+      m_recentRecvSeqs.emplace(pktSequence, now);
+      m_recentRecvSeqsQueue.push(pktSequence);
+    }
+
     startIdleAckTimer();
   }
+
+  return !isDuplicate;
 }
 
 void
@@ -179,7 +203,7 @@
   remainingSpace -= pktSize;
 
   while (!m_ackQueue.empty()) {
-    lp::Sequence ackSeq = m_ackQueue.front();
+    lp::Sequence ackTxSeq = m_ackQueue.front();
     // Ack size = Ack TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + lp::Sequence (8 octets)
     const ssize_t ackSize = tlv::sizeOfVarNumber(lp::tlv::Ack) +
                             tlv::sizeOfVarNumber(sizeof(lp::Sequence)) +
@@ -189,9 +213,9 @@
       break;
     }
 
-    NFD_LOG_FACE_TRACE("piggybacking ack for remote txseq=" << ackSeq);
+    NFD_LOG_FACE_TRACE("piggybacking ack for remote txseq=" << ackTxSeq);
 
-    pkt.add<lp::AckField>(ackSeq);
+    pkt.add<lp::AckField>(ackTxSeq);
     m_ackQueue.pop();
     remainingSpace -= ackSize;
   }
@@ -239,8 +263,8 @@
 
     auto& unackedFrag = it->second;
     unackedFrag.nGreaterSeqAcks++;
-    NFD_LOG_FACE_TRACE("received ack=" << ackIt->first << ", out-of-order for txseq=" <<
-                       it->first << ", out-of-order count=" << unackedFrag.nGreaterSeqAcks);
+    NFD_LOG_FACE_TRACE("received ack=" << ackIt->first << " before=" << it->first <<
+                       ", before count=" << unackedFrag.nGreaterSeqAcks);
 
     if (unackedFrag.nGreaterSeqAcks >= m_options.seqNumLossThreshold) {
       lostLpPackets.push_back(it->first);
@@ -251,7 +275,7 @@
 }
 
 std::vector<lp::Sequence>
-LpReliability::onLpPacketLost(lp::Sequence txSeq)
+LpReliability::onLpPacketLost(lp::Sequence txSeq, bool isTimeout)
 {
   BOOST_ASSERT(m_unackedFrags.count(txSeq) > 0);
   auto txSeqIt = m_unackedFrags.find(txSeq);
@@ -260,10 +284,19 @@
   txFrag.rtoTimer.cancel();
   auto netPkt = txFrag.netPkt;
   std::vector<lp::Sequence> removedThisTxSeq;
+  lp::Sequence seq = txFrag.pkt.get<lp::SequenceField>();
+
+  if (isTimeout) {
+    NFD_LOG_FACE_TRACE("rto timer expired for seq=" << seq << ", txseq=" << txSeq);
+  }
+  else { // lost due to out-of-order TxSeqs
+    NFD_LOG_FACE_TRACE("seq=" << seq << ", txseq=" << txSeq <<
+                       " considered lost from acks for more recent txseqs");
+  }
 
   // Check if maximum number of retransmissions exceeded
   if (txFrag.retxCount >= m_options.maxRetx) {
-    NFD_LOG_FACE_DEBUG("txseq=" << txSeq << " exceeded allowed retransmissions: DROP");
+    NFD_LOG_FACE_DEBUG("seq=" << seq << " exceeded allowed retransmissions: DROP");
     // Delete all LpPackets of NetPkt from m_unackedFrags (except this one)
     for (size_t i = 0; i < netPkt->unackedFrags.size(); i++) {
       if (netPkt->unackedFrags[i] != txSeqIt) {
@@ -316,14 +349,13 @@
     m_linkService->sendLpPacket(lp::Packet(newTxFrag.pkt), 0);
 
     auto rto = m_rttEst.getEstimatedRto();
-    NFD_LOG_FACE_TRACE("retransmitting txseq=" << txSeq << " as " << newTxSeq << ", retx=" <<
+    NFD_LOG_FACE_TRACE("retransmitting seq=" << seq << ", txseq=" << newTxSeq << ", retx=" <<
                        txFrag.retxCount << ", rto=" <<
                        time::duration_cast<time::milliseconds>(rto).count() << "ms");
 
     // Start RTO timer for this sequence
     newTxFrag.rtoTimer = getScheduler().schedule(rto, [=] {
-      NFD_LOG_FACE_TRACE("rto timer expired for txseq=" << newTxSeq);
-      onLpPacketLost(newTxSeq);
+      onLpPacketLost(newTxSeq, true);
     });
   }