Add slow start and back-off timeout in Fetcher

Change-Id: I0b1698f118819f32bd93b5a4ab46cfde0bc4c03a
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 632db93..87b0898 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -62,8 +62,15 @@
   , m_minSeqNo (minSeqNo)
   , m_maxSeqNo (maxSeqNo)
 
-  , m_pipeline (6) // initial "congestion window"
+  , m_pipeline (1) // initial "congestion window"
   , m_activePipeline (0)
+
+  , m_rto(1) // temporary congestion control; should be removed once NDN provides transport functionality.
+  , m_maxRto(static_cast<double>(timeout.total_seconds())/2)
+  , m_slowStart(true)
+  , m_threshold(4)
+  , m_roundCount(0)
+
   , m_retryPause (0)
   , m_nextScheduledRetry (date_time::second_clock<boost::posix_time::ptime>::universal_time ())
   , m_executor (executor) // must be 1
@@ -112,7 +119,7 @@
       m_ccnx->sendInterest (Name (m_forwardingHint)(m_name)(m_minSendSeqNo+1),
                             Closure (bind(&Fetcher::OnData, this, m_minSendSeqNo+1, _1, _2),
                                      bind(&Fetcher::OnTimeout, this, m_minSendSeqNo+1, _1, _2, _3)),
-                            Selectors().interestLifetime (1)); // Alex: this lifetime should be changed to RTO
+                            Selectors().interestLifetime (m_rto)); // Alex: this lifetime should be changed to RTO
       _LOG_DEBUG (" >>> i ok");
 
       m_activePipeline ++;
@@ -179,6 +186,26 @@
   m_activePipeline --;
   m_lastPositiveActivity = date_time::second_clock<boost::posix_time::ptime>::universal_time();
 
+  { // window growth step; all congestion-window state is updated under m_pipelineMutex
+    unique_lock<mutex> lock (m_pipelineMutex);
+    if(m_slowStart){ // slow start: the window grows by one on every pass through here
+      m_pipeline++;
+      if(m_pipeline >= m_threshold) // '>=' so slow start also ends when the window has already passed the threshold
+        m_slowStart = false;
+    }
+    else{ // after slow start: grow by one only once m_roundCount reaches the current window
+      m_roundCount++;
+      _LOG_DEBUG ("roundCount: " << m_roundCount);
+      if(m_roundCount == m_pipeline){
+        m_pipeline++;
+        m_roundCount = 0;
+      }
+    }
+  }
+
+  _LOG_DEBUG ("slowStart: " << boolalpha << m_slowStart << " pipeline: " << m_pipeline << " threshold: " << m_threshold);
+
+
   ////////////////////////////////////////////////////////////////////////////
   unique_lock<mutex> lock (m_seqNoMutex);
 
@@ -283,6 +310,29 @@
   else
     {
       _LOG_DEBUG ("Asking to reexpress seqno: " << seqno);
-      m_ccnx->sendInterest (name, closure, selectors);
+      {
+        unique_lock<mutex> lock (m_rtoMutex);
+        _LOG_DEBUG ("Interest Timeout");
+        m_rto = m_rto * 2;
+        if(m_rto > m_maxRto)
+          {
+            m_rto = m_maxRto;
+            _LOG_DEBUG ("RTO is max: " << m_rto);
+          }
+        else
+          {
+            _LOG_DEBUG ("RTO is doubled: " << m_rto);
+          }
+      }
+
+      { // timeout back-off: halve the threshold, collapse the window to 1 and re-enter slow start
+        unique_lock<mutex> lock (m_pipelineMutex);
+        m_threshold = (m_pipeline / 2 < 2) ? 2 : m_pipeline / 2; // floor at 2 so a window restarted at 1 can still reach the threshold
+        m_pipeline = 1;
+        m_roundCount = 0;
+        m_slowStart = true;
+      }
+
+      m_ccnx->sendInterest (name, closure, selectors.interestLifetime (m_rto));
     }
 }
diff --git a/src/fetcher.h b/src/fetcher.h
index 515b2e9..1bc5d03 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -133,6 +133,11 @@
 
   uint32_t m_pipeline;
   uint32_t m_activePipeline;
+  double m_rto;
+  double m_maxRto;
+  bool m_slowStart;
+  uint32_t m_threshold;
+  uint32_t m_roundCount;
 
   boost::posix_time::ptime m_lastPositiveActivity;
 
@@ -142,6 +147,8 @@
   ExecutorPtr m_executor; // to serialize FillPipeline events
 
   boost::mutex m_seqNoMutex;
+  boost::mutex m_rtoMutex;
+  boost::mutex m_pipelineMutex;
 };
 
 typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;