add fetch-task-db and integrate it with fetch-manager
diff --git a/src/dispatcher.cc b/src/dispatcher.cc
index 5d5d965..713b7c2 100644
--- a/src/dispatcher.cc
+++ b/src/dispatcher.cc
@@ -22,6 +22,7 @@
#include "dispatcher.h"
#include "logging.h"
#include "ccnx-discovery.h"
+#include "fetch-task-db.h"
#include <boost/make_shared.hpp>
#include <boost/lexical_cast.hpp>
@@ -67,12 +68,15 @@
m_core = new SyncCore (m_syncLog, localUserName, Name ("/"), syncPrefix,
bind(&Dispatcher::Did_SyncLog_StateChange, this, _1), ccnx, DEFAULT_SYNC_INTEREST_INTERVAL);
+ FetchTaskDbPtr actionTaskDb = make_shared<FetchTaskDb>(m_rootDir, "action");
m_actionFetcher = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3,
- bind (&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4));
+ bind (&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4), FetchManager::FinishCallback(), actionTaskDb);
+ FetchTaskDbPtr fileTaskDb = make_shared<FetchTaskDb>(m_rootDir, "file");
m_fileFetcher = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3,
bind (&Dispatcher::Did_FetchManager_FileSegmentFetch, this, _1, _2, _3, _4),
- bind (&Dispatcher::Did_FetchManager_FileFetchComplete, this, _1, _2));
+ bind (&Dispatcher::Did_FetchManager_FileFetchComplete, this, _1, _2),
+ fileTaskDb);
if (m_enablePrefixDiscovery)
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index 8d98207..e01df78 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -44,6 +44,7 @@
, uint32_t parallelFetches // = 3
, const SegmentCallback &defaultSegmentCallback
, const FinishCallback &defaultFinishCallback
+ , const FetchTaskDbPtr &taskDb
)
: m_ccnx (ccnx)
, m_mapping (mapping)
@@ -53,10 +54,17 @@
, m_executor (new Executor(1))
, m_defaultSegmentCallback(defaultSegmentCallback)
, m_defaultFinishCallback(defaultFinishCallback)
+ , m_taskDb(taskDb)
{
m_scheduler->start ();
m_executor->start();
+ // resume un-finished fetches if there is any
+ if (m_taskDb)
+ {
+ m_taskDb->foreachTask(bind(&FetchManager::Enqueue, this, _1, _2, _3, _4, _5));
+ }
+
m_scheduleFetchesTask = Scheduler::schedulePeriodicTask (m_scheduler,
make_shared<SimpleIntervalGenerator> (300), // no need to check to often. if needed, will be rescheduled
bind (&FetchManager::ScheduleFetches, this), SCHEDULE_FETCHES_TAG);
@@ -94,6 +102,8 @@
Name forwardingHint;
forwardingHint = m_mapping (deviceName);
+  // The task database is optional: the constructor parameter defaults to an
+  // empty pointer, so only persist the task when a database was supplied.
+  if (m_taskDb)
+    {
+      m_taskDb->addTask(deviceName, baseName, minSeqNo, maxSeqNo, priority);
+    }
+
unique_lock<mutex> lock (m_parellelFetchMutex);
_LOG_TRACE ("++++ Create fetcher: " << baseName);
@@ -101,7 +111,7 @@
m_executor,
segmentCallback,
finishCallback,
- bind (&FetchManager::DidFetchComplete, this, _1),
+ bind (&FetchManager::DidFetchComplete, this, _1, _2, _3),
bind (&FetchManager::DidNoDataTimeout, this, _1),
deviceName, baseName, minSeqNo, maxSeqNo,
boost::posix_time::seconds (30),
@@ -205,13 +215,14 @@
}
void
-FetchManager::DidFetchComplete (Fetcher &fetcher)
+FetchManager::DidFetchComplete (Fetcher &fetcher, const Name &deviceName, const Name &baseName)
{
{
unique_lock<mutex> lock (m_parellelFetchMutex);
m_currentParallelFetches --;
_LOG_TRACE ("+++++ removing fetcher: " << fetcher.GetName ());
m_fetchList.erase_and_dispose (FetchList::s_iterator_to (fetcher), fetcher_disposer ());
+    // The task database is optional: the constructor parameter defaults to an
+    // empty pointer, so only drop the persisted task when a database was supplied.
+    if (m_taskDb)
+      {
+        m_taskDb->deleteTask(deviceName, baseName);
+      }
}
m_scheduler->rescheduleTaskAt (m_scheduleFetchesTask, 0);
diff --git a/src/fetch-manager.h b/src/fetch-manager.h
index 8c0ad83..31cb063 100644
--- a/src/fetch-manager.h
+++ b/src/fetch-manager.h
@@ -31,6 +31,7 @@
#include "scheduler.h"
#include "executor.h"
#include "ccnx-wrapper.h"
+#include "fetch-task-db.h"
#include "fetcher.h"
@@ -51,6 +52,7 @@
, uint32_t parallelFetches = 3
, const SegmentCallback &defaultSegmentCallback = SegmentCallback()
, const FinishCallback &defaultFinishCallback = FinishCallback()
+ , const FetchTaskDbPtr &taskDb = FetchTaskDbPtr()
);
virtual ~FetchManager ();
@@ -78,7 +80,7 @@
DidNoDataTimeout (Fetcher &fetcher);
void
- DidFetchComplete (Fetcher &fetcher);
+ DidFetchComplete (Fetcher &fetcher, const Ccnx::Name &deviceName, const Ccnx::Name &baseName);
void
ScheduleFetches ();
@@ -102,6 +104,7 @@
TaskPtr m_scheduleFetchesTask;
SegmentCallback m_defaultSegmentCallback;
FinishCallback m_defaultFinishCallback;
+ FetchTaskDbPtr m_taskDb;
};
Ccnx::CcnxWrapperPtr
diff --git a/src/fetch-task-db.cc b/src/fetch-task-db.cc
new file mode 100644
index 0000000..11ae6c9
--- /dev/null
+++ b/src/fetch-task-db.cc
@@ -0,0 +1,126 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ */
+#include "fetch-task-db.h"
+#include "db-helper.h"
+
+using namespace std;
+using namespace boost;
+using namespace Ccnx;
+namespace fs = boost::filesystem;
+
+// Schema of the persistent fetch-task table; executed every time the database
+// is opened. Both statements are guarded with IF NOT EXISTS so that re-opening
+// an already initialized database file does not fail on the existing index.
+const string INIT_DATABASE = "\
+CREATE TABLE IF NOT EXISTS \n\
+ Task( \n\
+ deviceName BLOB NOT NULL, \n\
+ baseName BLOB NOT NULL, \n\
+ minSeqNo INTEGER, \n\
+ maxSeqNo INTEGER, \n\
+ priority INTEGER, \n\
+ PRIMARY KEY (deviceName, baseName) \n\
+ ); \n\
+CREATE INDEX IF NOT EXISTS identifier ON Task (deviceName, baseName); \n\
+";
+
+/**
+ * Open (creating it if necessary) the SQLite database file
+ * <folder>/.chronoshare/fetch_tasks/<tag> and run the schema initialization.
+ *
+ * @param folder directory under which the .chronoshare/fetch_tasks directory is created
+ * @param tag    name of the database file inside that directory
+ * @throws Error::Db if the database file cannot be opened
+ */
+FetchTaskDb::FetchTaskDb(const boost::filesystem::path &folder, const std::string &tag)
+  : m_db (0)
+{
+  fs::path actualFolder = folder / ".chronoshare" / "fetch_tasks";
+  fs::create_directories (actualFolder);
+
+  const fs::path dbPath = actualFolder / tag;
+  int res = sqlite3_open(dbPath.string().c_str(), &m_db);
+  if (res != SQLITE_OK)
+    {
+      // SQLite can hand back a handle even when the open fails. Release it here,
+      // because the destructor does not run when the constructor throws.
+      sqlite3_close(m_db);
+      m_db = 0;
+      BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Cannot open database: " + dbPath.string()));
+    }
+
+  // Schema setup is best-effort: a failure is not propagated to the caller.
+  char *errmsg = 0;
+  sqlite3_exec(m_db, INIT_DATABASE.c_str(), NULL, NULL, &errmsg);
+  if (errmsg != 0)
+    {
+      // _LOG_TRACE ("Init \"error\": " << errmsg);
+      sqlite3_free (errmsg);
+    }
+}
+
+/**
+ * Close the SQLite connection held in m_db.
+ */
+FetchTaskDb::~FetchTaskDb()
+{
+  // The outcome of the close is inspected nowhere; a failure is not acted upon.
+  const int closeResult = sqlite3_close(m_db);
+  (void) closeResult; // _LOG_ERROR when closeResult != SQLITE_OK
+}
+
+/**
+ * Persist a fetch task.
+ *
+ * Rows are keyed by (deviceName, baseName); the statement is INSERT OR IGNORE,
+ * so an already stored task with the same key is left untouched.
+ * Failures are not reported to the caller.
+ *
+ * @param deviceName name of the device the data is fetched from
+ * @param baseName   base name of the data being fetched
+ * @param minSeqNo   first sequence number of the task
+ * @param maxSeqNo   last sequence number of the task
+ * @param priority   priority stored with the task
+ */
+void
+FetchTaskDb::addTask(const Name &deviceName, const Name &baseName, uint64_t minSeqNo, uint64_t maxSeqNo, int priority)
+{
+  sqlite3_stmt *stmt = 0;
+  int res = sqlite3_prepare_v2(m_db, "INSERT OR IGNORE INTO Task (deviceName, baseName, minSeqNo, maxSeqNo, priority) VALUES (?, ?, ?, ?, ?)", -1, &stmt, 0);
+  if (res != SQLITE_OK || stmt == 0)
+    {
+      // Nothing was compiled, so there is nothing to bind, step or finalize.
+      return;
+    }
+
+  // Bound with SQLITE_STATIC: both buffers have to outlive the statement,
+  // which they do because the statement is finalized before this scope ends.
+  CcnxCharbufPtr deviceBuf = CcnxCharbufPtr(deviceName);
+  CcnxCharbufPtr baseBuf = CcnxCharbufPtr(baseName);
+  sqlite3_bind_blob(stmt, 1, deviceBuf->buf(), deviceBuf->length(), SQLITE_STATIC);
+  sqlite3_bind_blob(stmt, 2, baseBuf->buf(), baseBuf->length(), SQLITE_STATIC);
+  // SQLite integers are signed 64-bit; make the conversion explicit.
+  sqlite3_bind_int64(stmt, 3, static_cast<sqlite3_int64>(minSeqNo));
+  sqlite3_bind_int64(stmt, 4, static_cast<sqlite3_int64>(maxSeqNo));
+  sqlite3_bind_int(stmt, 5, priority);
+
+  // A completed INSERT reports SQLITE_DONE (not SQLITE_OK); the result is
+  // deliberately not acted upon.
+  sqlite3_step(stmt);
+  sqlite3_finalize(stmt);
+}
+
+/**
+ * Remove the stored task identified by (deviceName, baseName), if any.
+ * Failures are not reported to the caller.
+ *
+ * @param deviceName device name part of the task key
+ * @param baseName   base name part of the task key
+ */
+void
+FetchTaskDb::deleteTask(const Name &deviceName, const Name &baseName)
+{
+  sqlite3_stmt *stmt = 0;
+  int res = sqlite3_prepare_v2(m_db, "DELETE FROM Task WHERE deviceName = ? AND baseName = ?;", -1, &stmt, 0);
+  if (res != SQLITE_OK || stmt == 0)
+    {
+      // Nothing was compiled, so there is nothing to bind, step or finalize.
+      return;
+    }
+
+  // Bound with SQLITE_STATIC: both buffers have to outlive the statement,
+  // which they do because the statement is finalized before this scope ends.
+  CcnxCharbufPtr deviceBuf = CcnxCharbufPtr(deviceName);
+  CcnxCharbufPtr baseBuf = CcnxCharbufPtr(baseName);
+  sqlite3_bind_blob(stmt, 1, deviceBuf->buf(), deviceBuf->length(), SQLITE_STATIC);
+  sqlite3_bind_blob(stmt, 2, baseBuf->buf(), baseBuf->length(), SQLITE_STATIC);
+
+  // A completed DELETE reports SQLITE_DONE (not SQLITE_OK); the result is
+  // deliberately not acted upon.
+  sqlite3_step(stmt);
+  sqlite3_finalize(stmt);
+}
+
+/**
+ * Invoke callback once for every task stored in the Task table.
+ *
+ * The callback receives (deviceName, baseName, minSeqNo, maxSeqNo, priority).
+ * It is called while the SELECT statement is still active, i.e. before the
+ * statement is finalized.
+ *
+ * @param callback function called for each stored task
+ */
+void
+FetchTaskDb::foreachTask(const FetchTaskCallback &callback)
+{
+  sqlite3_stmt *stmt = 0;
+  // Columns are listed explicitly so the indices below do not depend on the
+  // declaration order of the table.
+  int res = sqlite3_prepare_v2(m_db, "SELECT deviceName, baseName, minSeqNo, maxSeqNo, priority FROM Task;", -1, &stmt, 0);
+  if (res != SQLITE_OK || stmt == 0)
+    {
+      // Nothing was compiled, so there are no rows to report.
+      return;
+    }
+
+  while (sqlite3_step(stmt) == SQLITE_ROW)
+    {
+      // Fetch the blob pointer before its size, in that fixed order, instead of
+      // leaving the order to the evaluation of function arguments.
+      const void *deviceBlob = sqlite3_column_blob(stmt, 0);
+      int deviceBytes = sqlite3_column_bytes(stmt, 0);
+      const void *baseBlob = sqlite3_column_blob(stmt, 1);
+      int baseBytes = sqlite3_column_bytes(stmt, 1);
+
+      Name deviceName(deviceBlob, deviceBytes);
+      Name baseName(baseBlob, baseBytes);
+      uint64_t minSeqNo = static_cast<uint64_t>(sqlite3_column_int64(stmt, 2));
+      uint64_t maxSeqNo = static_cast<uint64_t>(sqlite3_column_int64(stmt, 3));
+      int priority = sqlite3_column_int(stmt, 4);
+      callback(deviceName, baseName, minSeqNo, maxSeqNo, priority);
+    }
+
+  sqlite3_finalize(stmt);
+}
diff --git a/src/fetch-task-db.h b/src/fetch-task-db.h
new file mode 100644
index 0000000..1f8e16a
--- /dev/null
+++ b/src/fetch-task-db.h
@@ -0,0 +1,55 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ */
+#ifndef FETCH_TASK_DB_H
+#define FETCH_TASK_DB_H
+
+#include <sqlite3.h>
+#include <ccnx-common.h>
+#include <ccnx-name.h>
+#include <boost/filesystem.hpp>
+#include <boost/shared_ptr.hpp>
+
+/**
+ * Store of fetch tasks backed by an SQLite database, keyed by the
+ * (deviceName, baseName) pair.
+ *
+ * The object holds a raw sqlite3 handle that its destructor closes, so
+ * instances are not copyable.
+ */
+class FetchTaskDb
+{
+public:
+  /**
+   * @param folder directory under which the database file is kept
+   * @param tag    name of the database file
+   */
+  FetchTaskDb(const boost::filesystem::path &folder, const std::string &tag);
+  ~FetchTaskDb();
+
+  // task with same deviceName and baseName combination will be added only once
+  // if task already exists, this call does nothing
+  void
+  addTask(const Ccnx::Name &deviceName, const Ccnx::Name &baseName, uint64_t minSeqNo, uint64_t maxSeqNo, int priority);
+
+  // removes the task stored for this deviceName and baseName combination
+  void
+  deleteTask(const Ccnx::Name &deviceName, const Ccnx::Name &baseName);
+
+  // arguments: deviceName, baseName, minSeqNo, maxSeqNo, priority
+  typedef boost::function<void(const Ccnx::Name &, const Ccnx::Name &, uint64_t, uint64_t, int)> FetchTaskCallback;
+
+  // calls callback once for every stored task
+  void
+  foreachTask(const FetchTaskCallback &callback);
+
+private:
+  // Copying is disabled: a copy would share m_db and close it a second time.
+  // Declared but intentionally left undefined.
+  FetchTaskDb(const FetchTaskDb &);
+  FetchTaskDb &
+  operator=(const FetchTaskDb &);
+
+private:
+  sqlite3 *m_db;
+};
+
+typedef boost::shared_ptr<FetchTaskDb> FetchTaskDbPtr;
+
+#endif // FETCH_TASK_DB_H
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 13a52b3..2e156a5 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -195,7 +195,7 @@
// tell FetchManager that we have finish our job
// m_onFetchComplete (*this);
// using executor, so we won't be deleted if there is scheduled FillPipeline call
- m_executor->execute (bind (m_onFetchComplete, ref(*this)));
+ m_executor->execute (bind (m_onFetchComplete, ref(*this), m_deviceName, m_name));
}
else
{
diff --git a/src/fetcher.h b/src/fetcher.h
index 8e6b441..f8b9df5 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -36,7 +36,7 @@
public:
typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName, uint64_t seq, Ccnx::PcoPtr pco)> SegmentCallback;
typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName)> FinishCallback;
- typedef boost::function<void (Fetcher &)> OnFetchCompleteCallback;
+ typedef boost::function<void (Fetcher &, const Ccnx::Name &deviceName, const Ccnx::Name &baseName)> OnFetchCompleteCallback;
typedef boost::function<void (Fetcher &)> OnFetchFailedCallback;
Fetcher (Ccnx::CcnxWrapperPtr ccnx,