Getting closer with FetchManager
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index b0154ea..931d2c1 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -28,9 +28,15 @@
using namespace std;
using namespace Ccnx;
-FetchManager::FetchManager (CcnxWrapperPtr ccnx, SyncLogPtr sync)
+//The disposer object function
+struct fetcher_disposer { void operator() (Fetcher *delete_this) { delete delete_this; } };
+
+FetchManager::FetchManager (CcnxWrapperPtr ccnx, SyncLogPtr sync, uint32_t parallelFetches/* = 3*/)
: m_ccnx (ccnx)
, m_sync (sync)
+ , m_maxParallelFetches (parallelFetches)
+ , m_currentParallelFetches (0)
+
{
m_scheduler = make_shared<Scheduler> ();
m_scheduler->start ();
@@ -38,6 +44,8 @@
FetchManager::~FetchManager ()
{
+ m_fetchList.clear_and_dispose (fetcher_disposer ());
+
m_scheduler->shutdown ();
m_scheduler.reset ();
}
@@ -45,16 +53,65 @@
void
FetchManager::Enqueue (const Ccnx::Name &deviceName, uint32_t minSeqNo, uint32_t maxSeqNo, int priority/*=PRIORITY_NORMAL*/)
{
+ // we may need to guarantee that LookupLocator will gives an answer and not throw exception...
+ Name forwardingHint = m_sync->LookupLocator (deviceName);
+ Fetcher &fetcher = *(new Fetcher (*this, deviceName, minSeqNo, maxSeqNo, forwardingHint));
+
+ switch (priority)
+ {
+ case PRIORITY_HIGH:
+ m_fetchList.push_front (fetcher);
+ break;
+
+ case PRIORITY_NORMAL:
+ default:
+ m_fetchList.push_back (fetcher);
+ break;
+ }
+
+ ScheduleFetches (); // will start a fetch if m_currentParallelFetches is less than max, otherwise does nothing
}
-Ccnx::CcnxWrapperPtr
-FetchManager::GetCcnx ()
+void
+FetchManager::ScheduleFetches ()
{
- return m_ccnx;
+ unique_lock<mutex> lock (m_parellelFetchMutex);
+
+ for (FetchList::iterator item = m_fetchList.begin ();
+ m_currentParallelFetches < m_maxParallelFetches && item != m_fetchList.end ();
+ item++)
+ {
+ if (item->m_active)
+ continue;
+
+ m_currentParallelFetches ++;
+ item->RestartPipeline ();
+ }
}
-SchedulerPtr
-FetchManager::GetScheduler ()
+void
+FetchManager::NoDataTimeout (Fetcher &fetcher)
{
- return m_scheduler;
+ fetcher.m_forwardingHint = Ccnx::Name ("/ndn/broadcast");
+ fetcher.m_active = false;
+ {
+ unique_lock<mutex> lock (m_parellelFetchMutex);
+ m_currentParallelFetches --;
+ // no need to do anything with the m_fetchList
+ }
+
+ ScheduleFetches ();
+}
+
+void
+FetchManager::FetchComplete (Fetcher &fetcher)
+{
+ fetcher.m_active = false;
+ {
+ unique_lock<mutex> lock (m_parellelFetchMutex);
+ m_currentParallelFetches --;
+ m_fetchList.erase_and_dispose (FetchList::s_iterator_to (fetcher), fetcher_disposer ());
+ }
+
+ // ? do something else
}
diff --git a/src/fetch-manager.h b/src/fetch-manager.h
index eb38551..b200318 100644
--- a/src/fetch-manager.h
+++ b/src/fetch-manager.h
@@ -25,6 +25,7 @@
#include <boost/exception/all.hpp>
#include <boost/shared_ptr.hpp>
#include <string>
+#include <list>
#include <stdint.h>
#include "scheduler.h"
@@ -32,6 +33,8 @@
#include "ccnx-tunnel.h"
#include "sync-log.h"
+#include "fetcher.h"
+
class FetchManager
{
enum
@@ -41,29 +44,64 @@
};
public:
- FetchManager (Ccnx::CcnxWrapperPtr ccnx, SyncLogPtr sync);
+ FetchManager (Ccnx::CcnxWrapperPtr ccnx, SyncLogPtr sync, uint32_t parallelFetches = 3);
virtual ~FetchManager ();
void
Enqueue (const Ccnx::Name &deviceName,
uint32_t minSeqNo, uint32_t maxSeqNo, int priority=PRIORITY_NORMAL);
- Ccnx::CcnxWrapperPtr
+ inline Ccnx::CcnxWrapperPtr
GetCcnx ();
- SchedulerPtr
+ inline SchedulerPtr
GetScheduler ();
+
+private:
+ void
+ ScheduleFetches ();
+
+ // Events called from Fetcher
+ void
+ NoDataTimeout (Fetcher &fetcher);
+
+ void
+ FetchComplete (Fetcher &fetcher);
+
+private:
private:
Ccnx::CcnxWrapperPtr m_ccnx;
SyncLogPtr m_sync; // to access forwarding hints
SchedulerPtr m_scheduler;
+
+ uint32_t m_maxParallelFetches;
+ uint32_t m_currentParallelFetches;
+ boost::mutex m_parellelFetchMutex;
+
+ // optimized list structure for fetch queue
+ typedef boost::intrusive::member_hook< Fetcher,
+ boost::intrusive::list_member_hook<>, &Fetcher::m_managerListHook> MemberOption;
+ typedef boost::intrusive::list<Fetcher, MemberOption> FetchList;
+
+ FetchList m_fetchList;
};
+Ccnx::CcnxWrapperPtr
+FetchManager::GetCcnx ()
+{
+ return m_ccnx;
+}
+
+SchedulerPtr
+FetchManager::GetScheduler ()
+{
+ return m_scheduler;
+}
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
namespace Error {
-struct Fetcher : virtual boost::exception, virtual std::exception { };
+struct FetchManager : virtual boost::exception, virtual std::exception { };
}
typedef boost::shared_ptr<FetchManager> FetchManagerPtr;
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 1cb07d9..c015407 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -31,6 +31,7 @@
const Ccnx::Name &name, int32_t minSeqNo, int32_t maxSeqNo,
const Ccnx::Name &forwardingHint/* = Ccnx::Name ()*/)
: m_fetchManager (fetchManger)
+ , m_active (false)
, m_name (name)
, m_forwardingHint (forwardingHint)
, m_minSendSeqNo (-1)
@@ -45,3 +46,9 @@
Fetcher::~Fetcher ()
{
}
+
+void
+Fetcher::RestartPipeline ()
+{
+ m_active = true;
+}
diff --git a/src/fetcher.h b/src/fetcher.h
index 46faf2e..5cb3977 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -24,10 +24,11 @@
#include "ccnx-wrapper.h"
#include "scheduler.h"
+#include <boost/intrusive/list.hpp>
class FetchManager;
-class Fetcher
+class Fetcher
{
public:
Fetcher (FetchManager &fetchManger,
@@ -37,6 +38,9 @@
private:
void
+ RestartPipeline ();
+
+ void
OnData ();
void
@@ -44,6 +48,7 @@
private:
FetchManager &m_fetchManager;
+ bool m_active;
Ccnx::Name m_name;
Ccnx::Name m_forwardingHint;
@@ -54,6 +59,9 @@
int32_t m_maxSeqNo;
uint32_t m_pipeline;
+
+ boost::intrusive::list_member_hook<> m_managerListHook;
+ friend class FetchManager;
};
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;