Merge branch 'master' of git.irl.cs.ucla.edu:ndn/chronoshare
diff --git a/src/dispatcher.cc b/src/dispatcher.cc
index 93b9336..63b59cb 100644
--- a/src/dispatcher.cc
+++ b/src/dispatcher.cc
@@ -22,6 +22,7 @@
#include "dispatcher.h"
#include "logging.h"
#include "ccnx-discovery.h"
+#include "fetch-task-db.h"
#include <boost/make_shared.hpp>
#include <boost/lexical_cast.hpp>
@@ -67,12 +68,15 @@
m_core = new SyncCore (m_syncLog, localUserName, Name ("/"), syncPrefix,
bind(&Dispatcher::Did_SyncLog_StateChange, this, _1), ccnx, DEFAULT_SYNC_INTEREST_INTERVAL);
+ FetchTaskDbPtr actionTaskDb = make_shared<FetchTaskDb>(m_rootDir, "action");
m_actionFetcher = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3,
- bind (&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4));
+ bind (&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4), FetchManager::FinishCallback(), actionTaskDb);
+ FetchTaskDbPtr fileTaskDb = make_shared<FetchTaskDb>(m_rootDir, "file");
m_fileFetcher = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3,
bind (&Dispatcher::Did_FetchManager_FileSegmentFetch, this, _1, _2, _3, _4),
- bind (&Dispatcher::Did_FetchManager_FileFetchComplete, this, _1, _2));
+ bind (&Dispatcher::Did_FetchManager_FileFetchComplete, this, _1, _2),
+ fileTaskDb);
if (m_enablePrefixDiscovery)
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index 8d98207..e01df78 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -44,6 +44,7 @@
, uint32_t parallelFetches // = 3
, const SegmentCallback &defaultSegmentCallback
, const FinishCallback &defaultFinishCallback
+ , const FetchTaskDbPtr &taskDb
)
: m_ccnx (ccnx)
, m_mapping (mapping)
@@ -53,10 +54,17 @@
, m_executor (new Executor(1))
, m_defaultSegmentCallback(defaultSegmentCallback)
, m_defaultFinishCallback(defaultFinishCallback)
+ , m_taskDb(taskDb)
{
m_scheduler->start ();
m_executor->start();
+ // resume un-finished fetches if there is any
+ if (m_taskDb)
+ {
+ m_taskDb->foreachTask(bind(&FetchManager::Enqueue, this, _1, _2, _3, _4, _5));
+ }
+
m_scheduleFetchesTask = Scheduler::schedulePeriodicTask (m_scheduler,
make_shared<SimpleIntervalGenerator> (300), // no need to check to often. if needed, will be rescheduled
bind (&FetchManager::ScheduleFetches, this), SCHEDULE_FETCHES_TAG);
@@ -94,6 +102,8 @@
Name forwardingHint;
forwardingHint = m_mapping (deviceName);
+ if (m_taskDb) m_taskDb->addTask(deviceName, baseName, minSeqNo, maxSeqNo, priority); // task db is optional: the constructor default is a null pointer
+
unique_lock<mutex> lock (m_parellelFetchMutex);
_LOG_TRACE ("++++ Create fetcher: " << baseName);
@@ -101,7 +111,7 @@
m_executor,
segmentCallback,
finishCallback,
- bind (&FetchManager::DidFetchComplete, this, _1),
+ bind (&FetchManager::DidFetchComplete, this, _1, _2, _3),
bind (&FetchManager::DidNoDataTimeout, this, _1),
deviceName, baseName, minSeqNo, maxSeqNo,
boost::posix_time::seconds (30),
@@ -205,13 +215,14 @@
}
void
-FetchManager::DidFetchComplete (Fetcher &fetcher)
+FetchManager::DidFetchComplete (Fetcher &fetcher, const Name &deviceName, const Name &baseName)
{
{
unique_lock<mutex> lock (m_parellelFetchMutex);
m_currentParallelFetches --;
_LOG_TRACE ("+++++ removing fetcher: " << fetcher.GetName ());
m_fetchList.erase_and_dispose (FetchList::s_iterator_to (fetcher), fetcher_disposer ());
+ if (m_taskDb) m_taskDb->deleteTask(deviceName, baseName); // task db is optional: the constructor default is a null pointer
}
m_scheduler->rescheduleTaskAt (m_scheduleFetchesTask, 0);
diff --git a/src/fetch-manager.h b/src/fetch-manager.h
index 8c0ad83..31cb063 100644
--- a/src/fetch-manager.h
+++ b/src/fetch-manager.h
@@ -31,6 +31,7 @@
#include "scheduler.h"
#include "executor.h"
#include "ccnx-wrapper.h"
+#include "fetch-task-db.h"
#include "fetcher.h"
@@ -51,6 +52,7 @@
, uint32_t parallelFetches = 3
, const SegmentCallback &defaultSegmentCallback = SegmentCallback()
, const FinishCallback &defaultFinishCallback = FinishCallback()
+ , const FetchTaskDbPtr &taskDb = FetchTaskDbPtr()
);
virtual ~FetchManager ();
@@ -78,7 +80,7 @@
DidNoDataTimeout (Fetcher &fetcher);
void
- DidFetchComplete (Fetcher &fetcher);
+ DidFetchComplete (Fetcher &fetcher, const Ccnx::Name &deviceName, const Ccnx::Name &baseName);
void
ScheduleFetches ();
@@ -102,6 +104,7 @@
TaskPtr m_scheduleFetchesTask;
SegmentCallback m_defaultSegmentCallback;
FinishCallback m_defaultFinishCallback;
+ FetchTaskDbPtr m_taskDb;
};
Ccnx::CcnxWrapperPtr
diff --git a/src/fetch-task-db.cc b/src/fetch-task-db.cc
new file mode 100644
index 0000000..11ae6c9
--- /dev/null
+++ b/src/fetch-task-db.cc
@@ -0,0 +1,126 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ */
+#include "fetch-task-db.h"
+#include "db-helper.h"
+
+using namespace std;
+using namespace boost;
+using namespace Ccnx;
+namespace fs = boost::filesystem;
+
+const string INIT_DATABASE = "\
+CREATE TABLE IF NOT EXISTS \n\
+ Task( \n\
+ deviceName BLOB NOT NULL, \n\
+ baseName BLOB NOT NULL, \n\
+ minSeqNo INTEGER, \n\
+ maxSeqNo INTEGER, \n\
+ priority INTEGER, \n\
+ PRIMARY KEY (deviceName, baseName) \n\
+ ); \n\
+CREATE INDEX IF NOT EXISTS identifier ON Task (deviceName, baseName); \n\
+"; // executed on every open, so each statement must tolerate an already initialised database
+
+FetchTaskDb::FetchTaskDb(const boost::filesystem::path &folder, const std::string &tag)
+{
+  // One SQLite file per tag, kept under <folder>/.chronoshare/fetch_tasks/
+  fs::path actualFolder = folder / ".chronoshare" / "fetch_tasks";
+  fs::create_directories (actualFolder);
+
+  int res = sqlite3_open((actualFolder / tag).c_str(), &m_db);
+  if (res != SQLITE_OK)
+    {
+      // a handle may be returned even on failure; the destructor will not run after the throw
+      sqlite3_close (m_db);
+      BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Cannot open database: " + (actualFolder / tag).string()));
+    }
+
+  // Schema setup is best-effort: a failure here is not reported to the caller
+  char *errmsg = 0;
+  res = sqlite3_exec(m_db, INIT_DATABASE.c_str(), NULL, NULL, &errmsg);
+  if (res != SQLITE_OK && errmsg != 0)
+    {
+      sqlite3_free (errmsg);
+    }
+}
+
+FetchTaskDb::~FetchTaskDb()
+{
+  // Close the database handle; a failing close is not propagated
+  if (sqlite3_close(m_db) != SQLITE_OK)
+    {
+      // TODO: log the error
+    }
+}
+
+void
+FetchTaskDb::addTask(const Name &deviceName, const Name &baseName, uint64_t minSeqNo, uint64_t maxSeqNo, int priority)
+{
+  // INSERT OR IGNORE: a row that already exists for (deviceName, baseName) is left as it is
+  static const char *const INSERT_SQL =
+    "INSERT OR IGNORE INTO Task (deviceName, baseName, minSeqNo, maxSeqNo, priority) VALUES (?, ?, ?, ?, ?)";
+  // SQLITE_STATIC below means the blobs are not copied; these buffers outlive the statement
+  CcnxCharbufPtr deviceBuf = CcnxCharbufPtr(deviceName);
+  CcnxCharbufPtr baseBuf = CcnxCharbufPtr(baseName);
+  sqlite3_stmt *stmt;
+  sqlite3_prepare_v2(m_db, INSERT_SQL, -1, &stmt, 0);
+  sqlite3_bind_blob(stmt, 1, deviceBuf->buf(), deviceBuf->length(), SQLITE_STATIC);
+  sqlite3_bind_blob(stmt, 2, baseBuf->buf(), baseBuf->length(), SQLITE_STATIC);
+  sqlite3_bind_int64(stmt, 3, minSeqNo);
+  sqlite3_bind_int64(stmt, 4, maxSeqNo);
+  sqlite3_bind_int(stmt, 5, priority);
+  sqlite3_step(stmt); // result is not inspected
+  sqlite3_finalize(stmt);
+}
+
+void
+FetchTaskDb::deleteTask(const Name &deviceName, const Name &baseName)
+{
+  // Deletes the row whose deviceName and baseName blobs both match the given names
+  CcnxCharbufPtr deviceBuf = CcnxCharbufPtr(deviceName);
+  CcnxCharbufPtr baseBuf = CcnxCharbufPtr(baseName);
+
+  sqlite3_stmt *stmt;
+  sqlite3_prepare_v2(m_db, "DELETE FROM Task WHERE deviceName = ? AND baseName = ?;", -1, &stmt, 0);
+  // SQLITE_STATIC: the blobs are not copied; the buffers above outlive the statement
+  sqlite3_bind_blob(stmt, 1, deviceBuf->buf(), deviceBuf->length(), SQLITE_STATIC);
+  sqlite3_bind_blob(stmt, 2, baseBuf->buf(), baseBuf->length(), SQLITE_STATIC);
+  sqlite3_step(stmt); // result is not inspected
+  sqlite3_finalize(stmt);
+}
+
+void
+FetchTaskDb::foreachTask(const FetchTaskCallback &callback)
+{
+  // Invokes callback once per stored row; columns are read by position (0..4)
+  sqlite3_stmt *stmt;
+  sqlite3_prepare_v2(m_db, "SELECT * FROM Task;", -1, &stmt, 0);
+  for (int rc = sqlite3_step(stmt); rc == SQLITE_ROW; rc = sqlite3_step(stmt))
+    {
+      Name deviceName(sqlite3_column_blob(stmt, 0), sqlite3_column_bytes(stmt, 0));
+      Name baseName(sqlite3_column_blob(stmt, 1), sqlite3_column_bytes(stmt, 1));
+      uint64_t minSeqNo = sqlite3_column_int64(stmt, 2);
+      uint64_t maxSeqNo = sqlite3_column_int64(stmt, 3);
+      // note: the callback runs while the SELECT statement is still open
+      callback(deviceName, baseName, minSeqNo, maxSeqNo, sqlite3_column_int(stmt, 4));
+    }
+  sqlite3_finalize(stmt);
+}
diff --git a/src/fetch-task-db.h b/src/fetch-task-db.h
new file mode 100644
index 0000000..1f8e16a
--- /dev/null
+++ b/src/fetch-task-db.h
@@ -0,0 +1,55 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ */
+#ifndef FETCH_TASK_DB_H
+#define FETCH_TASK_DB_H
+
+#include <sqlite3.h>
+#include <ccnx-common.h>
+#include <ccnx-name.h>
+#include <boost/filesystem.hpp>
+#include <boost/shared_ptr.hpp>
+
+class FetchTaskDb
+{
+public:
+  FetchTaskDb(const boost::filesystem::path &folder, const std::string &tag);
+  ~FetchTaskDb();
+  // task with same deviceName and baseName combination will be added only once
+  // if task already exists, this call does nothing
+  void
+  addTask(const Ccnx::Name &deviceName, const Ccnx::Name &baseName, uint64_t minSeqNo, uint64_t maxSeqNo, int priority);
+
+  void
+  deleteTask(const Ccnx::Name &deviceName, const Ccnx::Name &baseName);
+
+  typedef boost::function<void(const Ccnx::Name &, const Ccnx::Name &, uint64_t, uint64_t, int)> FetchTaskCallback;
+  void
+  foreachTask(const FetchTaskCallback &callback);
+
+private:
+  FetchTaskDb(const FetchTaskDb &);            // not copyable (declared, never defined): a copy
+  FetchTaskDb &operator=(const FetchTaskDb &); // would make two destructors close the same m_db
+  sqlite3 *m_db;
+};
+
+typedef boost::shared_ptr<FetchTaskDb> FetchTaskDbPtr; // shared-ownership handle to a FetchTaskDb
+
+#endif // FETCH_TASK_DB_H
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 98b8ceb..044c5ca 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -199,7 +199,7 @@
// tell FetchManager that we have finish our job
// m_onFetchComplete (*this);
// using executor, so we won't be deleted if there is scheduled FillPipeline call
- m_executor->execute (bind (m_onFetchComplete, ref(*this)));
+ m_executor->execute (bind (m_onFetchComplete, ref(*this), m_deviceName, m_name));
}
else
{
diff --git a/src/fetcher.h b/src/fetcher.h
index fa2ce7d..41a2301 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -36,7 +36,7 @@
public:
typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName, uint64_t seq, Ccnx::PcoPtr pco)> SegmentCallback;
typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName)> FinishCallback;
- typedef boost::function<void (Fetcher &)> OnFetchCompleteCallback;
+ typedef boost::function<void (Fetcher &, const Ccnx::Name &deviceName, const Ccnx::Name &baseName)> OnFetchCompleteCallback;
typedef boost::function<void (Fetcher &)> OnFetchFailedCallback;
Fetcher (Ccnx::CcnxWrapperPtr ccnx,
diff --git a/test/test-fetch-task-db.cc b/test/test-fetch-task-db.cc
new file mode 100644
index 0000000..1e647af
--- /dev/null
+++ b/test/test-fetch-task-db.cc
@@ -0,0 +1,165 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ * Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ */
+
+#include "logging.h"
+#include "fetch-task-db.h"
+
+#include <boost/filesystem.hpp>
+#include <boost/filesystem/fstream.hpp>
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+
+#include <boost/test/unit_test.hpp>
+#include <unistd.h>
+#include <boost/make_shared.hpp>
+#include <iostream>
+#include <iterator>
+#include <map>
+#include <utility>
+
+INIT_LOGGER ("Test.FetchTaskDb");
+
+using namespace Ccnx;
+using namespace std;
+using namespace boost;
+namespace fs = boost::filesystem;
+
+BOOST_AUTO_TEST_SUITE(TestFetchTaskDb)
+
+class Checker
+{
+public:
+  // Plain value record describing one fetch task.
+  // No user-written copy constructor: the implicitly generated one copies the same five members.
+  Checker(const Name &deviceName, const Name &baseName, uint64_t minSeqNo, uint64_t maxSeqNo, int priority)
+    : m_deviceName(deviceName), m_baseName(baseName), m_minSeqNo(minSeqNo), m_maxSeqNo(maxSeqNo), m_priority(priority)
+  {}
+
+  // True when all five fields match; const so that const Checkers can be compared too
+  bool
+  operator==(const Checker &other) const { return m_deviceName == other.m_deviceName && m_baseName == other.m_baseName && m_minSeqNo == other.m_minSeqNo && m_maxSeqNo == other.m_maxSeqNo && m_priority == other.m_priority; }
+
+  // Prints the five fields, comma separated, on one line of stdout
+  void show() const
+  {
+    cout << m_deviceName <<", " << m_baseName << ", " << m_minSeqNo << ", " << m_maxSeqNo << ", " << m_priority << endl;
+  }
+
+  Name m_deviceName;
+  Name m_baseName;
+  uint64_t m_minSeqNo;
+  uint64_t m_maxSeqNo;
+  int m_priority;
+};
+
+map<Name, Checker> checkers; // tasks collected by getChecker, keyed by deviceName + baseName
+int g_counter = 0; // number of getChecker invocations; reset at the start of the test case
+
+void
+getChecker(const Name &deviceName, const Name &baseName, uint64_t minSeqNo, uint64_t maxSeqNo, int priority)
+{
+  // foreachTask callback: counts the call and files the task under deviceName + baseName
+  ++g_counter;
+  Checker reported(deviceName, baseName, minSeqNo, maxSeqNo, priority);
+  const Name key = reported.m_deviceName + reported.m_baseName;
+  if (checkers.count(key) != 0)
+    BOOST_FAIL("duplicated checkers");
+  checkers.insert(make_pair(key, reported));
+}
+
+BOOST_AUTO_TEST_CASE (FetchTaskDbTest)
+{
+  INIT_LOGGERS ();
+  fs::path folder("TaskDbTest");
+  fs::create_directories(folder / ".chronoshare");
+
+  FetchTaskDbPtr db = make_shared<FetchTaskDb>(folder, "test");
+
+  // reference copies of tasks 0..9, keyed by deviceName + baseName
+  map<Name, Checker> expected;
+  g_counter = 0;
+  checkers.clear();
+
+  Name deviceNamePrefix("/device");
+  Name baseNamePrefix("/device/base");
+
+  // step 1: store tasks 0..9
+  for (uint64_t seq = 0; seq < 10; ++seq)
+    {
+      Name device = deviceNamePrefix;
+      Name base = baseNamePrefix;
+      Checker task(device.appendComp(seq), base.appendComp(seq), seq, 11, 1);
+      expected.insert(make_pair(device + base, task));
+      db->addTask(task.m_deviceName, task.m_baseName, task.m_minSeqNo, task.m_maxSeqNo, task.m_priority);
+    }
+
+  // step 2: delete tasks 5..9 again
+  for (uint64_t seq = 5; seq < 10; ++seq)
+    {
+      Name device = deviceNamePrefix;
+      Name base = baseNamePrefix;
+      device.appendComp(seq);
+      base.appendComp(seq);
+      db->deleteTask(device, base);
+    }
+
+  // step 3: add tasks 3..7; 3 and 4 are still stored and must not be added twice
+  for (uint64_t seq = 3; seq < 8; ++seq)
+    {
+      Name device = deviceNamePrefix;
+      Name base = baseNamePrefix;
+      Checker task(device.appendComp(seq), base.appendComp(seq), seq, 11, 1);
+      db->addTask(task.m_deviceName, task.m_baseName, task.m_minSeqNo, task.m_maxSeqNo, task.m_priority);
+    }
+
+  // read everything back through getChecker: tasks 0..7 are expected to remain
+  db->foreachTask(bind(getChecker, _1, _2, _3, _4, _5));
+
+  BOOST_CHECK_EQUAL(g_counter, 8);
+
+  // every task read back must equal its reference copy
+  for (map<Name, Checker>::iterator it = checkers.begin(); it != checkers.end(); ++it)
+    {
+      map<Name, Checker>::iterator known = expected.find(it->first);
+      if (known != expected.end())
+        {
+          Checker c1 = it->second;
+          Checker c2 = known->second;
+          BOOST_CHECK(c1 == c2);
+          if (! (c1 == c2))
+            {
+              cout << "C1: " << endl;
+              c1.show();
+              cout << "C2: " << endl;
+              c2.show();
+            }
+        }
+      else
+        {
+          BOOST_FAIL("unknown task found");
+        }
+    }
+
+  fs::remove_all(folder);
+}
+
+
+BOOST_AUTO_TEST_SUITE_END()