fetch: Switch code to use ndn-cxx

This commit also moves the code into the ndn::chronoshare namespace
and changes logging to show the file and line number of each logging
statement.

Change-Id: I075320644166cea9d5d3ef65bb26a2cabfd4dc5a
diff --git a/src/fetch-manager.cpp b/src/fetch-manager.cpp
index 15ecb32..d8dd6a6 100644
--- a/src/fetch-manager.cpp
+++ b/src/fetch-manager.cpp
@@ -19,21 +19,16 @@
  */
 
 #include "fetch-manager.hpp"
-#include <boost/lexical_cast.hpp>
-#include <boost/make_shared.hpp>
-#include <boost/ref.hpp>
-#include <boost/throw_exception.hpp>
+#include "core/logging.hpp"
 
-#include "logging.hpp"
-#include "simple-interval-generator.hpp"
+#include <ndn-cxx/face.hpp>
+
+namespace ndn {
+namespace chronoshare {
 
 _LOG_INIT(FetchManager);
 
-using namespace boost;
-using namespace std;
-using namespace Ndnx;
-
-//The disposer object function
+// The disposer object function
 struct fetcher_disposer
 {
   void
@@ -43,54 +38,42 @@
   }
 };
 
-static const string SCHEDULE_FETCHES_TAG = "ScheduleFetches";
-
-FetchManager::FetchManager(Ccnx::CcnxWrapperPtr ccnx,
-                           const Mapping& mapping,
-                           const Name& broadcastForwardingHint,
+FetchManager::FetchManager(Face& face, const Mapping& mapping, const Name& broadcastForwardingHint,
                            uint32_t parallelFetches, // = 3
                            const SegmentCallback& defaultSegmentCallback,
-                           const FinishCallback& defaultFinishCallback,
-                           const FetchTaskDbPtr& taskDb)
-  : m_ccnx(ccnx)
+                           const FinishCallback& defaultFinishCallback, const FetchTaskDbPtr& taskDb)
+  : m_face(face)
   , m_mapping(mapping)
   , m_maxParallelFetches(parallelFetches)
   , m_currentParallelFetches(0)
-  , m_scheduler(new Scheduler)
-  , m_executor(new Executor(1))
+  , m_scheduler(m_face.getIoService())
+  , m_scheduledFetchesEvent(m_scheduler)
   , m_defaultSegmentCallback(defaultSegmentCallback)
   , m_defaultFinishCallback(defaultFinishCallback)
   , m_taskDb(taskDb)
   , m_broadcastHint(broadcastForwardingHint)
+  , m_ioService(m_face.getIoService())
 {
-  m_scheduler->start();
-  m_executor->start();
+  // no need to check too often; if needed, it will be rescheduled
+  m_scheduledFetchesEvent =
+    m_scheduler.scheduleEvent(time::seconds(300), bind(&FetchManager::ScheduleFetches, this));
 
-  m_scheduleFetchesTask =
-    Scheduler::schedulePeriodicTask(m_scheduler,
-                                    make_shared<SimpleIntervalGenerator>(
-                                      300), // no need to check to often. if needed, will be rescheduled
-                                    bind(&FetchManager::ScheduleFetches, this),
-                                    SCHEDULE_FETCHES_TAG);
   // resume un-finished fetches if there is any
   if (m_taskDb) {
-    m_taskDb->foreachTask(bind(&FetchManager::Enqueue, this, _1, _2, _3, _4, _5));
+    m_taskDb->foreachTask(
+      [this](const Name& deviceName, const Name& baseName, uint64_t minSeqNo, uint64_t maxSeqNo,
+             int priority) { this->Enqueue(deviceName, baseName, minSeqNo, maxSeqNo, priority); });
   }
 }
 
 FetchManager::~FetchManager()
 {
-  m_scheduler->shutdown();
-  m_executor->shutdown();
-
-  m_ccnx.reset();
-
   m_fetchList.clear_and_dispose(fetcher_disposer());
 }
 
 // Enqueue using default callbacks
 void
-FetchManager::Enqueue(const Ccnx::Name& deviceName, const Ccnx::Name& baseName, uint64_t minSeqNo,
+FetchManager::Enqueue(const Name& deviceName, const Name& baseName, uint64_t minSeqNo,
                       uint64_t maxSeqNo, int priority)
 {
   Enqueue(deviceName, baseName, m_defaultSegmentCallback, m_defaultFinishCallback, minSeqNo,
@@ -98,7 +81,7 @@
 }
 
 void
-FetchManager::Enqueue(const Ccnx::Name& deviceName, const Ccnx::Name& baseName,
+FetchManager::Enqueue(const Name& deviceName, const Name& baseName,
                       const SegmentCallback& segmentCallback, const FinishCallback& finishCallback,
                       uint64_t minSeqNo, uint64_t maxSeqNo, int priority /*PRIORITY_NORMAL*/)
 {
@@ -115,14 +98,14 @@
     m_taskDb->addTask(deviceName, baseName, minSeqNo, maxSeqNo, priority);
   }
 
-  unique_lock<mutex> lock(m_parellelFetchMutex);
+  std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
 
   _LOG_TRACE("++++ Create fetcher: " << baseName);
   Fetcher* fetcher =
-    new Fetcher(m_ccnx, m_executor, segmentCallback, finishCallback,
+    new Fetcher(m_face, segmentCallback, finishCallback,
                 bind(&FetchManager::DidFetchComplete, this, _1, _2, _3),
                 bind(&FetchManager::DidNoDataTimeout, this, _1), deviceName, baseName, minSeqNo,
-                maxSeqNo, boost::posix_time::seconds(30), forwardingHint);
+                maxSeqNo, time::seconds(30), forwardingHint);
 
   switch (priority) {
     case PRIORITY_HIGH:
@@ -138,19 +121,17 @@
   }
 
   _LOG_DEBUG("++++ Reschedule fetcher task");
-  m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
-  // ScheduleFetches (); // will start a fetch if m_currentParallelFetches is less than max, otherwise does nothing
+  m_scheduledFetchesEvent =
+    m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
 }
 
 void
 FetchManager::ScheduleFetches()
 {
-  unique_lock<mutex> lock(m_parellelFetchMutex);
+  std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
 
-  boost::posix_time::ptime currentTime =
-    date_time::second_clock<boost::posix_time::ptime>::universal_time();
-  boost::posix_time::ptime nextSheduleCheck =
-    currentTime + posix_time::seconds(300); // no reason to have anything, but just in case
+  auto currentTime = time::steady_clock::now();
+  auto nextSheduleCheck = currentTime + time::seconds(300); // no reason to have anything, but just in case
 
   for (FetchList::iterator item = m_fetchList.begin();
        m_currentParallelFetches < m_maxParallelFetches && item != m_fetchList.end();
@@ -166,8 +147,9 @@
     }
 
     if (currentTime < item->GetNextScheduledRetry()) {
-      if (item->GetNextScheduledRetry() < nextSheduleCheck)
+      if (item->GetNextScheduledRetry() < nextSheduleCheck) {
         nextSheduleCheck = item->GetNextScheduledRetry();
+      }
 
       _LOG_DEBUG("Item is delayed");
       continue;
@@ -180,8 +162,8 @@
     item->RestartPipeline();
   }
 
-  m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask,
-                                (nextSheduleCheck - currentTime).total_seconds());
+  m_scheduledFetchesEvent = m_scheduler.scheduleEvent(nextSheduleCheck - currentTime,
+                                                      bind(&FetchManager::ScheduleFetches, this));
 }
 
 void
@@ -191,7 +173,7 @@
                                     << fetcher.GetForwardingHint());
 
   {
-    unique_lock<mutex> lock(m_parellelFetchMutex);
+    std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
     m_currentParallelFetches--;
     // no need to do anything with the m_fetchList
   }
@@ -220,27 +202,26 @@
     fetcher.SetForwardingHint(m_broadcastHint);
   }
 
-  double delay = fetcher.GetRetryPause();
-  if (delay < 1) // first time
+  time::seconds delay = fetcher.GetRetryPause();
+  if (delay < time::seconds(1)) // first time
   {
-    delay = 1;
+    delay = time::seconds(1);
   }
   else {
-    delay = std::min(2 * delay, 300.0); // 5 minutes max
+    delay = std::min(2 * delay, time::seconds(300)); // 5 minutes max
   }
 
   fetcher.SetRetryPause(delay);
-  fetcher.SetNextScheduledRetry(date_time::second_clock<boost::posix_time::ptime>::universal_time() +
-                                posix_time::seconds(delay));
+  fetcher.SetNextScheduledRetry(time::steady_clock::now() + time::seconds(delay));
 
-  m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
+  m_scheduledFetchesEvent = m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
 }
 
 void
 FetchManager::DidFetchComplete(Fetcher& fetcher, const Name& deviceName, const Name& baseName)
 {
   {
-    unique_lock<mutex> lock(m_parellelFetchMutex);
+    std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
     m_currentParallelFetches--;
 
     if (m_taskDb) {
@@ -249,17 +230,17 @@
   }
 
   // like TCP timed-wait
-  m_scheduler->scheduleOneTimeTask(m_scheduler, 10,
-                                   boost::bind(&FetchManager::TimedWait, this, ref(fetcher)),
-                                   boost::lexical_cast<string>(baseName));
-
-  m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
+  m_scheduler.scheduleEvent(time::seconds(10), bind(&FetchManager::TimedWait, this, ref(fetcher)));
+  m_scheduledFetchesEvent = m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
 }
 
 void
 FetchManager::TimedWait(Fetcher& fetcher)
 {
-  unique_lock<mutex> lock(m_parellelFetchMutex);
+  std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
   _LOG_TRACE("+++++ removing fetcher: " << fetcher.GetName());
   m_fetchList.erase_and_dispose(FetchList::s_iterator_to(fetcher), fetcher_disposer());
 }
+
+} // namespace chronoshare
+} // namespace ndn