Adding executor to fetcher: there was a deadlock otherwise
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index 3b5f181..d74f210 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -25,7 +25,7 @@
#include <boost/throw_exception.hpp>
#include "logging.h"
-INIT_LOGGER ("Fetch.Manager");
+INIT_LOGGER ("FetchManager");
using namespace boost;
using namespace std;
@@ -100,6 +100,8 @@
if (item->IsActive ())
continue;
+ _LOG_DEBUG ("Start fetching of " << item->GetName ());
+
m_currentParallelFetches ++;
item->RestartPipeline ();
}
@@ -108,6 +110,8 @@
void
FetchManager::DidNoDataTimeout (Fetcher &fetcher)
{
+ _LOG_DEBUG ("No data timeout for " << fetcher.GetName () << " with forwarding hint: " << fetcher.GetForwardingHint ());
+
fetcher.SetForwardingHint (Ccnx::Name (BROADCAST_DOMAIN));
{
unique_lock<mutex> lock (m_parellelFetchMutex);
diff --git a/src/fetcher.cc b/src/fetcher.cc
index e9dec91..3fe7c63 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -62,11 +62,15 @@
, m_pipeline (6) // initial "congestion window"
, m_activePipeline (0)
+
+ , m_executor (1)
{
+ m_executor.start ();
}
Fetcher::~Fetcher ()
{
+ m_executor.shutdown ();
}
void
@@ -77,7 +81,7 @@
// cout << "Restart: " << m_minSendSeqNo << endl;
m_lastPositiveActivity = date_time::second_clock<boost::posix_time::ptime>::universal_time();
- FillPipeline ();
+ m_executor.execute (bind (&Fetcher::FillPipeline, this));
}
void
@@ -101,6 +105,7 @@
Closure (bind(&Fetcher::OnData, this, m_minSendSeqNo+1, _1, _2),
bind(&Fetcher::OnTimeout, this, m_minSendSeqNo+1, _1)),
Selectors().interestLifetime (1)); // Alex: this lifetime should be changed to RTO
+ _LOG_DEBUG (" >>> i ok");
m_activePipeline ++;
}
@@ -170,13 +175,15 @@
}
else
{
- FillPipeline ();
+ m_executor.execute (bind (&Fetcher::FillPipeline, this));
}
}
Closure::TimeoutCallbackReturnValue
Fetcher::OnTimeout (uint64_t seqno, const Ccnx::Name &name)
{
+ _LOG_DEBUG (" <<< :( timeout " << name.getPartialName (0, name.size () - 1) << ", seq = " << seqno);
+
// cout << "Fetcher::OnTimeout: " << name << endl;
// cout << "Last: " << m_lastPositiveActivity << ", config: " << m_maximumNoActivityPeriod
// << ", now: " << date_time::second_clock<boost::posix_time::ptime>::universal_time()
@@ -188,6 +195,7 @@
m_activePipeline --;
if (m_activePipeline == 0)
{
+ _LOG_DEBUG ("Telling that fetch failed");
m_active = false;
m_onFetchFailed (*this);
// this is not valid anymore, but we still should be able finish work
@@ -195,5 +203,8 @@
return Closure::RESULT_OK;
}
else
- return Closure::RESULT_REEXPRESS;
+ {
+ _LOG_DEBUG ("Asking to reexpress");
+ return Closure::RESULT_REEXPRESS;
+ }
}
diff --git a/src/fetcher.h b/src/fetcher.h
index c86061f..1be6a27 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -25,7 +25,7 @@
#include "ccnx-wrapper.h"
#include "ccnx-name.h"
-#include "scheduler.h"
+#include "executor.h"
#include <boost/intrusive/list.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
@@ -58,6 +58,13 @@
void
SetForwardingHint (const Ccnx::Name &forwardingHint);
+ const Ccnx::Name &
+ GetForwardingHint () const { return m_forwardingHint; }
+
+ const Ccnx::Name &
+ GetName () const { return m_name; }
+
+
private:
void
FillPipeline ();
@@ -99,7 +106,7 @@
uint32_t m_activePipeline;
boost::posix_time::ptime m_lastPositiveActivity;
-
+ Executor m_executor;
};
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;