catchunks: switch to RttEstimatorWithStats

Refs: #4887
Change-Id: I8d49eff6ff4a7dc9ff49ffacc0744e50695128d2
diff --git a/tools/chunks/catchunks/main.cpp b/tools/chunks/catchunks/main.cpp
index c7deb5c..1263ba7 100644
--- a/tools/chunks/catchunks/main.cpp
+++ b/tools/chunks/catchunks/main.cpp
@@ -195,7 +195,7 @@
     auto discover = make_unique<DiscoverVersion>(Name(uri), face, options);
     unique_ptr<PipelineInterests> pipeline;
     unique_ptr<StatisticsCollector> statsCollector;
-    unique_ptr<RttEstimator> rttEstimator;
+    unique_ptr<RttEstimatorWithStats> rttEstimator;
     std::ofstream statsFileCwnd;
     std::ofstream statsFileRtt;
 
@@ -205,27 +205,26 @@
       pipeline = make_unique<PipelineInterestsFixed>(face, optionsPipeline);
     }
     else if (pipelineType == "aimd" || pipelineType == "cubic") {
-      RttEstimator::Options optionsRttEst;
-      optionsRttEst.alpha = rtoAlpha;
-      optionsRttEst.beta = rtoBeta;
-      optionsRttEst.k = k;
-      optionsRttEst.initialRto = 1_s;
-      optionsRttEst.minRto = time::milliseconds(minRto);
-      optionsRttEst.maxRto = time::milliseconds(maxRto);
-      optionsRttEst.rtoBackoffMultiplier = 2;
-      rttEstimator = make_unique<RttEstimator>(optionsRttEst);
-
+      auto optionsRttEst = make_shared<RttEstimatorWithStats::Options>();
+      optionsRttEst->alpha = rtoAlpha;
+      optionsRttEst->beta = rtoBeta;
+      optionsRttEst->k = k;
+      optionsRttEst->initialRto = 1_s;
+      optionsRttEst->minRto = time::milliseconds(minRto);
+      optionsRttEst->maxRto = time::milliseconds(maxRto);
+      optionsRttEst->rtoBackoffMultiplier = 2;
       if (options.isVerbose) {
         using namespace ndn::time;
         std::cerr << "RTT estimator parameters:\n"
-                  << "\tAlpha = " << optionsRttEst.alpha << "\n"
-                  << "\tBeta = " << optionsRttEst.beta << "\n"
-                  << "\tK = " << optionsRttEst.k << "\n"
-                  << "\tInitial RTO = " << duration_cast<milliseconds>(optionsRttEst.initialRto) << "\n"
-                  << "\tMin RTO = " << duration_cast<milliseconds>(optionsRttEst.minRto) << "\n"
-                  << "\tMax RTO = " << duration_cast<milliseconds>(optionsRttEst.maxRto) << "\n"
-                  << "\tBackoff multiplier = " << optionsRttEst.rtoBackoffMultiplier << "\n";
+                  << "\tAlpha = " << optionsRttEst->alpha << "\n"
+                  << "\tBeta = " << optionsRttEst->beta << "\n"
+                  << "\tK = " << optionsRttEst->k << "\n"
+                  << "\tInitial RTO = " << duration_cast<milliseconds>(optionsRttEst->initialRto) << "\n"
+                  << "\tMin RTO = " << duration_cast<milliseconds>(optionsRttEst->minRto) << "\n"
+                  << "\tMax RTO = " << duration_cast<milliseconds>(optionsRttEst->maxRto) << "\n"
+                  << "\tBackoff multiplier = " << optionsRttEst->rtoBackoffMultiplier << "\n";
       }
+      rttEstimator = make_unique<RttEstimatorWithStats>(std::move(optionsRttEst));
 
       PipelineInterestsAdaptive::Options optionsPipeline(options);
       optionsPipeline.disableCwa = disableCwa;
@@ -262,8 +261,7 @@
             return 4;
           }
         }
-        statsCollector = make_unique<StatisticsCollector>(*adaptivePipeline, *rttEstimator,
-                                                          statsFileCwnd, statsFileRtt);
+        statsCollector = make_unique<StatisticsCollector>(*adaptivePipeline, statsFileCwnd, statsFileRtt);
       }
 
       pipeline = std::move(adaptivePipeline);
diff --git a/tools/chunks/catchunks/pipeline-interests-adaptive.cpp b/tools/chunks/catchunks/pipeline-interests-adaptive.cpp
index cde5e40..8ae2162 100644
--- a/tools/chunks/catchunks/pipeline-interests-adaptive.cpp
+++ b/tools/chunks/catchunks/pipeline-interests-adaptive.cpp
@@ -37,7 +37,8 @@
 
 constexpr double PipelineInterestsAdaptive::MIN_SSTHRESH;
 
-PipelineInterestsAdaptive::PipelineInterestsAdaptive(Face& face, RttEstimator& rttEstimator,
+PipelineInterestsAdaptive::PipelineInterestsAdaptive(Face& face,
+                                                     RttEstimatorWithStats& rttEstimator,
                                                      const Options& options)
   : PipelineInterests(face)
   , m_options(options)
@@ -285,7 +286,11 @@
       m_retxCount.count(recvSegNo) == 0) {
     auto nExpectedSamples = std::max<int64_t>((m_nInFlight + 1) >> 1, 1);
     BOOST_ASSERT(nExpectedSamples > 0);
-    m_rttEstimator.addMeasurement(rtt, static_cast<size_t>(nExpectedSamples), recvSegNo);
+    m_rttEstimator.addMeasurement(rtt, static_cast<size_t>(nExpectedSamples));
+    afterRttMeasurement({recvSegNo, rtt,
+                         m_rttEstimator.getSmoothedRtt(),
+                         m_rttEstimator.getRttVariation(),
+                         m_rttEstimator.getEstimatedRto()});
   }
 
   // remove the entry associated with the received segment
diff --git a/tools/chunks/catchunks/pipeline-interests-adaptive.hpp b/tools/chunks/catchunks/pipeline-interests-adaptive.hpp
index 94fadf3..143b53b 100644
--- a/tools/chunks/catchunks/pipeline-interests-adaptive.hpp
+++ b/tools/chunks/catchunks/pipeline-interests-adaptive.hpp
@@ -40,7 +40,7 @@
 namespace ndn {
 namespace chunks {
 
-using util::RttEstimator;
+using util::RttEstimatorWithStats;
 
 class PipelineInterestsAdaptiveOptions : public Options
 {
@@ -111,7 +111,7 @@
    * Configures the pipelining service without specifying the retrieval namespace. After this
    * configuration the method run must be called to start the Pipeline.
    */
-  PipelineInterestsAdaptive(Face& face, RttEstimator& rttEstimator,
+  PipelineInterestsAdaptive(Face& face, RttEstimatorWithStats& rttEstimator,
                             const Options& options = Options());
 
   ~PipelineInterestsAdaptive() override;
@@ -124,6 +124,20 @@
    */
   signal::Signal<PipelineInterestsAdaptive, time::nanoseconds, double> afterCwndChange;
 
+  struct RttSample
+  {
+    uint64_t segNum;          ///< segment number on which this sample was taken
+    time::nanoseconds rtt;    ///< measured RTT
+    time::nanoseconds sRtt;   ///< smoothed RTT
+    time::nanoseconds rttVar; ///< RTT variation
+    time::nanoseconds rto;    ///< retransmission timeout
+  };
+
+  /**
+   * @brief Signals when a new RTT sample has been taken.
+   */
+  signal::Signal<PipelineInterestsAdaptive, RttSample> afterRttMeasurement;
+
 protected:
   DECLARE_SIGNAL_EMIT(afterCwndChange)
 
@@ -206,7 +220,7 @@
   double m_ssthresh; ///< current slow start threshold
 
 PUBLIC_WITH_TESTS_ELSE_PRIVATE:
-  RttEstimator& m_rttEstimator;
+  RttEstimatorWithStats& m_rttEstimator;
   Scheduler m_scheduler;
   scheduler::ScopedEventId m_checkRtoEvent;
 
diff --git a/tools/chunks/catchunks/pipeline-interests-aimd.cpp b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
index 0292714..e32a36d 100644
--- a/tools/chunks/catchunks/pipeline-interests-aimd.cpp
+++ b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
@@ -33,7 +33,7 @@
 namespace ndn {
 namespace chunks {
 
-PipelineInterestsAimd::PipelineInterestsAimd(Face& face, RttEstimator& rttEstimator,
+PipelineInterestsAimd::PipelineInterestsAimd(Face& face, RttEstimatorWithStats& rttEstimator,
                                              const Options& options)
   : PipelineInterestsAdaptive(face, rttEstimator, options)
 {
diff --git a/tools/chunks/catchunks/pipeline-interests-aimd.hpp b/tools/chunks/catchunks/pipeline-interests-aimd.hpp
index ccaf08b..eaee573 100644
--- a/tools/chunks/catchunks/pipeline-interests-aimd.hpp
+++ b/tools/chunks/catchunks/pipeline-interests-aimd.hpp
@@ -40,7 +40,7 @@
 class PipelineInterestsAimd : public PipelineInterestsAdaptive
 {
 public:
-  PipelineInterestsAimd(Face& face, RttEstimator& rttEstimator,
+  PipelineInterestsAimd(Face& face, RttEstimatorWithStats& rttEstimator,
                         const Options& options = Options());
 
 private:
diff --git a/tools/chunks/catchunks/pipeline-interests-cubic.cpp b/tools/chunks/catchunks/pipeline-interests-cubic.cpp
index 414cc8a..a68c9db 100644
--- a/tools/chunks/catchunks/pipeline-interests-cubic.cpp
+++ b/tools/chunks/catchunks/pipeline-interests-cubic.cpp
@@ -32,7 +32,7 @@
 
 constexpr double CUBIC_C = 0.4;
 
-PipelineInterestsCubic::PipelineInterestsCubic(Face& face, RttEstimator& rttEstimator,
+PipelineInterestsCubic::PipelineInterestsCubic(Face& face, RttEstimatorWithStats& rttEstimator,
                                                const Options& options)
   : PipelineInterestsAdaptive(face, rttEstimator, options)
   , m_cubicOptions(options)
diff --git a/tools/chunks/catchunks/pipeline-interests-cubic.hpp b/tools/chunks/catchunks/pipeline-interests-cubic.hpp
index 11d3100..4f058c7 100644
--- a/tools/chunks/catchunks/pipeline-interests-cubic.hpp
+++ b/tools/chunks/catchunks/pipeline-interests-cubic.hpp
@@ -61,7 +61,7 @@
   using Options = PipelineInterestsCubicOptions;
 
 public:
-  PipelineInterestsCubic(Face& face, RttEstimator& rttEstimator,
+  PipelineInterestsCubic(Face& face, RttEstimatorWithStats& rttEstimator,
                          const Options& options = Options());
 
 private:
diff --git a/tools/chunks/catchunks/statistics-collector.cpp b/tools/chunks/catchunks/statistics-collector.cpp
index 2a65025..91af79d 100644
--- a/tools/chunks/catchunks/statistics-collector.cpp
+++ b/tools/chunks/catchunks/statistics-collector.cpp
@@ -28,25 +28,24 @@
 namespace chunks {
 
 StatisticsCollector::StatisticsCollector(PipelineInterestsAdaptive& pipeline,
-                                         RttEstimator& rttEstimator,
                                          std::ostream& osCwnd, std::ostream& osRtt)
   : m_osCwnd(osCwnd)
   , m_osRtt(osRtt)
 {
   m_osCwnd << "time\tcwndsize\n";
   m_osRtt  << "segment\trtt\trttvar\tsrtt\trto\n";
-  pipeline.afterCwndChange.connect(
-    [this] (time::nanoseconds timeElapsed, double cwnd) {
-      m_osCwnd << timeElapsed.count() / 1e9 << '\t' << cwnd << '\n';
-    });
-  rttEstimator.afterMeasurement.connect(
-    [this] (const RttEstimator::Sample& sample) {
-      m_osRtt << *sample.segNum << '\t'
-              << sample.rtt.count() / 1e6 << '\t'
-              << sample.rttVar.count() / 1e6 << '\t'
-              << sample.sRtt.count() / 1e6 << '\t'
-              << sample.rto.count() / 1e6 << '\n';
-    });
+
+  pipeline.afterCwndChange.connect([this] (time::nanoseconds timeElapsed, double cwnd) {
+    m_osCwnd << timeElapsed.count() / 1e9 << '\t' << cwnd << '\n';
+  });
+
+  pipeline.afterRttMeasurement.connect([this] (const auto& sample) {
+    m_osRtt << sample.segNum << '\t'
+            << sample.rtt.count() / 1e6 << '\t'
+            << sample.rttVar.count() / 1e6 << '\t'
+            << sample.sRtt.count() / 1e6 << '\t'
+            << sample.rto.count() / 1e6 << '\n';
+  });
 }
 
 } // namespace chunks
diff --git a/tools/chunks/catchunks/statistics-collector.hpp b/tools/chunks/catchunks/statistics-collector.hpp
index 35a0846..bc57f00 100644
--- a/tools/chunks/catchunks/statistics-collector.hpp
+++ b/tools/chunks/catchunks/statistics-collector.hpp
@@ -36,7 +36,7 @@
 class StatisticsCollector : noncopyable
 {
 public:
-  StatisticsCollector(PipelineInterestsAdaptive& pipeline, RttEstimator& rttEstimator,
+  StatisticsCollector(PipelineInterestsAdaptive& pipeline,
                       std::ostream& osCwnd, std::ostream& osRtt);
 
 private: