blob: 15ecb322062846cd151f7703e0fe2f978bba40e2 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
* Copyright (c) 2013-2017, Regents of the University of California.
*
* This file is part of ChronoShare, a decentralized file sharing application over NDN.
*
* ChronoShare is free software: you can redistribute it and/or modify it under the terms
* of the GNU General Public License as published by the Free Software Foundation, either
* version 3 of the License, or (at your option) any later version.
*
* ChronoShare is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received copies of the GNU General Public License along with
* ChronoShare, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
*
* See AUTHORS.md for complete list of ChronoShare authors and contributors.
*/
#include "fetch-manager.hpp"
#include <boost/lexical_cast.hpp>
#include <boost/make_shared.hpp>
#include <boost/ref.hpp>
#include <boost/throw_exception.hpp>
#include "logging.hpp"
#include "simple-interval-generator.hpp"
_LOG_INIT(FetchManager);
using namespace boost;
using namespace std;
using namespace Ndnx;
//The disposer object function
struct fetcher_disposer
{
void
operator()(Fetcher* delete_this)
{
delete delete_this;
}
};
static const string SCHEDULE_FETCHES_TAG = "ScheduleFetches";
FetchManager::FetchManager(Ccnx::CcnxWrapperPtr ccnx,
const Mapping& mapping,
const Name& broadcastForwardingHint,
uint32_t parallelFetches, // = 3
const SegmentCallback& defaultSegmentCallback,
const FinishCallback& defaultFinishCallback,
const FetchTaskDbPtr& taskDb)
: m_ccnx(ccnx)
, m_mapping(mapping)
, m_maxParallelFetches(parallelFetches)
, m_currentParallelFetches(0)
, m_scheduler(new Scheduler)
, m_executor(new Executor(1))
, m_defaultSegmentCallback(defaultSegmentCallback)
, m_defaultFinishCallback(defaultFinishCallback)
, m_taskDb(taskDb)
, m_broadcastHint(broadcastForwardingHint)
{
m_scheduler->start();
m_executor->start();
m_scheduleFetchesTask =
Scheduler::schedulePeriodicTask(m_scheduler,
make_shared<SimpleIntervalGenerator>(
300), // no need to check to often. if needed, will be rescheduled
bind(&FetchManager::ScheduleFetches, this),
SCHEDULE_FETCHES_TAG);
// resume un-finished fetches if there is any
if (m_taskDb) {
m_taskDb->foreachTask(bind(&FetchManager::Enqueue, this, _1, _2, _3, _4, _5));
}
}
FetchManager::~FetchManager()
{
m_scheduler->shutdown();
m_executor->shutdown();
m_ccnx.reset();
m_fetchList.clear_and_dispose(fetcher_disposer());
}
// Enqueue using default callbacks
void
FetchManager::Enqueue(const Ccnx::Name& deviceName, const Ccnx::Name& baseName, uint64_t minSeqNo,
uint64_t maxSeqNo, int priority)
{
Enqueue(deviceName, baseName, m_defaultSegmentCallback, m_defaultFinishCallback, minSeqNo,
maxSeqNo, priority);
}
void
FetchManager::Enqueue(const Ccnx::Name& deviceName, const Ccnx::Name& baseName,
const SegmentCallback& segmentCallback, const FinishCallback& finishCallback,
uint64_t minSeqNo, uint64_t maxSeqNo, int priority /*PRIORITY_NORMAL*/)
{
// Assumption for the following code is minSeqNo <= maxSeqNo
if (minSeqNo > maxSeqNo) {
return;
}
// we may need to guarantee that LookupLocator will gives an answer and not throw exception...
Name forwardingHint;
forwardingHint = m_mapping(deviceName);
if (m_taskDb) {
m_taskDb->addTask(deviceName, baseName, minSeqNo, maxSeqNo, priority);
}
unique_lock<mutex> lock(m_parellelFetchMutex);
_LOG_TRACE("++++ Create fetcher: " << baseName);
Fetcher* fetcher =
new Fetcher(m_ccnx, m_executor, segmentCallback, finishCallback,
bind(&FetchManager::DidFetchComplete, this, _1, _2, _3),
bind(&FetchManager::DidNoDataTimeout, this, _1), deviceName, baseName, minSeqNo,
maxSeqNo, boost::posix_time::seconds(30), forwardingHint);
switch (priority) {
case PRIORITY_HIGH:
_LOG_TRACE("++++ Push front fetcher: " << fetcher->GetName());
m_fetchList.push_front(*fetcher);
break;
case PRIORITY_NORMAL:
default:
_LOG_TRACE("++++ Push back fetcher: " << fetcher->GetName());
m_fetchList.push_back(*fetcher);
break;
}
_LOG_DEBUG("++++ Reschedule fetcher task");
m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
// ScheduleFetches (); // will start a fetch if m_currentParallelFetches is less than max, otherwise does nothing
}
void
FetchManager::ScheduleFetches()
{
unique_lock<mutex> lock(m_parellelFetchMutex);
boost::posix_time::ptime currentTime =
date_time::second_clock<boost::posix_time::ptime>::universal_time();
boost::posix_time::ptime nextSheduleCheck =
currentTime + posix_time::seconds(300); // no reason to have anything, but just in case
for (FetchList::iterator item = m_fetchList.begin();
m_currentParallelFetches < m_maxParallelFetches && item != m_fetchList.end();
item++) {
if (item->IsActive()) {
_LOG_DEBUG("Item is active");
continue;
}
if (item->IsTimedWait()) {
_LOG_DEBUG("Item is in timed-wait");
continue;
}
if (currentTime < item->GetNextScheduledRetry()) {
if (item->GetNextScheduledRetry() < nextSheduleCheck)
nextSheduleCheck = item->GetNextScheduledRetry();
_LOG_DEBUG("Item is delayed");
continue;
}
_LOG_DEBUG("Start fetching of " << item->GetName());
m_currentParallelFetches++;
_LOG_TRACE("++++ RESTART PIPELINE: " << item->GetName());
item->RestartPipeline();
}
m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask,
(nextSheduleCheck - currentTime).total_seconds());
}
void
FetchManager::DidNoDataTimeout(Fetcher& fetcher)
{
_LOG_DEBUG("No data timeout for " << fetcher.GetName() << " with forwarding hint: "
<< fetcher.GetForwardingHint());
{
unique_lock<mutex> lock(m_parellelFetchMutex);
m_currentParallelFetches--;
// no need to do anything with the m_fetchList
}
if (fetcher.GetForwardingHint().size() == 0) {
// will be tried initially and again after empty forwarding hint
/// @todo Handle potential exception
Name forwardingHint;
forwardingHint = m_mapping(fetcher.GetDeviceName());
if (forwardingHint.size() == 0) {
// make sure that we will try broadcast forwarding hint eventually
fetcher.SetForwardingHint(m_broadcastHint);
}
else {
fetcher.SetForwardingHint(forwardingHint);
}
}
else if (fetcher.GetForwardingHint() == m_broadcastHint) {
// will be tried after broadcast forwarding hint
fetcher.SetForwardingHint(Name("/"));
}
else {
// will be tried after normal forwarding hint
fetcher.SetForwardingHint(m_broadcastHint);
}
double delay = fetcher.GetRetryPause();
if (delay < 1) // first time
{
delay = 1;
}
else {
delay = std::min(2 * delay, 300.0); // 5 minutes max
}
fetcher.SetRetryPause(delay);
fetcher.SetNextScheduledRetry(date_time::second_clock<boost::posix_time::ptime>::universal_time() +
posix_time::seconds(delay));
m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
}
void
FetchManager::DidFetchComplete(Fetcher& fetcher, const Name& deviceName, const Name& baseName)
{
{
unique_lock<mutex> lock(m_parellelFetchMutex);
m_currentParallelFetches--;
if (m_taskDb) {
m_taskDb->deleteTask(deviceName, baseName);
}
}
// like TCP timed-wait
m_scheduler->scheduleOneTimeTask(m_scheduler, 10,
boost::bind(&FetchManager::TimedWait, this, ref(fetcher)),
boost::lexical_cast<string>(baseName));
m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
}
void
FetchManager::TimedWait(Fetcher& fetcher)
{
unique_lock<mutex> lock(m_parellelFetchMutex);
_LOG_TRACE("+++++ removing fetcher: " << fetcher.GetName());
m_fetchList.erase_and_dispose(FetchList::s_iterator_to(fetcher), fetcher_disposer());
}