Merge branch 'master' of git.irl.cs.ucla.edu:ndn/chronoshare
diff --git a/src/dispatcher.cc b/src/dispatcher.cc
index 93b9336..63b59cb 100644
--- a/src/dispatcher.cc
+++ b/src/dispatcher.cc
@@ -22,6 +22,7 @@
 #include "dispatcher.h"
 #include "logging.h"
 #include "ccnx-discovery.h"
+#include "fetch-task-db.h"
 
 #include <boost/make_shared.hpp>
 #include <boost/lexical_cast.hpp>
@@ -67,12 +68,15 @@
   m_core = new SyncCore (m_syncLog, localUserName, Name ("/"), syncPrefix,
                          bind(&Dispatcher::Did_SyncLog_StateChange, this, _1), ccnx, DEFAULT_SYNC_INTEREST_INTERVAL);
 
+  FetchTaskDbPtr actionTaskDb = make_shared<FetchTaskDb>(m_rootDir, "action");
   m_actionFetcher = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3,
-                                bind (&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4));
+                                bind (&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4), FetchManager::FinishCallback(), actionTaskDb);
 
+  FetchTaskDbPtr fileTaskDb = make_shared<FetchTaskDb>(m_rootDir, "file");
   m_fileFetcher   = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3,
                                   bind (&Dispatcher::Did_FetchManager_FileSegmentFetch, this, _1, _2, _3, _4),
-                                  bind (&Dispatcher::Did_FetchManager_FileFetchComplete, this, _1, _2));
+                                  bind (&Dispatcher::Did_FetchManager_FileFetchComplete, this, _1, _2),
+                                  fileTaskDb);
 
 
   if (m_enablePrefixDiscovery)
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index 8d98207..e01df78 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -44,6 +44,7 @@
                 , uint32_t parallelFetches // = 3
                 , const SegmentCallback &defaultSegmentCallback
                 , const FinishCallback &defaultFinishCallback
+                , const FetchTaskDbPtr &taskDb
                 )
   : m_ccnx (ccnx)
   , m_mapping (mapping)
@@ -53,10 +54,17 @@
   , m_executor (new Executor(1))
   , m_defaultSegmentCallback(defaultSegmentCallback)
   , m_defaultFinishCallback(defaultFinishCallback)
+  , m_taskDb(taskDb)
 {
   m_scheduler->start ();
   m_executor->start();
 
+  // resume un-finished fetches if there is any
+  if (m_taskDb)
+  {
+    m_taskDb->foreachTask(bind(&FetchManager::Enqueue, this, _1, _2, _3, _4, _5));
+  }
+
   m_scheduleFetchesTask = Scheduler::schedulePeriodicTask (m_scheduler,
                                                            make_shared<SimpleIntervalGenerator> (300), // no need to check to often. if needed, will be rescheduled
                                                            bind (&FetchManager::ScheduleFetches, this), SCHEDULE_FETCHES_TAG);
@@ -94,6 +102,8 @@
   Name forwardingHint;
   forwardingHint = m_mapping (deviceName);
 
+  m_taskDb->addTask(deviceName, baseName, minSeqNo, maxSeqNo, priority);
+
   unique_lock<mutex> lock (m_parellelFetchMutex);
 
   _LOG_TRACE ("++++ Create fetcher: " << baseName);
@@ -101,7 +111,7 @@
                                   m_executor,
                                   segmentCallback,
                                   finishCallback,
-                                  bind (&FetchManager::DidFetchComplete, this, _1),
+                                  bind (&FetchManager::DidFetchComplete, this, _1, _2, _3),
                                   bind (&FetchManager::DidNoDataTimeout, this, _1),
                                   deviceName, baseName, minSeqNo, maxSeqNo,
                                   boost::posix_time::seconds (30),
@@ -205,13 +215,14 @@
 }
 
 void
-FetchManager::DidFetchComplete (Fetcher &fetcher)
+FetchManager::DidFetchComplete (Fetcher &fetcher, const Name &deviceName, const Name &baseName)
 {
   {
     unique_lock<mutex> lock (m_parellelFetchMutex);
     m_currentParallelFetches --;
     _LOG_TRACE ("+++++ removing fetcher: " << fetcher.GetName ());
     m_fetchList.erase_and_dispose (FetchList::s_iterator_to (fetcher), fetcher_disposer ());
+    m_taskDb->deleteTask(deviceName, baseName);
   }
 
   m_scheduler->rescheduleTaskAt (m_scheduleFetchesTask, 0);
diff --git a/src/fetch-manager.h b/src/fetch-manager.h
index 8c0ad83..31cb063 100644
--- a/src/fetch-manager.h
+++ b/src/fetch-manager.h
@@ -31,6 +31,7 @@
 #include "scheduler.h"
 #include "executor.h"
 #include "ccnx-wrapper.h"
+#include "fetch-task-db.h"
 
 #include "fetcher.h"
 
@@ -51,6 +52,7 @@
                 , uint32_t parallelFetches = 3
                 , const SegmentCallback &defaultSegmentCallback = SegmentCallback()
                 , const FinishCallback &defaultFinishCallback = FinishCallback()
+                , const FetchTaskDbPtr &taskDb = FetchTaskDbPtr()
                 );
   virtual ~FetchManager ();
 
@@ -78,7 +80,7 @@
   DidNoDataTimeout (Fetcher &fetcher);
 
   void
-  DidFetchComplete (Fetcher &fetcher);
+  DidFetchComplete (Fetcher &fetcher, const Ccnx::Name &deviceName, const Ccnx::Name &baseName);
 
   void
   ScheduleFetches ();
@@ -102,6 +104,7 @@
   TaskPtr m_scheduleFetchesTask;
   SegmentCallback m_defaultSegmentCallback;
   FinishCallback m_defaultFinishCallback;
+  FetchTaskDbPtr m_taskDb;
 };
 
 Ccnx::CcnxWrapperPtr
diff --git a/src/fetch-task-db.cc b/src/fetch-task-db.cc
new file mode 100644
index 0000000..11ae6c9
--- /dev/null
+++ b/src/fetch-task-db.cc
@@ -0,0 +1,126 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ *         Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ */
+#include "fetch-task-db.h"
+#include "db-helper.h"
+
+using namespace std;
+using namespace boost;
+using namespace Ccnx;
+namespace fs = boost::filesystem;
+
+const string INIT_DATABASE = "\
+CREATE TABLE IF NOT EXISTS                                      \n\
+  Task(                                                         \n\
+    deviceName  BLOB NOT NULL,                                  \n\
+    baseName    BLOB NOT NULL,                                  \n\
+    minSeqNo    INTEGER,                                        \n\
+    maxSeqNo    INTEGER,                                        \n\
+    priority    INTEGER,                                        \n\
+    PRIMARY KEY (deviceName, baseName)                          \n\
+  );                                                            \n\
+CREATE INDEX identifier ON Task (deviceName, baseName);         \n\
+";
+
+FetchTaskDb::FetchTaskDb(const boost::filesystem::path &folder, const std::string &tag)
+{
+  fs::path actualFolder = folder / ".chronoshare" / "fetch_tasks";
+  fs::create_directories (actualFolder);
+
+  int res = sqlite3_open((actualFolder / tag).c_str(), &m_db);
+  if (res != SQLITE_OK)
+  {
+    BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Cannot open database: " + (actualFolder / tag).string()));
+  }
+
+  char *errmsg = 0;
+  res = sqlite3_exec(m_db, INIT_DATABASE.c_str(), NULL, NULL, &errmsg);
+  if (res != SQLITE_OK && errmsg != 0)
+  {
+      // _LOG_TRACE ("Init \"error\": " << errmsg);
+      sqlite3_free (errmsg);
+  }
+  else
+  {
+  }
+}
+
+FetchTaskDb::~FetchTaskDb()
+{
+  int res = sqlite3_close(m_db);
+  if (res != SQLITE_OK)
+  {
+    // _LOG_ERROR
+  }
+}
+
+void
+FetchTaskDb::addTask(const Name &deviceName, const Name &baseName, uint64_t minSeqNo, uint64_t maxSeqNo, int priority)
+{
+  sqlite3_stmt *stmt;
+  sqlite3_prepare_v2(m_db, "INSERT OR IGNORE INTO Task (deviceName, baseName, minSeqNo, maxSeqNo, priority) VALUES (?, ?, ?, ?, ?)", -1, &stmt, 0);
+  CcnxCharbufPtr deviceBuf = CcnxCharbufPtr(deviceName);
+  CcnxCharbufPtr baseBuf = CcnxCharbufPtr(baseName);
+  sqlite3_bind_blob(stmt, 1, deviceBuf->buf(), deviceBuf->length(), SQLITE_STATIC);
+  sqlite3_bind_blob(stmt, 2, baseBuf->buf(), baseBuf->length(), SQLITE_STATIC);
+  sqlite3_bind_int64(stmt, 3, minSeqNo);
+  sqlite3_bind_int64(stmt, 4, maxSeqNo);
+  sqlite3_bind_int(stmt, 5, priority);
+  int res = sqlite3_step(stmt);
+
+  if (res == SQLITE_OK)
+  {
+  }
+  sqlite3_finalize(stmt);
+}
+
+void
+FetchTaskDb::deleteTask(const Name &deviceName, const Name &baseName)
+{
+  sqlite3_stmt *stmt;
+  sqlite3_prepare_v2(m_db, "DELETE FROM Task WHERE deviceName = ? AND baseName = ?;", -1, &stmt, 0);
+  CcnxCharbufPtr deviceBuf = CcnxCharbufPtr(deviceName);
+  CcnxCharbufPtr baseBuf = CcnxCharbufPtr(baseName);
+  sqlite3_bind_blob(stmt, 1, deviceBuf->buf(), deviceBuf->length(), SQLITE_STATIC);
+  sqlite3_bind_blob(stmt, 2, baseBuf->buf(), baseBuf->length(), SQLITE_STATIC);
+  int res = sqlite3_step(stmt);
+  if (res == SQLITE_OK)
+  {
+  }
+  sqlite3_finalize(stmt);
+}
+
+void
+FetchTaskDb::foreachTask(const FetchTaskCallback &callback)
+{
+  sqlite3_stmt *stmt;
+  sqlite3_prepare_v2(m_db, "SELECT * FROM Task;", -1, &stmt, 0);
+  while (sqlite3_step(stmt) == SQLITE_ROW)
+  {
+     Name deviceName(sqlite3_column_blob(stmt, 0), sqlite3_column_bytes(stmt, 0));
+     Name baseName(sqlite3_column_blob(stmt, 1), sqlite3_column_bytes(stmt, 1));
+     uint64_t minSeqNo = sqlite3_column_int64(stmt, 2);
+     uint64_t maxSeqNo = sqlite3_column_int64(stmt, 3);
+     int priority = sqlite3_column_int(stmt, 4);
+     callback(deviceName, baseName, minSeqNo, maxSeqNo, priority);
+  }
+
+  sqlite3_finalize(stmt);
+}
diff --git a/src/fetch-task-db.h b/src/fetch-task-db.h
new file mode 100644
index 0000000..1f8e16a
--- /dev/null
+++ b/src/fetch-task-db.h
@@ -0,0 +1,55 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ *         Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ */
+#ifndef FETCH_TASK_DB_H
+#define FETCH_TASK_DB_H
+
+#include <sqlite3.h>
+#include <ccnx-common.h>
+#include <ccnx-name.h>
+#include <boost/filesystem.hpp>
+#include <boost/shared_ptr.hpp>
+
+class FetchTaskDb
+{
+public:
+  FetchTaskDb(const boost::filesystem::path &folder, const std::string &tag);
+  ~FetchTaskDb();
+
+  // task with same deviceName and baseName combination will be added only once
+  // if task already exists, this call does nothing
+  void
+  addTask(const Ccnx::Name &deviceName, const Ccnx::Name &baseName, uint64_t minSeqNo, uint64_t maxSeqNo, int priority);
+
+  void
+  deleteTask(const Ccnx::Name &deviceName, const Ccnx::Name &baseName);
+
+  typedef boost::function<void(const Ccnx::Name &, const Ccnx::Name &, uint64_t, uint64_t, int)> FetchTaskCallback;
+
+  void
+  foreachTask(const FetchTaskCallback &callback);
+
+private:
+  sqlite3 *m_db;
+};
+
+typedef boost::shared_ptr<FetchTaskDb> FetchTaskDbPtr;
+
+#endif // FETCH_TASK_DB_H
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 98b8ceb..044c5ca 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -199,7 +199,7 @@
       // tell FetchManager that we have finish our job
       // m_onFetchComplete (*this);
       // using executor, so we won't be deleted if there is scheduled FillPipeline call
-      m_executor->execute (bind (m_onFetchComplete, ref(*this)));
+      m_executor->execute (bind (m_onFetchComplete, ref(*this), m_deviceName, m_name));
     }
   else
     {
diff --git a/src/fetcher.h b/src/fetcher.h
index fa2ce7d..41a2301 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -36,7 +36,7 @@
 public:
   typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName, uint64_t seq, Ccnx::PcoPtr pco)> SegmentCallback;
   typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName)> FinishCallback;
-  typedef boost::function<void (Fetcher &)> OnFetchCompleteCallback;
+  typedef boost::function<void (Fetcher &, const Ccnx::Name &deviceName, const Ccnx::Name &baseName)> OnFetchCompleteCallback;
   typedef boost::function<void (Fetcher &)> OnFetchFailedCallback;
 
   Fetcher (Ccnx::CcnxWrapperPtr ccnx,
diff --git a/test/test-fetch-task-db.cc b/test/test-fetch-task-db.cc
new file mode 100644
index 0000000..1e647af
--- /dev/null
+++ b/test/test-fetch-task-db.cc
@@ -0,0 +1,165 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * Author: Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ *	   Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ */
+
+#include "logging.h"
+#include "fetch-task-db.h"
+
+#include <boost/filesystem.hpp>
+#include <boost/filesystem/fstream.hpp>
+#include <boost/function.hpp>
+#include <boost/bind.hpp>
+
+#include <boost/test/unit_test.hpp>
+#include <unistd.h>
+#include <boost/make_shared.hpp>
+#include <iostream>
+#include <iterator>
+#include <map>
+#include <utility>
+
+INIT_LOGGER ("Test.FetchTaskDb");
+
+using namespace Ccnx;
+using namespace std;
+using namespace boost;
+namespace fs = boost::filesystem;
+
+BOOST_AUTO_TEST_SUITE(TestFetchTaskDb)
+
+class Checker
+{
+public:
+  Checker(const Name &deviceName, const Name &baseName, uint64_t minSeqNo, uint64_t maxSeqNo, int priority)
+        : m_deviceName(deviceName), m_baseName(baseName), m_minSeqNo(minSeqNo), m_maxSeqNo(maxSeqNo), m_priority(priority)
+  {}
+
+  Checker(const Checker &other)
+        : m_deviceName(other.m_deviceName), m_baseName(other.m_baseName), m_minSeqNo(other.m_minSeqNo), m_maxSeqNo(other.m_maxSeqNo), m_priority(other.m_priority)
+  {}
+
+  bool
+  operator==(const Checker &other) { return m_deviceName == other.m_deviceName && m_baseName == other.m_baseName && m_minSeqNo == other.m_minSeqNo && m_maxSeqNo == other.m_maxSeqNo && m_priority == other.m_priority; }
+
+  void show()
+  {
+    cout << m_deviceName  <<", " << m_baseName << ", " << m_minSeqNo << ", " << m_maxSeqNo << ", " << m_priority << endl;
+  }
+
+  Name m_deviceName;
+  Name m_baseName;
+  uint64_t m_minSeqNo;
+  uint64_t m_maxSeqNo;
+  int m_priority;
+};
+
+map<Name, Checker> checkers;
+int g_counter = 0;
+
+void
+getChecker(const Name &deviceName, const Name &baseName, uint64_t minSeqNo, uint64_t maxSeqNo, int priority)
+{
+  Checker checker(deviceName, baseName, minSeqNo, maxSeqNo, priority);
+  g_counter ++;
+  if (checkers.find(checker.m_deviceName + checker.m_baseName) != checkers.end())
+  {
+    BOOST_FAIL("duplicated checkers");
+  }
+  checkers.insert(make_pair(checker.m_deviceName + checker.m_baseName, checker));
+}
+
+BOOST_AUTO_TEST_CASE (FetchTaskDbTest)
+{
+  INIT_LOGGERS ();
+  fs::path folder("TaskDbTest");
+  fs::create_directories(folder / ".chronoshare");
+
+  FetchTaskDbPtr db = make_shared<FetchTaskDb>(folder, "test");
+
+  map<Name, Checker> m1;
+  g_counter = 0;
+
+  checkers.clear();
+
+  Name deviceNamePrefix("/device");
+  Name baseNamePrefix("/device/base");
+
+  // add 10 tasks
+  for (uint64_t i = 0; i < 10; i++)
+  {
+    Name d = deviceNamePrefix;
+    Name b = baseNamePrefix;
+    Checker c(d.appendComp(i), b.appendComp(i), i, 11, 1);
+    m1.insert(make_pair(d + b, c));
+    db->addTask(c.m_deviceName, c.m_baseName, c.m_minSeqNo, c.m_maxSeqNo, c.m_priority);
+  }
+
+  // delete the latter 5
+  for (uint64_t i = 5; i < 10; i++)
+  {
+    Name d = deviceNamePrefix;
+    Name b = baseNamePrefix;
+    d.appendComp(i);
+    b.appendComp(i);
+    db->deleteTask(d, b);
+  }
+
+  // add back 3 to 7, 3 and 4 should not be added twice
+
+  for (uint64_t i = 3; i < 8; i++)
+  {
+    Name d = deviceNamePrefix;
+    Name b = baseNamePrefix;
+    Checker c(d.appendComp(i), b.appendComp(i), i, 11, 1);
+    db->addTask(c.m_deviceName, c.m_baseName, c.m_minSeqNo, c.m_maxSeqNo, c.m_priority);
+  }
+
+  db->foreachTask(bind(getChecker, _1, _2, _3, _4, _5));
+
+  BOOST_CHECK_EQUAL(g_counter, 8);
+
+  map<Name, Checker>::iterator it = checkers.begin();
+  while (it != checkers.end())
+  {
+    map<Name, Checker>::iterator mt = m1.find(it->first);
+    if (mt == m1.end())
+    {
+      BOOST_FAIL("unknown task found");
+    }
+    else
+    {
+      Checker c1 = it->second;
+      Checker c2 = mt->second;
+      BOOST_CHECK(c1 == c2);
+      if (! (c1 == c2))
+      {
+        cout << "C1: " << endl;
+        c1.show();
+        cout << "C2: " << endl;
+        c2.show();
+      }
+    }
+    ++it;
+  }
+  fs::remove_all(folder);
+}
+
+
+BOOST_AUTO_TEST_SUITE_END()