Merge remote-tracking branch 'git.irl/master'
diff --git a/scheduler/task.cc b/scheduler/task.cc
index 4702906..2f72f06 100644
--- a/scheduler/task.cc
+++ b/scheduler/task.cc
@@ -72,5 +72,12 @@
void
Task::execute()
{
- m_scheduler->execute(boost::bind(&Task::run, this));
+ // m_scheduler->execute(boost::bind(&Task::run, this));
+
+ // using a shared_ptr of this to ensure that when invoked from executor
+ // the task object still exists
+ // otherwise, it could be the case that the run() is to be executed, but before it
+ // could finish, the TaskPtr gets deleted from scheduler and the task object
+ // gets destroyed, causing crash
+ m_scheduler->execute(boost::bind(&Task::run, shared_from_this()));
}
diff --git a/scheduler/task.h b/scheduler/task.h
index 41e4438..11a9fa6 100644
--- a/scheduler/task.h
+++ b/scheduler/task.h
@@ -32,6 +32,7 @@
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/enable_shared_from_this.hpp>
#include <sys/time.h>
//////////////////////////////////////////////////
@@ -49,7 +50,7 @@
/**
* @brief Base class for a task
*/
-class Task
+class Task : public boost::enable_shared_from_this<Task>
{
public:
// callback of this task
diff --git a/src/dispatcher.cc b/src/dispatcher.cc
index cc677b6..93b9336 100644
--- a/src/dispatcher.cc
+++ b/src/dispatcher.cc
@@ -67,8 +67,13 @@
m_core = new SyncCore (m_syncLog, localUserName, Name ("/"), syncPrefix,
bind(&Dispatcher::Did_SyncLog_StateChange, this, _1), ccnx, DEFAULT_SYNC_INTEREST_INTERVAL);
- m_actionFetcher = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3);
- m_fileFetcher = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3);
+ m_actionFetcher = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3,
+ bind (&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4));
+
+ m_fileFetcher = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3,
+ bind (&Dispatcher::Did_FetchManager_FileSegmentFetch, this, _1, _2, _3, _4),
+ bind (&Dispatcher::Did_FetchManager_FileFetchComplete, this, _1, _2));
+
if (m_enablePrefixDiscovery)
{
@@ -252,7 +257,6 @@
Name actionNameBase = Name(userName)("action")(m_sharedFolder);
m_actionFetcher->Enqueue (userName, actionNameBase,
- bind (&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4), FetchManager::FinishCallback (),
std::max<uint64_t> (oldSeq + 1, 1), newSeq, FetchManager::PRIORITY_HIGH);
}
}
@@ -289,8 +293,6 @@
}
m_fileFetcher->Enqueue (deviceName, fileNameBase,
- bind (&Dispatcher::Did_FetchManager_FileSegmentFetch, this, _1, _2, _3, _4),
- bind (&Dispatcher::Did_FetchManager_FileFetchComplete, this, _1, _2),
0, action->seg_num () - 1, FetchManager::PRIORITY_NORMAL);
}
}
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index 788d635..8d98207 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -39,13 +39,20 @@
static const string SCHEDULE_FETCHES_TAG = "ScheduleFetches";
-FetchManager::FetchManager (CcnxWrapperPtr ccnx, const Mapping &mapping, uint32_t parallelFetches/* = 3*/)
+FetchManager::FetchManager (Ccnx::CcnxWrapperPtr ccnx
+ , const Mapping &mapping
+ , uint32_t parallelFetches // = 3
+ , const SegmentCallback &defaultSegmentCallback
+ , const FinishCallback &defaultFinishCallback
+ )
: m_ccnx (ccnx)
, m_mapping (mapping)
, m_maxParallelFetches (parallelFetches)
, m_currentParallelFetches (0)
, m_scheduler (new Scheduler)
, m_executor (new Executor(1))
+ , m_defaultSegmentCallback(defaultSegmentCallback)
+ , m_defaultFinishCallback(defaultFinishCallback)
{
m_scheduler->start ();
m_executor->start();
@@ -64,6 +71,14 @@
m_fetchList.clear_and_dispose (fetcher_disposer ());
}
+// Enqueue using default callbacks
+void
+FetchManager::Enqueue (const Ccnx::Name &deviceName, const Ccnx::Name &baseName,
+ uint64_t minSeqNo, uint64_t maxSeqNo, int priority)
+{
+ Enqueue(deviceName, baseName, m_defaultSegmentCallback, m_defaultFinishCallback, minSeqNo, maxSeqNo, priority);
+}
+
void
FetchManager::Enqueue (const Ccnx::Name &deviceName, const Ccnx::Name &baseName,
const SegmentCallback &segmentCallback, const FinishCallback &finishCallback,
diff --git a/src/fetch-manager.h b/src/fetch-manager.h
index 793c5c6..8c0ad83 100644
--- a/src/fetch-manager.h
+++ b/src/fetch-manager.h
@@ -46,7 +46,12 @@
typedef boost::function<Ccnx::Name(const Ccnx::Name &)> Mapping;
typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName, uint64_t seq, Ccnx::PcoPtr pco)> SegmentCallback;
typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName)> FinishCallback;
- FetchManager (Ccnx::CcnxWrapperPtr ccnx, const Mapping &mapping, uint32_t parallelFetches = 3);
+ FetchManager (Ccnx::CcnxWrapperPtr ccnx
+ , const Mapping &mapping
+ , uint32_t parallelFetches = 3
+ , const SegmentCallback &defaultSegmentCallback = SegmentCallback()
+ , const FinishCallback &defaultFinishCallback = FinishCallback()
+ );
virtual ~FetchManager ();
void
@@ -54,6 +59,11 @@
const SegmentCallback &segmentCallback, const FinishCallback &finishCallback,
uint64_t minSeqNo, uint64_t maxSeqNo, int priority=PRIORITY_NORMAL);
+ // Enqueue using default callbacks
+ void
+ Enqueue (const Ccnx::Name &deviceName, const Ccnx::Name &baseName,
+ uint64_t minSeqNo, uint64_t maxSeqNo, int priority=PRIORITY_NORMAL);
+
// only for Fetcher
inline Ccnx::CcnxWrapperPtr
GetCcnx ();
@@ -90,6 +100,8 @@
SchedulerPtr m_scheduler;
ExecutorPtr m_executor;
TaskPtr m_scheduleFetchesTask;
+ SegmentCallback m_defaultSegmentCallback;
+ FinishCallback m_defaultFinishCallback;
};
Ccnx::CcnxWrapperPtr