Getting closer with FetchManager
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index b0154ea..931d2c1 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -28,9 +28,15 @@
using namespace std;
using namespace Ccnx;
-FetchManager::FetchManager (CcnxWrapperPtr ccnx, SyncLogPtr sync)
+// Disposer functor passed to m_fetchList.clear_and_dispose / erase_and_dispose: it deletes the Fetcher it is handed (Enqueue allocates each Fetcher with new).
+struct fetcher_disposer { void operator() (Fetcher *delete_this) { delete delete_this; } };
+
+FetchManager::FetchManager (CcnxWrapperPtr ccnx, SyncLogPtr sync, uint32_t parallelFetches/* = 3*/)
: m_ccnx (ccnx)
, m_sync (sync)
+ , m_maxParallelFetches (parallelFetches)
+ , m_currentParallelFetches (0)
+
{
m_scheduler = make_shared<Scheduler> ();
m_scheduler->start ();
@@ -38,6 +44,8 @@
FetchManager::~FetchManager ()
{
+ m_fetchList.clear_and_dispose (fetcher_disposer ()); // empties the fetch list and deletes every Fetcher still in it; NOTE(review): this runs before the scheduler is shut down and without taking m_parellelFetchMutex - confirm no scheduled callback can still touch a Fetcher at this point
+
m_scheduler->shutdown ();
m_scheduler.reset ();
}
@@ -45,16 +53,65 @@
void
FetchManager::Enqueue (const Ccnx::Name &deviceName, uint32_t minSeqNo, uint32_t maxSeqNo, int priority/*=PRIORITY_NORMAL*/)
{
+ // Creates a Fetcher for [minSeqNo, maxSeqNo] of deviceName, queues it by priority and triggers scheduling. We may need to guarantee that LookupLocator will give an answer and not throw an exception...
+ Name forwardingHint = m_sync->LookupLocator (deviceName); // locator looked up through m_sync; handed to the Fetcher as its forwarding hint
+ Fetcher &fetcher = *(new Fetcher (*this, deviceName, minSeqNo, maxSeqNo, forwardingHint)); // heap-allocated; from here on it lives in m_fetchList and is deleted through fetcher_disposer
+
+ switch (priority)
+ {
+ case PRIORITY_HIGH:
+ m_fetchList.push_front (fetcher); // front of the list: ScheduleFetches scans from the beginning, so this one is considered first
+ break;
+
+ case PRIORITY_NORMAL:
+ default: // any unrecognized priority value is treated like PRIORITY_NORMAL
+ m_fetchList.push_back (fetcher); // NOTE(review): m_fetchList is modified here (and in the PRIORITY_HIGH case) without holding m_parellelFetchMutex, while FetchComplete erases from it under that mutex - confirm all callers run on one thread
+ break;
+ }
+
+ ScheduleFetches (); // will start a fetch if m_currentParallelFetches is less than max, otherwise does nothing
}
-Ccnx::CcnxWrapperPtr
-FetchManager::GetCcnx ()
+void
+FetchManager::ScheduleFetches () // scans m_fetchList from the front and restarts inactive fetchers until m_maxParallelFetches are counted as running
{
- return m_ccnx;
+ unique_lock<mutex> lock (m_parellelFetchMutex); // held for the whole scan; guards m_currentParallelFetches
+
+ for (FetchList::iterator item = m_fetchList.begin ();
+ m_currentParallelFetches < m_maxParallelFetches && item != m_fetchList.end (); // stop as soon as the parallel limit is reached or the list is exhausted
+ item++)
+ {
+ if (item->m_active) // skip fetchers that are flagged active
+ continue;
+
+ m_currentParallelFetches ++; // the slot is counted before the pipeline is (re)started
+ item->RestartPipeline (); // NOTE(review): m_active is not set to true anywhere in this function - presumably RestartPipeline does it; confirm, otherwise the same fetcher is counted again on the next call. Also confirm RestartPipeline never calls NoDataTimeout/FetchComplete synchronously, since both lock m_parellelFetchMutex, which is held here
+ }
}
-SchedulerPtr
-FetchManager::GetScheduler ()
+void
+FetchManager::NoDataTimeout (Fetcher &fetcher) // deactivates the fetcher, switches it to the broadcast forwarding hint, gives back its parallel slot and reschedules; presumably called by a Fetcher that received no data in time - confirm against Fetcher
{
- return m_scheduler;
+ fetcher.m_forwardingHint = Ccnx::Name ("/ndn/broadcast"); // replace the looked-up locator with the broadcast prefix
+ fetcher.m_active = false; // inactive again, so a later ScheduleFetches scan can restart it
+ {
+ unique_lock<mutex> lock (m_parellelFetchMutex); // scoped so the mutex is released before ScheduleFetches below
+ m_currentParallelFetches --; // give back the slot this fetcher was counted under
+ // the fetcher intentionally stays in m_fetchList, so nothing has to be done with the list here
+ }
+
+ ScheduleFetches (); // takes m_parellelFetchMutex itself, hence the call outside the scope above
+}
+
+void
+FetchManager::FetchComplete (Fetcher &fetcher) // deactivates the fetcher, gives back its parallel slot and removes (deletes) it from m_fetchList
+{
+ fetcher.m_active = false;
+ {
+ unique_lock<mutex> lock (m_parellelFetchMutex);
+ m_currentParallelFetches --; // give back the slot this fetcher was counted under
+ m_fetchList.erase_and_dispose (FetchList::s_iterator_to (fetcher), fetcher_disposer ()); // removes the fetcher from the list and deletes it via fetcher_disposer; the reference 'fetcher' must not be used after this line
+ }
+
+ // TODO: decide whether anything else is needed here. NOTE(review): unlike NoDataTimeout, ScheduleFetches is not called, so in the code visible here the freed slot is only refilled by the next Enqueue or NoDataTimeout
}