Add slow start and back-off timeout in Fetcher
Change-Id: I0b1698f118819f32bd93b5a4ab46cfde0bc4c03a
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 632db93..87b0898 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -62,8 +62,15 @@
, m_minSeqNo (minSeqNo)
, m_maxSeqNo (maxSeqNo)
- , m_pipeline (6) // initial "congestion window"
+ , m_pipeline (1) // initial "congestion window"
, m_activePipeline (0)
+
+ , m_rto(1) // for temporary congestion control, should be removed when NDN provide transport functionality.
+ , m_maxRto(static_cast<double>(timeout.total_seconds())/2)
+ , m_slowStart(true)
+ , m_threshold(4)
+ , m_roundCount(0)
+
, m_retryPause (0)
, m_nextScheduledRetry (date_time::second_clock<boost::posix_time::ptime>::universal_time ())
, m_executor (executor) // must be 1
@@ -112,7 +119,7 @@
m_ccnx->sendInterest (Name (m_forwardingHint)(m_name)(m_minSendSeqNo+1),
Closure (bind(&Fetcher::OnData, this, m_minSendSeqNo+1, _1, _2),
bind(&Fetcher::OnTimeout, this, m_minSendSeqNo+1, _1, _2, _3)),
- Selectors().interestLifetime (1)); // Alex: this lifetime should be changed to RTO
+ Selectors().interestLifetime (m_rto)); // Alex: this lifetime should be changed to RTO
_LOG_DEBUG (" >>> i ok");
m_activePipeline ++;
@@ -179,6 +186,26 @@
m_activePipeline --;
m_lastPositiveActivity = date_time::second_clock<boost::posix_time::ptime>::universal_time();
+ {
+ unique_lock<mutex> lock (m_pipelineMutex);
+ if(m_slowStart){
+ m_pipeline++;
+ if(m_pipeline == m_threshold)
+ m_slowStart = false;
+ }
+ else{
+ m_roundCount++;
+ _LOG_DEBUG ("roundCount: " << m_roundCount);
+ if(m_roundCount == m_pipeline){
+ m_pipeline++;
+ m_roundCount = 0;
+ }
+ }
+ }
+
+ _LOG_DEBUG ("slowStart: " << boolalpha << m_slowStart << " pipeline: " << m_pipeline << " threshold: " << m_threshold);
+
+
////////////////////////////////////////////////////////////////////////////
unique_lock<mutex> lock (m_seqNoMutex);
@@ -283,6 +310,29 @@
else
{
_LOG_DEBUG ("Asking to reexpress seqno: " << seqno);
- m_ccnx->sendInterest (name, closure, selectors);
+ {
+ unique_lock<mutex> lock (m_rtoMutex);
+ _LOG_DEBUG ("Interest Timeout");
+ m_rto = m_rto * 2;
+ if(m_rto > m_maxRto)
+ {
+ m_rto = m_maxRto;
+ _LOG_DEBUG ("RTO is max: " << m_rto);
+ }
+ else
+ {
+ _LOG_DEBUG ("RTO is doubled: " << m_rto);
+ }
+ }
+
+ {
+ unique_lock<mutex> lock (m_pipelineMutex);
+ m_threshold = m_pipeline / 2;
+ m_pipeline = 1;
+ m_roundCount = 0;
+ m_slowStart = true;
+ }
+
+ m_ccnx->sendInterest (name, closure, selectors.interestLifetime (m_rto));
}
}
diff --git a/src/fetcher.h b/src/fetcher.h
index 515b2e9..1bc5d03 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -133,6 +133,11 @@
uint32_t m_pipeline;
uint32_t m_activePipeline;
+ double m_rto;
+ double m_maxRto;
+ bool m_slowStart;
+ uint32_t m_threshold;
+ uint32_t m_roundCount;
boost::posix_time::ptime m_lastPositiveActivity;
@@ -142,6 +147,8 @@
ExecutorPtr m_executor; // to serialize FillPipeline events
boost::mutex m_seqNoMutex;
+ boost::mutex m_rtoMutex;
+ boost::mutex m_pipelineMutex;
};
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
diff --git a/test/test-fetch-manager.cc b/test/test-fetch-manager.cc
index 42d4b48..ed604f3 100644
--- a/test/test-fetch-manager.cc
+++ b/test/test-fetch-manager.cc
@@ -24,8 +24,10 @@
#include "ccnx-wrapper.h"
#include <boost/test/unit_test.hpp>
#include <boost/make_shared.hpp>
+#include <boost/thread/thread.hpp>
#include "logging.h"
+
INIT_LOGGER ("Test.FetchManager");
using namespace Ccnx;
@@ -92,6 +94,26 @@
}
};
+void run()
+{
+ CcnxWrapperPtr ccnx = make_shared<CcnxWrapper> ();
+
+ Name baseName ("/base");
+ Name deviceName ("/device");
+
+ for (int i = 0; i < 10; i++)
+ {
+ usleep(100000);
+ ccnx->publishData (Name (baseName)(i), reinterpret_cast<const unsigned char*> (&i), sizeof(int), 30);
+ }
+
+ for (int i = 11; i < 50; i++)
+ {
+ usleep(100000);
+ ccnx->publishData (Name (baseName)(i), reinterpret_cast<const unsigned char*> (&i), sizeof(int), 30);
+ }
+
+}
BOOST_AUTO_TEST_CASE (TestFetcher)
{
@@ -197,10 +219,8 @@
Name baseName ("/base");
Name deviceName ("/device");
- int i = 0;
- ccnx->publishData (Name (baseName)(i), reinterpret_cast<const unsigned char*> (&i), sizeof(int), 30);
-
+ thread publishThread(run);
FetcherTestData data;
ExecutorPtr executor = make_shared<Executor>(1);
@@ -212,14 +232,14 @@
bind (&FetcherTestData::finish, &data, _1, _2),
bind (&FetcherTestData::onComplete, &data, _1),
bind (&FetcherTestData::onFail, &data, _1),
- deviceName, Name ("/base"), 1, 1,
+ deviceName, baseName, 0, 49,
boost::posix_time::seconds (5)); // this time is not precise
BOOST_CHECK_EQUAL (fetcher.IsActive (), false);
fetcher.RestartPipeline ();
BOOST_CHECK_EQUAL (fetcher.IsActive (), true);
- usleep(7000000);
+ usleep(20000000);
BOOST_CHECK_EQUAL (data.m_failed, true);
executor->shutdown ();