shared executor in Fetchers
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index a883118..4fbe5bc 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -45,8 +45,10 @@
, m_maxParallelFetches (parallelFetches)
, m_currentParallelFetches (0)
, m_scheduler (new Scheduler)
+ , m_executor (new Executor(1))
{
m_scheduler->start ();
+ m_executor->start();
m_scheduleFetchesTask = Scheduler::schedulePeriodicTask (m_scheduler,
make_shared<SimpleIntervalGenerator> (300), // no need to check to often. if needed, will be rescheduled
@@ -57,6 +59,8 @@
{
m_scheduler->shutdown ();
+ m_executor->shutdown();
+
m_fetchList.clear_and_dispose (fetcher_disposer ());
}
@@ -76,6 +80,7 @@
forwardingHint = m_mapping (deviceName);
Fetcher &fetcher = *(new Fetcher (m_ccnx,
+ m_executor,
segmentCallback,
finishCallback,
bind (&FetchManager::DidFetchComplete, this, _1),
diff --git a/src/fetch-manager.h b/src/fetch-manager.h
index 329b289..793c5c6 100644
--- a/src/fetch-manager.h
+++ b/src/fetch-manager.h
@@ -29,6 +29,7 @@
#include <list>
#include <stdint.h>
#include "scheduler.h"
+#include "executor.h"
#include "ccnx-wrapper.h"
#include "fetcher.h"
@@ -87,6 +88,7 @@
FetchList m_fetchList;
SchedulerPtr m_scheduler;
+ ExecutorPtr m_executor;
TaskPtr m_scheduleFetchesTask;
};
diff --git a/src/fetcher.cc b/src/fetcher.cc
index fa8737e..f20ae44 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -36,6 +36,7 @@
using namespace Ccnx;
Fetcher::Fetcher (Ccnx::CcnxWrapperPtr ccnx,
+ ExecutorPtr executor,
const SegmentCallback &segmentCallback,
const FinishCallback &finishCallback,
OnFetchCompleteCallback onFetchComplete, OnFetchFailedCallback onFetchFailed,
@@ -64,14 +65,12 @@
, m_activePipeline (0)
, m_retryPause (0)
, m_nextScheduledRetry (date_time::second_clock<boost::posix_time::ptime>::universal_time ())
- , m_executor (1) // must be 1
+ , m_executor (executor) // must be 1
{
- m_executor.start ();
}
Fetcher::~Fetcher ()
{
- m_executor.shutdown ();
}
void
@@ -82,7 +81,7 @@
// cout << "Restart: " << m_minSendSeqNo << endl;
m_lastPositiveActivity = date_time::second_clock<boost::posix_time::ptime>::universal_time();
- m_executor.execute (bind (&Fetcher::FillPipeline, this));
+ m_executor->execute (bind (&Fetcher::FillPipeline, this));
}
void
@@ -120,7 +119,7 @@
void
Fetcher::OnData (uint64_t seqno, const Ccnx::Name &name, PcoPtr data)
{
- m_executor.execute (bind (&Fetcher::OnData_Execute, this, seqno, name, data));
+ m_executor->execute (bind (&Fetcher::OnData_Execute, this, seqno, name, data));
}
void
@@ -198,14 +197,14 @@
}
else
{
- m_executor.execute (bind (&Fetcher::FillPipeline, this));
+ m_executor->execute (bind (&Fetcher::FillPipeline, this));
}
}
void
Fetcher::OnTimeout (uint64_t seqno, const Ccnx::Name &name, const Closure &closure, Selectors selectors)
{
- m_executor.execute (bind (&Fetcher::OnTimeout_Execute, this, seqno, name, closure, selectors));
+ m_executor->execute (bind (&Fetcher::OnTimeout_Execute, this, seqno, name, closure, selectors));
}
void
diff --git a/src/fetcher.h b/src/fetcher.h
index 05e9f54..8e6b441 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -40,6 +40,7 @@
typedef boost::function<void (Fetcher &)> OnFetchFailedCallback;
Fetcher (Ccnx::CcnxWrapperPtr ccnx,
+ ExecutorPtr executor,
const SegmentCallback &segmentCallback, // callback passed by caller of FetchManager
const FinishCallback &finishCallback, // callback passed by caller of FetchManager
OnFetchCompleteCallback onFetchComplete, OnFetchFailedCallback onFetchFailed, // callbacks provided by FetchManager
@@ -131,7 +132,7 @@
double m_retryPause; // pause to stop trying to fetch (for fetch-manager)
boost::posix_time::ptime m_nextScheduledRetry;
- Executor m_executor; // to serialize FillPipeline events
+ ExecutorPtr m_executor; // to serialize FillPipeline events
};
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
diff --git a/test/test-fetch-manager.cc b/test/test-fetch-manager.cc
index a526c10..24ca59c 100644
--- a/test/test-fetch-manager.cc
+++ b/test/test-fetch-manager.cc
@@ -110,8 +110,10 @@
ccnx->publishData (Name (baseName)(oneMore), reinterpret_cast<const unsigned char*> (&oneMore), sizeof(int), 30);
FetcherTestData data;
+ ExecutorPtr executor = make_shared<Executor>(1);
Fetcher fetcher (ccnx,
+ executor,
bind (&FetcherTestData::onData, &data, _1, _2, _3, _4),
bind (&FetcherTestData::finish, &data, _1, _2),
bind (&FetcherTestData::onComplete, &data, _1),