Getting closer with FetchManager
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index b0154ea..931d2c1 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -28,9 +28,15 @@
 using namespace std;
 using namespace Ccnx;
 
-FetchManager::FetchManager (CcnxWrapperPtr ccnx, SyncLogPtr sync)
+//The disposer object function
+struct fetcher_disposer { void operator() (Fetcher *delete_this) { delete delete_this; } };
+
+FetchManager::FetchManager (CcnxWrapperPtr ccnx, SyncLogPtr sync, uint32_t parallelFetches/* = 3*/)
   : m_ccnx (ccnx)
   , m_sync (sync)
+  , m_maxParallelFetches (parallelFetches)
+  , m_currentParallelFetches (0)
+
 {
   m_scheduler = make_shared<Scheduler> ();
   m_scheduler->start ();
@@ -38,6 +44,8 @@
 
 FetchManager::~FetchManager ()
 {
+  m_fetchList.clear_and_dispose (fetcher_disposer ());
+
   m_scheduler->shutdown ();
   m_scheduler.reset ();
 }
@@ -45,16 +53,65 @@
 void
 FetchManager::Enqueue (const Ccnx::Name &deviceName, uint32_t minSeqNo, uint32_t maxSeqNo, int priority/*=PRIORITY_NORMAL*/)
 {
+  // we may need to guarantee that LookupLocator will gives an answer and not throw exception...
+  Name forwardingHint = m_sync->LookupLocator (deviceName);
+  Fetcher &fetcher = *(new Fetcher (*this, deviceName, minSeqNo, maxSeqNo, forwardingHint));
+
+  switch (priority)
+    {
+    case PRIORITY_HIGH:
+      m_fetchList.push_front (fetcher);
+      break;
+
+    case PRIORITY_NORMAL:
+    default:
+      m_fetchList.push_back (fetcher);
+      break;
+    }
+
+  ScheduleFetches (); // will start a fetch if m_currentParallelFetches is less than max, otherwise does nothing
 }
 
-Ccnx::CcnxWrapperPtr
-FetchManager::GetCcnx ()
+void
+FetchManager::ScheduleFetches ()
 {
-  return m_ccnx;
+  unique_lock<mutex> lock (m_parellelFetchMutex);
+
+  for (FetchList::iterator item = m_fetchList.begin ();
+       m_currentParallelFetches < m_maxParallelFetches && item != m_fetchList.end ();
+       item++)
+    {
+      if (item->m_active)
+        continue;
+
+      m_currentParallelFetches ++;
+      item->RestartPipeline ();
+    }
 }
 
-SchedulerPtr
-FetchManager::GetScheduler ()
+void
+FetchManager::NoDataTimeout (Fetcher &fetcher)
 {
-  return m_scheduler;
+  fetcher.m_forwardingHint = Ccnx::Name ("/ndn/broadcast");
+  fetcher.m_active = false;
+  {
+    unique_lock<mutex> lock (m_parellelFetchMutex);
+    m_currentParallelFetches --;
+    // no need to do anything with the m_fetchList
+  }
+  
+  ScheduleFetches ();
+}
+
+void
+FetchManager::FetchComplete (Fetcher &fetcher)
+{
+  fetcher.m_active = false;
+  {
+    unique_lock<mutex> lock (m_parellelFetchMutex);
+    m_currentParallelFetches --;
+    m_fetchList.erase_and_dispose (FetchList::s_iterator_to (fetcher), fetcher_disposer ());
+  }
+  
+  // ? do something else
 }
diff --git a/src/fetch-manager.h b/src/fetch-manager.h
index eb38551..b200318 100644
--- a/src/fetch-manager.h
+++ b/src/fetch-manager.h
@@ -25,6 +25,7 @@
 #include <boost/exception/all.hpp>
 #include <boost/shared_ptr.hpp>
 #include <string>
+#include <list>
 #include <stdint.h>
 #include "scheduler.h"
 
@@ -32,6 +33,8 @@
 #include "ccnx-tunnel.h"
 #include "sync-log.h"
 
+#include "fetcher.h"
+
 class FetchManager
 {
   enum
@@ -41,29 +44,64 @@
     };
 
 public:
-  FetchManager (Ccnx::CcnxWrapperPtr ccnx, SyncLogPtr sync);
+  FetchManager (Ccnx::CcnxWrapperPtr ccnx, SyncLogPtr sync, uint32_t parallelFetches = 3);
   virtual ~FetchManager ();
 
   void
   Enqueue (const Ccnx::Name &deviceName,
            uint32_t minSeqNo, uint32_t maxSeqNo, int priority=PRIORITY_NORMAL);
 
-  Ccnx::CcnxWrapperPtr
+  inline Ccnx::CcnxWrapperPtr
   GetCcnx ();
 
-  SchedulerPtr
+  inline SchedulerPtr
   GetScheduler ();
+
+private:
+  void
+  ScheduleFetches ();  
+  
+  // Events called from Fetcher
+  void
+  NoDataTimeout (Fetcher &fetcher);
+
+  void
+  FetchComplete (Fetcher &fetcher);
+
+private:
   
 private:
   Ccnx::CcnxWrapperPtr m_ccnx;
   SyncLogPtr m_sync; // to access forwarding hints
   SchedulerPtr m_scheduler;
+
+  uint32_t m_maxParallelFetches;
+  uint32_t m_currentParallelFetches;
+  boost::mutex m_parellelFetchMutex;
+
+  // optimized list structure for fetch queue
+  typedef boost::intrusive::member_hook< Fetcher,
+                                         boost::intrusive::list_member_hook<>, &Fetcher::m_managerListHook> MemberOption;
+  typedef boost::intrusive::list<Fetcher, MemberOption> FetchList;
+  
+  FetchList m_fetchList;
 };
 
+Ccnx::CcnxWrapperPtr
+FetchManager::GetCcnx ()
+{
+  return m_ccnx;
+}
+
+SchedulerPtr
+FetchManager::GetScheduler ()
+{
+  return m_scheduler;
+}
 
 typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str; 
 namespace Error {
-struct Fetcher : virtual boost::exception, virtual std::exception { };
+struct FetchManager : virtual boost::exception, virtual std::exception { };
 }
 
 typedef boost::shared_ptr<FetchManager> FetchManagerPtr;
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 1cb07d9..c015407 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -31,6 +31,7 @@
                   const Ccnx::Name &name, int32_t minSeqNo, int32_t maxSeqNo,
                   const Ccnx::Name &forwardingHint/* = Ccnx::Name ()*/)
   : m_fetchManager (fetchManger)
+  , m_active (false)
   , m_name (name)
   , m_forwardingHint (forwardingHint)
   , m_minSendSeqNo (-1)
@@ -45,3 +46,9 @@
 Fetcher::~Fetcher ()
 {
 }
+
+void
+Fetcher::RestartPipeline ()
+{
+  m_active = true;
+}
diff --git a/src/fetcher.h b/src/fetcher.h
index 46faf2e..5cb3977 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -24,10 +24,11 @@
 
 #include "ccnx-wrapper.h"
 #include "scheduler.h"
+#include <boost/intrusive/list.hpp>
 
 class FetchManager;
 
-class Fetcher 
+class Fetcher
 {
 public:
   Fetcher (FetchManager &fetchManger,
@@ -37,6 +38,9 @@
 
 private:
   void
+  RestartPipeline ();
+  
+  void
   OnData ();
 
   void
@@ -44,6 +48,7 @@
   
 private:
   FetchManager &m_fetchManager;
+  bool m_active;
   
   Ccnx::Name m_name;
   Ccnx::Name m_forwardingHint;
@@ -54,6 +59,9 @@
   int32_t m_maxSeqNo;
 
   uint32_t m_pipeline;
+
+  boost::intrusive::list_member_hook<> m_managerListHook;  
+  friend class FetchManager;
 };
 
 typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;