Add tests for the database and sync log; add preliminary file state maintenance operations
diff --git a/src/action-log.cc b/src/action-log.cc
index 41166e7..293a79a 100644
--- a/src/action-log.cc
+++ b/src/action-log.cc
@@ -28,6 +28,19 @@
ActionLog::ActionLog (const std::string &path, const std::string &localName)
: SyncLog (path, localName)
{
+  // Register apply_action () on this connection so that the
+  // `SELECT apply_action (...)` in ActionLogInsert_trigger resolves here;
+  // `this` is passed as user data and can be recovered in apply_action_xFun
+  // with sqlite3_user_data ().  The callback reads text with
+  // sqlite3_value_text (), so SQLITE_UTF8 is declared (SQLITE_ANY is deprecated).
+  int res = sqlite3_create_function (m_db, "apply_action", -1, SQLITE_UTF8, reinterpret_cast<void*> (this),
+                                     ActionLog::apply_action_xFun,
+                                     0, 0);
+  if (res != SQLITE_OK)
+    {
+      BOOST_THROW_EXCEPTION (Error::Db ()
+                             << errmsg_info_str ("Cannot create function ``apply_action''"));
+    }
}
tuple<sqlite3_int64, sqlite3_int64, sqlite3_int64, string>
@@ -37,7 +45,7 @@
sqlite3_stmt *stmt;
int res = sqlite3_prepare_v2 (m_db, "SELECT a.version,a.device_id,a.seq_no,a.action,s.device_name "
"FROM ActionLog a JOIN SyncNodes s ON s.device_id = a.device_id "
- "WHERE filename=? ORDER BY a.version DESC,a.device_id DESC LIMIT 1", -1, &stmt, 0);
+ "WHERE filename=? ORDER BY a.version DESC LIMIT 1", -1, &stmt, 0);
if (res != SQLITE_OK)
{
@@ -230,3 +238,47 @@
sqlite3_exec (m_db, "END TRANSACTION;", 0,0,0);
}
+
+
+// Returns an SQL function argument as a C string for logging.
+// sqlite3_value_text () returns NULL for an SQL NULL (e.g. when the trigger's
+// device_name subquery finds no SyncNodes row), and streaming a null
+// character pointer into std::ostream is undefined behavior.
+static const char *
+apply_action_text_arg (sqlite3_value *value)
+{
+  const unsigned char *text = sqlite3_value_text (value);
+  return text != 0 ? reinterpret_cast<const char *> (text) : "(null)";
+}
+
+/**
+ * @brief Callback behind the SQL function apply_action (), registered in the
+ *        ActionLog constructor and called from ActionLogInsert_trigger.
+ *
+ * Preliminary: it only traces its arguments to stdout and returns SQL NULL.
+ * The trigger passes (device_name, action, filename, file_hash, file_atime,
+ * file_mtime, file_ctime, file_chmod); only the first three are read here.
+ * The owning ActionLog can be recovered with
+ * reinterpret_cast<ActionLog*> (sqlite3_user_data (context)).
+ *
+ * Because the function is registered with nArg = -1, SQLite does not check
+ * the argument count; fewer than three arguments is reported as an SQL error
+ * instead of reading past the end of argv.
+ */
+void
+ActionLog::apply_action_xFun (sqlite3_context *context, int argc, sqlite3_value **argv)
+{
+  cout << "apply_function called with " << argc << endl;
+
+  if (argc < 3)
+    {
+      sqlite3_result_error (context, "apply_action: expected at least 3 arguments", -1);
+      return;
+    }
+
+  cout << "device_name: " << apply_action_text_arg (argv[0]) << endl;
+  cout << "action: " << sqlite3_value_int (argv[1]) << endl;
+  cout << "filename: " << apply_action_text_arg (argv[2]) << endl;
+
+  sqlite3_result_null (context);
+}
diff --git a/src/action-log.h b/src/action-log.h
index 1d63fb8..f6cc0a7 100644
--- a/src/action-log.h
+++ b/src/action-log.h
@@ -48,6 +48,9 @@
private:
boost::tuple<sqlite3_int64, sqlite3_int64, sqlite3_int64, std::string>
GetExistingRecord (const std::string &filename);
+
+ static void
+ apply_action_xFun (sqlite3_context *context, int argc, sqlite3_value **argv);
protected:
};
diff --git a/src/database-test.cc b/src/database-test.cc
deleted file mode 100644
index ba1fecc..0000000
--- a/src/database-test.cc
+++ /dev/null
@@ -1,80 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012 University of California, Los Angeles
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Alexander Afanasyev <alexander.afanasyev@ucla.edu>
- * Zhenkai Zhu <zhenkai@cs.ucla.edu>
- */
-
-#include "db-helper.h"
-#include <iostream>
-
-using namespace std;
-using namespace boost;
-
-typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
-
-
-int
-main (int argc, char **argv)
-{
- try
- {
- DbHelper db ("./");
-
- HashPtr hash = db.RememberStateInStateLog ();
- // should be empty
- cout << "Hash: [" << *hash << "]" << endl;
-
- //
- db.UpdateDeviceSeqno ("Alex", 1);
- hash = db.RememberStateInStateLog ();
- cout << "Hash: [" << *hash << "]" << endl;
-
- db.UpdateDeviceSeqno ("Alex", 2);
- hash = db.RememberStateInStateLog ();
- cout << "Hash: [" << *hash << "]" << endl;
-
- db.UpdateDeviceSeqno ("Alex", 2);
- hash = db.RememberStateInStateLog ();
- cout << "Hash: [" << *hash << "]" << endl;
-
- db.UpdateDeviceSeqno ("Alex", 1);
- hash = db.RememberStateInStateLog ();
- cout << "Hash: [" << *hash << "]" << endl;
-
- db.FindStateDifferences ("00", "ec0a9941fa726e1fb8f34ecdbd8e3faa75dc9dba22e6a2ea1d8482aae5fdfb52");
- db.FindStateDifferences ("ec0a9941fa726e1fb8f34ecdbd8e3faa75dc9dba22e6a2ea1d8482aae5fdfb52", "00");
- db.FindStateDifferences ("869c38c6dffe8911ced320aecc6d9244904d13d3e8cd21081311f2129b4557ce",
- "ec0a9941fa726e1fb8f34ecdbd8e3faa75dc9dba22e6a2ea1d8482aae5fdfb52");
- db.FindStateDifferences ("ec0a9941fa726e1fb8f34ecdbd8e3faa75dc9dba22e6a2ea1d8482aae5fdfb52",
- "869c38c6dffe8911ced320aecc6d9244904d13d3e8cd21081311f2129b4557ce");
-
- db.UpdateDeviceSeqno ("Bob", 1);
- hash = db.RememberStateInStateLog ();
- cout << "Hash: [" << *hash << "]" << endl;
-
- db.FindStateDifferences ("00", "48f4d95b503b9a79c2d5939fa67722b13fc01db861fc501d09efd0a38dbafab8");
- db.FindStateDifferences ("ec0a9941fa726e1fb8f34ecdbd8e3faa75dc9dba22e6a2ea1d8482aae5fdfb52",
- "48f4d95b503b9a79c2d5939fa67722b13fc01db861fc501d09efd0a38dbafab8");
- }
- catch (const boost::exception &e)
- {
- cout << "ERRORR: " << *get_error_info<errmsg_info_str> (e) << endl;
- }
-
- return 0;
-}
diff --git a/src/db-helper.cc b/src/db-helper.cc
index f0e78d0..7b6c091 100644
--- a/src/db-helper.cc
+++ b/src/db-helper.cc
@@ -120,6 +120,41 @@
CREATE INDEX ActionLog_filename_version ON ActionLog (filename,version); \n\
CREATE INDEX ActionLog_parent ON ActionLog (parent_device_id, parent_seq_no); \n\
CREATE INDEX ActionLog_action_name ON ActionLog (action_name); \n\
+ \n\
+CREATE TRIGGER ActionLogInsert_trigger \n\
+ AFTER INSERT ON ActionLog \n\
+ FOR EACH ROW \n\
+ WHEN (SELECT device_id \n\
+ FROM ActionLog \n\
+ WHERE filename=NEW.filename AND \n\
+ version > NEW.version) IS NULL AND \n\
+ (SELECT a.device_id \n\
+ FROM ActionLog a \n\
+ LEFT JOIN SyncNodes s ON s.device_id=a.device_id \n\
+ WHERE filename=NEW.filename AND \n\
+ version = NEW.version AND \n\
+ a.device_id != NEW.device_id AND \n\
+ s.device_name > (SELECT device_name \n\
+ FROM SyncNodes \n\
+ WHERE device_id=NEW.device_id)) IS NULL \n\
+ BEGIN \n\
+ SELECT apply_action ((SELECT device_name FROM SyncNodes where device_id=NEW.device_id), \
+ NEW.action,NEW.filename,NEW.file_hash, \
+ NEW.file_atime,NEW.file_mtime,NEW.file_ctime, \
+ NEW.file_chmod); /* function that applies action and adds record the FileState */ \n \
+ END; \n\
+ \n\
+CREATE TABLE FileState ( \n\
+ type INTEGER NOT NULL, /* 0 - newest, 1 - oldest */ \n\
+ filename TEXT NOT NULL, \n\
+ file_hash BLOB, /* NULL if action is \"delete\" */ \n\
+ file_atime TIMESTAMP, \n\
+ file_mtime TIMESTAMP, \n\
+ file_ctime TIMESTAMP, \n\
+ file_chmod INTEGER, \n\
+ \n\
+ PRIMARY KEY (type, filename) \n\
+); \n\
";
DbHelper::DbHelper (const std::string &path)
diff --git a/src/hash-helper.cc b/src/hash-helper.cc
index 04b52b2..33c0e77 100644
--- a/src/hash-helper.cc
+++ b/src/hash-helper.cc
@@ -118,7 +118,7 @@
return retval;
}
-
+ retval->m_buf = new unsigned char [EVP_MAX_MD_SIZE];
unsigned char *end = copy (string_to_binary (hashInTextEncoding.begin ()),
string_to_binary (hashInTextEncoding.end ()),
diff --git a/src/object-db-file.h b/src/object-db-file.h
new file mode 100644
index 0000000..1eb1ef8
--- /dev/null
+++ b/src/object-db-file.h
@@ -0,0 +1,159 @@
+#ifndef OBJECT_DB_FILE_H
+#define OBJECT_DB_FILE_H
+
+#include "object-db.h"
+#include <stdio.h>
+#include <fstream>
+#include <map>
+#include <sstream>
+#include <deque>
+#include <boost/thread/locks.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/interprocess/sync/file_lock.hpp>
+#include <boost/interprocess/sync/sharable_lock.hpp>
+#include <boost/interprocess/sync/scoped_lock.hpp>
+
+// _OVERRIDE expands to the C++11 `override` specifier when compiling as C++11
+// or later and to nothing otherwise.  The language mode is tested rather than
+// the GCC version, since `override` needs C++11 mode (this also covers clang).
+#if __cplusplus >= 201103L
+  #define _OVERRIDE override
+#else
+  #define _OVERRIDE
+#endif
+
+// NOTE(review): a using-directive in a header leaks into every includer; it is
+// kept because sources including this header may rely on it.
+using namespace std;
+
+// This is a file based ObjectDB implementation.
+// The assumption is that the Content Objects will be stored sequentially.
+
+// To provide random access, we keep a table of "addresses" (one per
+// ContentObject) at the beginning of the file.
+// This requires another assumption: the number of COs must be known a
+// priori.  This requirement is reasonable for our dropbox-like system, as the
+// files we publish are static and we can easily know the number of COs
+// before we store them into the ObjectDB.
+
+/* File layout:
+ * |MAGIC_NUM|capacity|size|pos for each CO ...|1st CO|2nd CO| ... |
+ */
+
+class ObjectDBFile : public ObjectDB
+{
+public:
+  typedef boost::interprocess::file_lock Filelock;
+  typedef boost::interprocess::scoped_lock<Filelock> WriteLock;
+  typedef boost::interprocess::sharable_lock<Filelock> ReadLock;
+  typedef boost::shared_mutex Mutex;
+  typedef boost::shared_lock<Mutex> SLock;
+  typedef boost::unique_lock<Mutex> ULock;
+
+  ObjectDBFile(const string &filename);
+  virtual ~ObjectDBFile();
+
+  // reserve the "address" table for n COs; must be called before anything
+  // is written (unless the reserved quota has not been consumed yet)
+  void
+  init(int capacity);
+
+  bool
+  initialized() const { return m_initialized; }
+
+  // assume sequential
+  virtual void
+  append(const Bytes &co) _OVERRIDE;
+
+  // get next CO
+  virtual Bytes
+  next() _OVERRIDE;
+
+  // size in terms of number of COs
+  // This is the lazy form of size, i.e. it returns the size cached in this object,
+  // which may not equal the actual size kept in the file.
+  // This is enough if the caller knows for sure that no other thread is changing the
+  // file or the caller does not care about the new size.
+  virtual int
+  size() const _OVERRIDE;
+
+  // this returns the actual size (also updates the size cache in this object), but it is more costly, and requires file IO
+  int
+  fSize();
+
+  // the index of the CO to be read
+  int
+  index();
+
+  // set the pos to be the desired CO
+  // return true if success
+  bool
+  seek(int index);
+
+  // reset pos to be zero
+  void
+  rewind();
+
+protected:
+  // read or write lock should have been grabbed already before the call
+  void
+  checkInit(const string &msg);
+
+  // read lock should have been grabbed already before the call
+  void
+  updateSize();
+
+  // read lock should have been grabbed already before the call
+  void
+  fillDummyCache();
+
+  #define MAGIC_NUM 0xAAAAAAAA
+
+protected:
+  string m_filename;
+  ifstream m_istream;
+  ofstream m_ostream;
+  Filelock m_filelock;
+  bool m_initialized;
+  // capacity in terms of number of COs
+  int m_cap;
+  int m_size;
+  // the index (or seq) of the CO to be read
+  int m_index;
+
+  // A dummy Cache that holds the next 10 (or all remaining if less than 10)
+  // COs after a next() operation
+  // If needed and time allows, we can have more complex cache
+  #define CACHE_SIZE 10
+  map<int, Bytes> m_dummyCache;
+  Mutex m_cacheMutex;
+};
+
+// Writes the raw in-memory bytes of x (host byte order, sizeof(int) bytes).
+inline void
+writeInt(ostream &out, const int &x)
+{
+  out.write(reinterpret_cast<const char *>(&x), sizeof(int));
+}
+
+// Reads sizeof(int) raw bytes (host byte order) into x; counterpart of writeInt.
+inline void
+readInt(istream &in, int &x)
+{
+  in.read(reinterpret_cast<char *>(&x), sizeof(int));
+}
+
+// write size and then the actual bytes
+// operator << overloading is not used to avoid confusion
+void
+writeBytes(ostream &out, const Bytes &bytes);
+
+// read size and then the actual bytes
+void
+readBytes(istream &in, Bytes &bytes);
+
+char *
+head(const Bytes &bytes);
+
+#endif
diff --git a/src/object-db.h b/src/object-db.h
new file mode 100644
index 0000000..8c31418
--- /dev/null
+++ b/src/object-db.h
@@ -0,0 +1,47 @@
+#ifndef OBJECT_DB_H
+#define OBJECT_DB_H
+
+#include <boost/exception/all.hpp>
+#include <exception>
+#include <string>
+#include <vector>
+
+using namespace std;
+
+// Exception type raised by throwException (); carries an error_info_str message.
+struct ObjectDBException : virtual boost::exception, virtual exception { };
+typedef boost::error_info<struct tag_errmsg, std::string> error_info_str;
+
+// Throws ObjectDBException with msg attached as error_info_str.
+inline void throwException(const string &msg) { boost::throw_exception(ObjectDBException() << error_info_str(msg)); }
+
+typedef unsigned char Byte;
+typedef vector<Byte> Bytes;
+
+// OK. This name is a bit misleading, but this ObjectDB is really some storage
+// that stores the Ccnx ContentObjects of a file. So unlike a normal database,
+// this DB is per file.
+
+// The assumption is that the ContentObjects will be written to ObjectDB sequentially.
+// This guarantees that when read, the Nth ContentObject read has the sequence number N as the last component of its name
+class ObjectDB
+{
+public:
+  virtual ~ObjectDB(){}
+
+  // assume sequential
+  virtual void
+  append(const Bytes &co) = 0;
+
+  // get next CO
+  virtual Bytes
+  next() = 0;
+
+  // size in terms of number of COs; const, matching ObjectDBFile::size() const
+  // so that the derived declaration actually overrides this one
+  virtual int
+  size() const = 0;
+
+};
+
+#endif