Add slow start and back-off timeout in Fetcher
Change-Id: I0b1698f118819f32bd93b5a4ab46cfde0bc4c03a
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 632db93..87b0898 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -62,8 +62,15 @@
, m_minSeqNo (minSeqNo)
, m_maxSeqNo (maxSeqNo)
- , m_pipeline (6) // initial "congestion window"
+ , m_pipeline (1) // initial "congestion window"
, m_activePipeline (0)
+
+ , m_rto(1) // for temporary congestion control, should be removed when NDN provide transport functionality.
+ , m_maxRto(static_cast<double>(timeout.total_seconds())/2)
+ , m_slowStart(true)
+ , m_threshold(4)
+ , m_roundCount(0)
+
, m_retryPause (0)
, m_nextScheduledRetry (date_time::second_clock<boost::posix_time::ptime>::universal_time ())
, m_executor (executor) // must be 1
@@ -112,7 +119,7 @@
m_ccnx->sendInterest (Name (m_forwardingHint)(m_name)(m_minSendSeqNo+1),
Closure (bind(&Fetcher::OnData, this, m_minSendSeqNo+1, _1, _2),
bind(&Fetcher::OnTimeout, this, m_minSendSeqNo+1, _1, _2, _3)),
- Selectors().interestLifetime (1)); // Alex: this lifetime should be changed to RTO
+ Selectors().interestLifetime (m_rto)); // Alex: this lifetime should be changed to RTO
_LOG_DEBUG (" >>> i ok");
m_activePipeline ++;
@@ -179,6 +186,26 @@
m_activePipeline --;
m_lastPositiveActivity = date_time::second_clock<boost::posix_time::ptime>::universal_time();
+ {
+ unique_lock<mutex> lock (m_pipelineMutex);
+ if(m_slowStart){
+ m_pipeline++;
+ if(m_pipeline == m_threshold)
+ m_slowStart = false;
+ }
+ else{
+ m_roundCount++;
+ _LOG_DEBUG ("roundCount: " << m_roundCount);
+ if(m_roundCount == m_pipeline){
+ m_pipeline++;
+ m_roundCount = 0;
+ }
+ }
+ }
+
+ _LOG_DEBUG ("slowStart: " << boolalpha << m_slowStart << " pipeline: " << m_pipeline << " threshold: " << m_threshold);
+
+
////////////////////////////////////////////////////////////////////////////
unique_lock<mutex> lock (m_seqNoMutex);
@@ -283,6 +310,29 @@
else
{
_LOG_DEBUG ("Asking to reexpress seqno: " << seqno);
- m_ccnx->sendInterest (name, closure, selectors);
+ {
+ unique_lock<mutex> lock (m_rtoMutex);
+ _LOG_DEBUG ("Interest Timeout");
+ m_rto = m_rto * 2;
+ if(m_rto > m_maxRto)
+ {
+ m_rto = m_maxRto;
+ _LOG_DEBUG ("RTO is max: " << m_rto);
+ }
+ else
+ {
+ _LOG_DEBUG ("RTO is doubled: " << m_rto);
+ }
+ }
+
+ {
+ unique_lock<mutex> lock (m_pipelineMutex);
+ m_threshold = m_pipeline / 2;
+ m_pipeline = 1;
+ m_roundCount = 0;
+ m_slowStart = true;
+ }
+
+ m_ccnx->sendInterest (name, closure, selectors.interestLifetime (m_rto));
}
}
diff --git a/src/fetcher.h b/src/fetcher.h
index 515b2e9..1bc5d03 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -133,6 +133,11 @@
uint32_t m_pipeline;
uint32_t m_activePipeline;
+ double m_rto;
+ double m_maxRto;
+ bool m_slowStart;
+ uint32_t m_threshold;
+ uint32_t m_roundCount;
boost::posix_time::ptime m_lastPositiveActivity;
@@ -142,6 +147,8 @@
ExecutorPtr m_executor; // to serialize FillPipeline events
boost::mutex m_seqNoMutex;
+ boost::mutex m_rtoMutex;
+ boost::mutex m_pipelineMutex;
};
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;