chunks: react to congestion marks

Change-Id: I96efe1bc3ec7a54080c76676892114f2c79003ed
refs: #4289
diff --git a/tests/chunks/pipeline-interests-aimd.t.cpp b/tests/chunks/pipeline-interests-aimd.t.cpp
index e7856c4..d235bb1 100644
--- a/tests/chunks/pipeline-interests-aimd.t.cpp
+++ b/tests/chunks/pipeline-interests-aimd.t.cpp
@@ -21,6 +21,7 @@
  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
  *
  * @author Weiwei Liu
+ * @author Chavoosh Ghasemi
  */
 
 #include "tools/chunks/catchunks/pipeline-interests-aimd.hpp"
@@ -43,6 +44,12 @@
     , opt(makePipelineOptions())
     , rttEstimator(makeRttEstimatorOptions())
   {
+    createPipeline();
+  }
+
+  void
+  createPipeline()
+  {
     auto pline = make_unique<PipelineInterestsAimd>(face, rttEstimator, opt);
     aimdPipeline = pline.get();
     setPipeline(std::move(pline));
@@ -54,6 +61,7 @@
   {
     PipelineInterestsAimdOptions pipelineOptions;
     pipelineOptions.disableCwa = false;
+    pipelineOptions.ignoreCongMarks = false;
     pipelineOptions.resetCwndToInit = false;
     pipelineOptions.initCwnd = 1.0;
     pipelineOptions.aiStep = 1.0;
@@ -78,8 +86,11 @@
   PipelineInterestsAimdOptions opt;
   RttEstimator rttEstimator;
   PipelineInterestsAimd* aimdPipeline;
+  static constexpr double MARGIN = 0.01;
 };
 
+constexpr double PipelineInterestAimdFixture::MARGIN;
+
 BOOST_AUTO_TEST_SUITE(Chunks)
 BOOST_FIXTURE_TEST_SUITE(TestPipelineInterestsAimd, PipelineInterestAimdFixture)
 
@@ -87,7 +98,7 @@
 {
   nDataSegments = 4;
   aimdPipeline->m_ssthresh = 8.0;
-  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 1, 0.1);
+  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 1, MARGIN);
 
   double preCwnd = aimdPipeline->m_cwnd;
   runWithData(*makeDataWithSegment(0));
@@ -97,7 +108,7 @@
   for (uint64_t i = 1; i < nDataSegments - 1; ++i) {
     face.receive(*makeDataWithSegment(i));
     advanceClocks(io, time::nanoseconds(1));
-    BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd - preCwnd, 1, 0.1);
+    BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd - preCwnd, 1, MARGIN);
     preCwnd = aimdPipeline->m_cwnd;
   }
 
@@ -108,7 +119,7 @@
 {
   nDataSegments = 8;
   aimdPipeline->m_ssthresh = 4.0;
-  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 1, 0.1);
+  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 1, MARGIN);
 
   double preCwnd = aimdPipeline->m_cwnd;
   runWithData(*makeDataWithSegment(0));
@@ -121,12 +132,12 @@
     preCwnd = aimdPipeline->m_cwnd;
   }
 
-  BOOST_CHECK_CLOSE(preCwnd, aimdPipeline->m_ssthresh, 0.1);
+  BOOST_CHECK_CLOSE(preCwnd, aimdPipeline->m_ssthresh, MARGIN);
 
   for (uint64_t i = aimdPipeline->m_ssthresh; i < nDataSegments - 1; ++i) { // congestion avoidance
     face.receive(*makeDataWithSegment(i));
     advanceClocks(io, time::nanoseconds(1));
-    BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd - preCwnd, opt.aiStep / floor(aimdPipeline->m_cwnd), 0.1);
+    BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd - preCwnd, opt.aiStep / floor(aimdPipeline->m_cwnd), MARGIN);
     preCwnd = aimdPipeline->m_cwnd;
   }
 
@@ -137,7 +148,7 @@
 {
   nDataSegments = 8;
   aimdPipeline->m_ssthresh = 4.0;
-  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 1, 0.1);
+  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 1, MARGIN);
 
   runWithData(*makeDataWithSegment(0));
   advanceClocks(io, time::nanoseconds(1));
@@ -150,7 +161,7 @@
   }
 
   BOOST_CHECK_EQUAL(pipeline->m_nReceived, 3);
-  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 3, 0.1);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 3, MARGIN);
   BOOST_CHECK_EQUAL(face.sentInterests.size(), 5); // request for segment 5 has been sent
 
   advanceClocks(io, time::milliseconds(100));
@@ -164,14 +175,14 @@
   advanceClocks(io, time::nanoseconds(1));
 
   BOOST_CHECK_EQUAL(pipeline->m_nReceived, 5);
-  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 4.25, 0.1);
-  BOOST_CHECK_EQUAL(face.sentInterests.size(), 7); // all the segment requests have been sent
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 4.25, MARGIN);
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), nDataSegments - 1); // all the segment requests have been sent
 
   // timeout segment 3
   advanceClocks(io, time::milliseconds(150));
 
   BOOST_CHECK_EQUAL(pipeline->m_nReceived, 5);
-  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 2.125, 0.1); // window size drop to 1/2 of previous size
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 2.125, MARGIN); // window size drop to 1/2 of previous size
   BOOST_CHECK_EQUAL(aimdPipeline->m_retxQueue.size(), 1);
 
   // receive segment 6, retransmit 3
@@ -179,11 +190,139 @@
   advanceClocks(io, time::nanoseconds(1));
 
   BOOST_CHECK_EQUAL(pipeline->m_nReceived, 6);
-  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 2.625, 0.1); // congestion avoidance
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 2.625, MARGIN); // congestion avoidance
   BOOST_CHECK_EQUAL(aimdPipeline->m_retxQueue.size(), 0);
   BOOST_CHECK_EQUAL(aimdPipeline->m_retxCount[3], 1);
 }
 
+BOOST_AUTO_TEST_CASE(CongestionMarksWithCwa)
+{
+  nDataSegments = 7;
+  aimdPipeline->m_ssthresh = 4.0;
+  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 1, MARGIN);
+
+  runWithData(*makeDataWithSegment(0));
+  advanceClocks(io, time::nanoseconds(1));
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), 1);
+
+  // receive segments 1 to 4
+  for (uint64_t i = 1; i < 5; ++i) {
+    face.receive(*makeDataWithSegment(i));
+    advanceClocks(io, time::nanoseconds(1));
+  }
+
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 5);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 4.25, MARGIN);
+
+  // receive segment 5 with congestion mark
+  face.receive(*makeDataWithSegmentAndCongMark(5));
+  advanceClocks(io, time::nanoseconds(1));
+
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 6);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 2.125, MARGIN); // window size drops to 1/2 of previous size
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), nDataSegments - 1); // all interests have been sent
+
+  // receive the last segment with congestion mark
+  face.receive(*makeDataWithSegmentAndCongMark(nDataSegments - 1));
+  advanceClocks(io, time::nanoseconds(1));
+
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, nDataSegments);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 2.125, MARGIN); // conservative window adaption (window size should not decrease)
+  BOOST_CHECK_EQUAL(aimdPipeline->m_retxQueue.size(), 0);
+
+  // make sure no interest is retransmitted for marked data packets
+  BOOST_CHECK_EQUAL(aimdPipeline->m_retxCount[5], 0);
+  BOOST_CHECK_EQUAL(aimdPipeline->m_retxCount[nDataSegments - 1], 0);
+
+  // check number of received marked data packets
+  BOOST_CHECK_EQUAL(aimdPipeline->m_nCongMarks, 2);
+}
+
+BOOST_AUTO_TEST_CASE(CongestionMarksWithoutCwa)
+{
+  opt.disableCwa = true;
+  createPipeline();
+
+  nDataSegments = 7;
+  aimdPipeline->m_ssthresh = 4.0;
+  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 1, MARGIN);
+
+  runWithData(*makeDataWithSegment(0));
+  advanceClocks(io, time::nanoseconds(1));
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), 1);
+
+  // receive segments 1 to 4
+  for (uint64_t i = 1; i < 5; ++i) {
+    face.receive(*makeDataWithSegment(i));
+    advanceClocks(io, time::nanoseconds(1));
+  }
+
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 5);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 4.25, MARGIN);
+
+  // receive segment 5 with congestion mark
+  face.receive(*makeDataWithSegmentAndCongMark(5));
+  advanceClocks(io, time::nanoseconds(1));
+
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 6);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 2.125, MARGIN); // window size drops to 1/2 of previous size
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), nDataSegments - 1); // all interests have been sent
+
+  // receive the last segment with congestion mark
+  face.receive(*makeDataWithSegmentAndCongMark(nDataSegments - 1));
+  advanceClocks(io, time::nanoseconds(1));
+
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, nDataSegments);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, PipelineInterestsAimd::MIN_SSTHRESH,
+                    MARGIN); // window size should decrease, as cwa is disabled
+  BOOST_CHECK_EQUAL(aimdPipeline->m_retxQueue.size(), 0);
+
+  // make sure no interest is retransmitted for marked data packets
+  BOOST_CHECK_EQUAL(aimdPipeline->m_retxCount[5], 0);
+  BOOST_CHECK_EQUAL(aimdPipeline->m_retxCount[nDataSegments - 1], 0);
+
+  // check number of received marked data packets
+  BOOST_CHECK_EQUAL(aimdPipeline->m_nCongMarks, 2);
+}
+
+BOOST_AUTO_TEST_CASE(IgnoreCongestionMarks)
+{
+  opt.ignoreCongMarks = true;
+  createPipeline();
+
+  nDataSegments = 7;
+  aimdPipeline->m_ssthresh = 4.0;
+  BOOST_REQUIRE_CLOSE(aimdPipeline->m_cwnd, 1, MARGIN);
+
+  runWithData(*makeDataWithSegment(0));
+  advanceClocks(io, time::nanoseconds(1));
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), 1);
+
+  // receive segments 1 to 5
+  for (uint64_t i = 1; i < 6; ++i) {
+    face.receive(*makeDataWithSegment(i));
+    advanceClocks(io, time::nanoseconds(1));
+  }
+
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, 6);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 4.5, MARGIN);
+  BOOST_CHECK_EQUAL(face.sentInterests.size(), nDataSegments - 1); // all interests have been sent
+
+  // receive the last segment with congestion mark
+  face.receive(*makeDataWithSegmentAndCongMark(nDataSegments - 1));
+  advanceClocks(io, time::nanoseconds(1));
+
+  BOOST_CHECK_EQUAL(pipeline->m_nReceived, nDataSegments);
+  BOOST_CHECK_CLOSE(aimdPipeline->m_cwnd, 4.75, MARGIN); // window size increases
+  BOOST_CHECK_EQUAL(aimdPipeline->m_retxQueue.size(), 0);
+
+  // make sure no interest is retransmitted for marked data packet
+  BOOST_CHECK_EQUAL(aimdPipeline->m_retxCount[nDataSegments - 1], 0);
+
+  // check number of received marked data packets
+  BOOST_CHECK_EQUAL(aimdPipeline->m_nCongMarks, 1);
+}
+
 BOOST_AUTO_TEST_CASE(Nack)
 {
   nDataSegments = 5;
diff --git a/tests/chunks/pipeline-interests-fixture.hpp b/tests/chunks/pipeline-interests-fixture.hpp
index e35aec1..a4c9e50 100644
--- a/tests/chunks/pipeline-interests-fixture.hpp
+++ b/tests/chunks/pipeline-interests-fixture.hpp
@@ -23,6 +23,7 @@
  * @author Andrea Tosatto
  * @author Davide Pesavento
  * @author Weiwei Liu
+ * @author Chavoosh Ghasemi
  */
 
 #ifndef NDN_TOOLS_TESTS_CHUNKS_PIPELINE_INTERESTS_FIXTURE_HPP
@@ -67,6 +68,15 @@
     return signData(data);
   }
 
+  shared_ptr<Data>
+  makeDataWithSegmentAndCongMark(uint64_t segmentNo, uint64_t congestionMark = 1,
+                                 bool setFinalBlockId = true) const
+  {
+    auto data = makeDataWithSegment(segmentNo, setFinalBlockId);
+    data->setCongestionMark(congestionMark);
+    return data;
+  }
+
   void
   runWithData(const Data& data)
   {
diff --git a/tools/chunks/catchunks/ndncatchunks.cpp b/tools/chunks/catchunks/ndncatchunks.cpp
index 2242a83..822194a 100644
--- a/tools/chunks/catchunks/ndncatchunks.cpp
+++ b/tools/chunks/catchunks/ndncatchunks.cpp
@@ -26,6 +26,7 @@
  * @author Davide Pesavento
  * @author Weiwei Liu
  * @author Klaus Schneider
+ * @author Chavoosh Ghasemi
  */
 
 #include "aimd-statistics-collector.hpp"
@@ -58,7 +59,7 @@
 
   // congestion control parameters, CWA refers to conservative window adaptation,
   // i.e. only reduce window size at most once per RTT
-  bool disableCwa(false), resetCwndToInit(false);
+  bool disableCwa(false), resetCwndToInit(false), ignoreCongMarks(false);
   double aiStep(1.0), mdCoef(0.5), alpha(0.125), beta(0.25),
          minRto(200.0), maxRto(4000.0);
   int initCwnd(1), initSsthresh(std::numeric_limits<int>::max()), k(4);
@@ -104,7 +105,11 @@
                        "log file for AIMD rtt statistics")
     ("aimd-disable-cwa", po::bool_switch(&disableCwa),
                          "disable Conservative Window Adaptation, "
-                         "i.e. reduce window on each timeout (instead of at most once per RTT)")
+                         "i.e. reduce window on each congestion event (timeout or congestion mark) "
+                         "instead of at most once per RTT")
+    ("aimd-ignore-cong-marks",  po::bool_switch(&ignoreCongMarks),
+                                "disable reaction to congestion marks, "
+                                "the default is to decrease the window after receiving a congestion mark")
     ("aimd-reset-cwnd-to-init", po::bool_switch(&resetCwndToInit),
                                 "reset cwnd to initial cwnd when loss event occurs, default is "
                                 "resetting to ssthresh")
@@ -253,6 +258,7 @@
       optionsPipeline.initSsthresh = static_cast<double>(initSsthresh);
       optionsPipeline.aiStep = aiStep;
       optionsPipeline.mdCoef = mdCoef;
+      optionsPipeline.ignoreCongMarks = ignoreCongMarks;
 
       auto aimdPipeline = make_unique<PipelineInterestsAimd>(face, *rttEstimator, optionsPipeline);
 
diff --git a/tools/chunks/catchunks/pipeline-interests-aimd.cpp b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
index b979a51..61ed8f3 100644
--- a/tools/chunks/catchunks/pipeline-interests-aimd.cpp
+++ b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
@@ -33,6 +33,8 @@
 namespace chunks {
 namespace aimd {
 
+constexpr double PipelineInterestsAimd::MIN_SSTHRESH;
+
 PipelineInterestsAimd::PipelineInterestsAimd(Face& face, RttEstimator& rttEstimator,
                                              const Options& options)
   : PipelineInterests(face)
@@ -46,6 +48,7 @@
   , m_nInFlight(0)
   , m_nLossEvents(0)
   , m_nRetransmitted(0)
+  , m_nCongMarks(0)
   , m_cwnd(m_options.initCwnd)
   , m_ssthresh(m_options.initSsthresh)
   , m_hasFailure(false)
@@ -244,7 +247,30 @@
     m_nInFlight--;
   }
 
-  increaseWindow();
+  // upon finding congestion mark, decrease the window size
+  // without retransmitting any packet
+  if (data.getCongestionMark() > 0) {
+    m_nCongMarks++;
+    if (!m_options.ignoreCongMarks) {
+      if (m_options.disableCwa || m_highData > m_recPoint) {
+        m_recPoint = m_highInterest;  // react to only one congestion event (timeout or congestion mark)
+                                      // per RTT (conservative window adaptation)
+        decreaseWindow();
+
+        if (m_options.isVerbose) {
+          std::cerr << "Received congestion mark, value = " << data.getCongestionMark()
+                    << ", new cwnd = " << m_cwnd << std::endl;
+        }
+      }
+    }
+    else {
+      increaseWindow();
+    }
+  }
+  else {
+    increaseWindow();
+  }
+
   onData(data);
 
   if (segInfo.state == SegmentState::FirstTimeSent ||
@@ -324,7 +350,7 @@
     m_nLossEvents++;
 
     if (m_options.isVerbose) {
-      std::cerr << "Packet loss event, cwnd = " << m_cwnd
+      std::cerr << "Packet loss event, new cwnd = " << m_cwnd
                 << ", ssthresh = " << m_ssthresh << std::endl;
     }
   }
@@ -382,7 +408,7 @@
 PipelineInterestsAimd::decreaseWindow()
 {
   // please refer to RFC 5681, Section 3.1 for the rationale behind it
-  m_ssthresh = std::max(2.0, m_cwnd * m_options.mdCoef); // multiplicative decrease
+  m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
   m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
 
   afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
@@ -411,7 +437,8 @@
   std::cerr << "Total # of packet loss events: " << m_nLossEvents << "\n"
             << "Packet loss rate: "
             << static_cast<double>(m_nLossEvents) / static_cast<double>(m_nReceived) << "\n"
-            << "Total # of retransmitted segments: " << m_nRetransmitted << "\n";
+            << "Total # of retransmitted segments: " << m_nRetransmitted << "\n"
+            << "Total # of received congestion marks: " << m_nCongMarks << "\n";
 }
 
 std::ostream&
@@ -444,6 +471,7 @@
      << "\tMultiplicative decrease factor = " << options.mdCoef << "\n"
      << "\tRTO check interval = " << options.rtoCheckInterval << "\n"
      << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n"
+     << "\tReaction to congestion marks " << (options.ignoreCongMarks ? "disabled" : "enabled") << "\n"
      << "\tConservative Window Adaptation " << (options.disableCwa ? "disabled" : "enabled") << "\n"
      << "\tResetting cwnd to " << (options.resetCwndToInit ? "initCwnd" : "ssthresh") << " upon loss event\n";
   return os;
diff --git a/tools/chunks/catchunks/pipeline-interests-aimd.hpp b/tools/chunks/catchunks/pipeline-interests-aimd.hpp
index 5aedbfb..bbc9940 100644
--- a/tools/chunks/catchunks/pipeline-interests-aimd.hpp
+++ b/tools/chunks/catchunks/pipeline-interests-aimd.hpp
@@ -56,6 +56,7 @@
   time::milliseconds rtoCheckInterval{10}; ///< interval for checking retransmission timer
   bool disableCwa = false; ///< disable Conservative Window Adaptation
   bool resetCwndToInit = false; ///< reduce cwnd to initCwnd when loss event occurs
+  bool ignoreCongMarks = false; ///< disable window decrease after congestion marks
 };
 
 /**
@@ -187,6 +188,7 @@
   printSummary() const final;
 
 PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+  static constexpr double MIN_SSTHRESH = 2.0;
   const Options m_options;
   RttEstimator& m_rttEstimator;
   Scheduler m_scheduler;
@@ -200,6 +202,7 @@
   int64_t m_nInFlight; ///< # of segments in flight
   int64_t m_nLossEvents; ///< # of loss events occurred
   int64_t m_nRetransmitted; ///< # of segments retransmitted
+  int64_t m_nCongMarks; ///< # of data packets with congestion mark
 
   double m_cwnd; ///< current congestion window size (in segments)
   double m_ssthresh; ///< current slow start threshold