Add slow start and back-off timeout in Fetcher

Change-Id: I0b1698f118819f32bd93b5a4ab46cfde0bc4c03a
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 632db93..87b0898 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -62,8 +62,15 @@
   , m_minSeqNo (minSeqNo)
   , m_maxSeqNo (maxSeqNo)
 
-  , m_pipeline (6) // initial "congestion window"
+  , m_pipeline (1) // initial "congestion window"
   , m_activePipeline (0)
+
+  , m_rto(1) // for temporary congestion control, should be removed when NDN provide transport functionality.
+  , m_maxRto(static_cast<double>(timeout.total_seconds())/2)
+  , m_slowStart(true)
+  , m_threshold(4)
+  , m_roundCount(0)
+
   , m_retryPause (0)
   , m_nextScheduledRetry (date_time::second_clock<boost::posix_time::ptime>::universal_time ())
   , m_executor (executor) // must be 1
@@ -112,7 +119,7 @@
       m_ccnx->sendInterest (Name (m_forwardingHint)(m_name)(m_minSendSeqNo+1),
                             Closure (bind(&Fetcher::OnData, this, m_minSendSeqNo+1, _1, _2),
                                      bind(&Fetcher::OnTimeout, this, m_minSendSeqNo+1, _1, _2, _3)),
-                            Selectors().interestLifetime (1)); // Alex: this lifetime should be changed to RTO
+                            Selectors().interestLifetime (m_rto)); // Alex: this lifetime should be changed to RTO
       _LOG_DEBUG (" >>> i ok");
 
       m_activePipeline ++;
@@ -179,6 +186,26 @@
   m_activePipeline --;
   m_lastPositiveActivity = date_time::second_clock<boost::posix_time::ptime>::universal_time();
 
+  {
+    unique_lock<mutex> lock (m_pipelineMutex);
+    if(m_slowStart){
+      m_pipeline++;
+      if(m_pipeline == m_threshold)
+        m_slowStart = false;
+    }
+    else{
+      m_roundCount++;
+      _LOG_DEBUG ("roundCount: " << m_roundCount);
+      if(m_roundCount == m_pipeline){
+        m_pipeline++;
+        m_roundCount = 0;
+      }
+    }
+  }
+
+  _LOG_DEBUG ("slowStart: " << boolalpha << m_slowStart << " pipeline: " << m_pipeline << " threshold: " << m_threshold);
+
+
   ////////////////////////////////////////////////////////////////////////////
   unique_lock<mutex> lock (m_seqNoMutex);
 
@@ -283,6 +310,29 @@
   else
     {
       _LOG_DEBUG ("Asking to reexpress seqno: " << seqno);
-      m_ccnx->sendInterest (name, closure, selectors);
+      {
+        unique_lock<mutex> lock (m_rtoMutex);
+        _LOG_DEBUG ("Interest Timeout");
+        m_rto = m_rto * 2;
+        if(m_rto > m_maxRto)
+          {
+            m_rto = m_maxRto;
+            _LOG_DEBUG ("RTO is max: " << m_rto);
+          }
+        else
+          {
+            _LOG_DEBUG ("RTO is doubled: " << m_rto);
+          }
+      }
+
+      {
+        unique_lock<mutex> lock (m_pipelineMutex);
+        m_threshold = m_pipeline / 2;
+        m_pipeline = 1;
+        m_roundCount = 0;
+        m_slowStart = true;
+      }
+
+      m_ccnx->sendInterest (name, closure, selectors.interestLifetime (m_rto));
     }
 }
diff --git a/src/fetcher.h b/src/fetcher.h
index 515b2e9..1bc5d03 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -133,6 +133,11 @@
 
   uint32_t m_pipeline;
   uint32_t m_activePipeline;
+  double m_rto;
+  double m_maxRto;
+  bool m_slowStart;
+  uint32_t m_threshold;
+  uint32_t m_roundCount;
 
   boost::posix_time::ptime m_lastPositiveActivity;
 
@@ -142,6 +147,8 @@
   ExecutorPtr m_executor; // to serialize FillPipeline events
 
   boost::mutex m_seqNoMutex;
+  boost::mutex m_rtoMutex;
+  boost::mutex m_pipelineMutex;
 };
 
 typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
diff --git a/test/test-fetch-manager.cc b/test/test-fetch-manager.cc
index 42d4b48..ed604f3 100644
--- a/test/test-fetch-manager.cc
+++ b/test/test-fetch-manager.cc
@@ -24,8 +24,10 @@
 #include "ccnx-wrapper.h"
 #include <boost/test/unit_test.hpp>
 #include <boost/make_shared.hpp>
+#include <boost/thread/thread.hpp>
 #include "logging.h"
 
+
 INIT_LOGGER ("Test.FetchManager");
 
 using namespace Ccnx;
@@ -92,6 +94,26 @@
   }
 };
 
+void run()
+{
+  CcnxWrapperPtr ccnx = make_shared<CcnxWrapper> ();
+
+  Name baseName ("/base");
+  Name deviceName ("/device");
+
+  for (int i = 0; i < 10; i++)
+    {
+      usleep(100000);
+      ccnx->publishData (Name (baseName)(i), reinterpret_cast<const unsigned char*> (&i), sizeof(int), 30);
+    }
+
+  for (int i = 11; i < 50; i++)
+    {
+      usleep(100000);
+      ccnx->publishData (Name (baseName)(i), reinterpret_cast<const unsigned char*> (&i), sizeof(int), 30);
+    }
+
+}
 
 BOOST_AUTO_TEST_CASE (TestFetcher)
 {
@@ -197,10 +219,8 @@
 
   Name baseName ("/base");
   Name deviceName ("/device");
-  int i = 0;
 
-  ccnx->publishData (Name (baseName)(i), reinterpret_cast<const unsigned char*> (&i), sizeof(int), 30);
-
+  thread publishThread(run);
 
   FetcherTestData data;
   ExecutorPtr executor = make_shared<Executor>(1);
@@ -212,14 +232,14 @@
                    bind (&FetcherTestData::finish, &data, _1, _2),
                    bind (&FetcherTestData::onComplete, &data, _1),
                    bind (&FetcherTestData::onFail, &data, _1),
-                   deviceName, Name ("/base"), 1, 1,
+                   deviceName, baseName, 0, 49,
                    boost::posix_time::seconds (5)); // this time is not precise
 
   BOOST_CHECK_EQUAL (fetcher.IsActive (), false);
   fetcher.RestartPipeline ();
   BOOST_CHECK_EQUAL (fetcher.IsActive (), true);
 
-  usleep(7000000);
+  usleep(20000000);
   BOOST_CHECK_EQUAL (data.m_failed, true);
 
   executor->shutdown ();