fetch: Switch code to use ndn-cxx
This commit also moves code to ndn::chronoshare namespace and changes
logging to show file and line number of the logging statement.
Change-Id: I075320644166cea9d5d3ef65bb26a2cabfd4dc5a
diff --git a/src/fetch-manager.cpp b/src/fetch-manager.cpp
index 15ecb32..d8dd6a6 100644
--- a/src/fetch-manager.cpp
+++ b/src/fetch-manager.cpp
@@ -19,21 +19,16 @@
*/
#include "fetch-manager.hpp"
-#include <boost/lexical_cast.hpp>
-#include <boost/make_shared.hpp>
-#include <boost/ref.hpp>
-#include <boost/throw_exception.hpp>
+#include "core/logging.hpp"
-#include "logging.hpp"
-#include "simple-interval-generator.hpp"
+#include <ndn-cxx/face.hpp>
+
+namespace ndn {
+namespace chronoshare {
_LOG_INIT(FetchManager);
-using namespace boost;
-using namespace std;
-using namespace Ndnx;
-
-//The disposer object function
+// The disposer object function
struct fetcher_disposer
{
void
@@ -43,54 +38,42 @@
}
};
-static const string SCHEDULE_FETCHES_TAG = "ScheduleFetches";
-
-FetchManager::FetchManager(Ccnx::CcnxWrapperPtr ccnx,
- const Mapping& mapping,
- const Name& broadcastForwardingHint,
+FetchManager::FetchManager(Face& face, const Mapping& mapping, const Name& broadcastForwardingHint,
uint32_t parallelFetches, // = 3
const SegmentCallback& defaultSegmentCallback,
- const FinishCallback& defaultFinishCallback,
- const FetchTaskDbPtr& taskDb)
- : m_ccnx(ccnx)
+ const FinishCallback& defaultFinishCallback, const FetchTaskDbPtr& taskDb)
+ : m_face(face)
, m_mapping(mapping)
, m_maxParallelFetches(parallelFetches)
, m_currentParallelFetches(0)
- , m_scheduler(new Scheduler)
- , m_executor(new Executor(1))
+ , m_scheduler(m_face.getIoService())
+ , m_scheduledFetchesEvent(m_scheduler)
, m_defaultSegmentCallback(defaultSegmentCallback)
, m_defaultFinishCallback(defaultFinishCallback)
, m_taskDb(taskDb)
, m_broadcastHint(broadcastForwardingHint)
+ , m_ioService(m_face.getIoService())
{
- m_scheduler->start();
- m_executor->start();
+ // no need to check to often. if needed, will be rescheduled
+ m_scheduledFetchesEvent =
+ m_scheduler.scheduleEvent(time::seconds(300), bind(&FetchManager::ScheduleFetches, this));
- m_scheduleFetchesTask =
- Scheduler::schedulePeriodicTask(m_scheduler,
- make_shared<SimpleIntervalGenerator>(
- 300), // no need to check to often. if needed, will be rescheduled
- bind(&FetchManager::ScheduleFetches, this),
- SCHEDULE_FETCHES_TAG);
// resume un-finished fetches if there is any
if (m_taskDb) {
- m_taskDb->foreachTask(bind(&FetchManager::Enqueue, this, _1, _2, _3, _4, _5));
+ m_taskDb->foreachTask(
+ [this](const Name& deviceName, const Name& baseName, uint64_t minSeqNo, uint64_t maxSeqNo,
+ int priority) { this->Enqueue(deviceName, baseName, minSeqNo, maxSeqNo, priority); });
}
}
FetchManager::~FetchManager()
{
- m_scheduler->shutdown();
- m_executor->shutdown();
-
- m_ccnx.reset();
-
m_fetchList.clear_and_dispose(fetcher_disposer());
}
// Enqueue using default callbacks
void
-FetchManager::Enqueue(const Ccnx::Name& deviceName, const Ccnx::Name& baseName, uint64_t minSeqNo,
+FetchManager::Enqueue(const Name& deviceName, const Name& baseName, uint64_t minSeqNo,
uint64_t maxSeqNo, int priority)
{
Enqueue(deviceName, baseName, m_defaultSegmentCallback, m_defaultFinishCallback, minSeqNo,
@@ -98,7 +81,7 @@
}
void
-FetchManager::Enqueue(const Ccnx::Name& deviceName, const Ccnx::Name& baseName,
+FetchManager::Enqueue(const Name& deviceName, const Name& baseName,
const SegmentCallback& segmentCallback, const FinishCallback& finishCallback,
uint64_t minSeqNo, uint64_t maxSeqNo, int priority /*PRIORITY_NORMAL*/)
{
@@ -115,14 +98,14 @@
m_taskDb->addTask(deviceName, baseName, minSeqNo, maxSeqNo, priority);
}
- unique_lock<mutex> lock(m_parellelFetchMutex);
+ std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
_LOG_TRACE("++++ Create fetcher: " << baseName);
Fetcher* fetcher =
- new Fetcher(m_ccnx, m_executor, segmentCallback, finishCallback,
+ new Fetcher(m_face, segmentCallback, finishCallback,
bind(&FetchManager::DidFetchComplete, this, _1, _2, _3),
bind(&FetchManager::DidNoDataTimeout, this, _1), deviceName, baseName, minSeqNo,
- maxSeqNo, boost::posix_time::seconds(30), forwardingHint);
+ maxSeqNo, time::seconds(30), forwardingHint);
switch (priority) {
case PRIORITY_HIGH:
@@ -138,19 +121,17 @@
}
_LOG_DEBUG("++++ Reschedule fetcher task");
- m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
- // ScheduleFetches (); // will start a fetch if m_currentParallelFetches is less than max, otherwise does nothing
+ m_scheduledFetchesEvent =
+ m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
}
void
FetchManager::ScheduleFetches()
{
- unique_lock<mutex> lock(m_parellelFetchMutex);
+ std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
- boost::posix_time::ptime currentTime =
- date_time::second_clock<boost::posix_time::ptime>::universal_time();
- boost::posix_time::ptime nextSheduleCheck =
- currentTime + posix_time::seconds(300); // no reason to have anything, but just in case
+ auto currentTime = time::steady_clock::now();
+ auto nextSheduleCheck = currentTime + time::seconds(300); // no reason to have anything, but just in case
for (FetchList::iterator item = m_fetchList.begin();
m_currentParallelFetches < m_maxParallelFetches && item != m_fetchList.end();
@@ -166,8 +147,9 @@
}
if (currentTime < item->GetNextScheduledRetry()) {
- if (item->GetNextScheduledRetry() < nextSheduleCheck)
+ if (item->GetNextScheduledRetry() < nextSheduleCheck) {
nextSheduleCheck = item->GetNextScheduledRetry();
+ }
_LOG_DEBUG("Item is delayed");
continue;
@@ -180,8 +162,8 @@
item->RestartPipeline();
}
- m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask,
- (nextSheduleCheck - currentTime).total_seconds());
+ m_scheduledFetchesEvent = m_scheduler.scheduleEvent(nextSheduleCheck - currentTime,
+ bind(&FetchManager::ScheduleFetches, this));
}
void
@@ -191,7 +173,7 @@
<< fetcher.GetForwardingHint());
{
- unique_lock<mutex> lock(m_parellelFetchMutex);
+ std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
m_currentParallelFetches--;
// no need to do anything with the m_fetchList
}
@@ -220,27 +202,26 @@
fetcher.SetForwardingHint(m_broadcastHint);
}
- double delay = fetcher.GetRetryPause();
- if (delay < 1) // first time
+ time::seconds delay = fetcher.GetRetryPause();
+ if (delay < time::seconds(1)) // first time
{
- delay = 1;
+ delay = time::seconds(1);
}
else {
- delay = std::min(2 * delay, 300.0); // 5 minutes max
+ delay = std::min(2 * delay, time::seconds(300)); // 5 minutes max
}
fetcher.SetRetryPause(delay);
- fetcher.SetNextScheduledRetry(date_time::second_clock<boost::posix_time::ptime>::universal_time() +
- posix_time::seconds(delay));
+ fetcher.SetNextScheduledRetry(time::steady_clock::now() + time::seconds(delay));
- m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
+ m_scheduledFetchesEvent = m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
}
void
FetchManager::DidFetchComplete(Fetcher& fetcher, const Name& deviceName, const Name& baseName)
{
{
- unique_lock<mutex> lock(m_parellelFetchMutex);
+ std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
m_currentParallelFetches--;
if (m_taskDb) {
@@ -249,17 +230,17 @@
}
// like TCP timed-wait
- m_scheduler->scheduleOneTimeTask(m_scheduler, 10,
- boost::bind(&FetchManager::TimedWait, this, ref(fetcher)),
- boost::lexical_cast<string>(baseName));
-
- m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
+ m_scheduler.scheduleEvent(time::seconds(10), bind(&FetchManager::TimedWait, this, ref(fetcher)));
+ m_scheduledFetchesEvent = m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
}
void
FetchManager::TimedWait(Fetcher& fetcher)
{
- unique_lock<mutex> lock(m_parellelFetchMutex);
+ std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
_LOG_TRACE("+++++ removing fetcher: " << fetcher.GetName());
m_fetchList.erase_and_dispose(FetchList::s_iterator_to(fetcher), fetcher_disposer());
}
+
+} // namespace chronoshare
+} // namespace ndn