Merge branch 'master' of git.irl.cs.ucla.edu:ndn/chronoshare
diff --git a/include/ccnx-tunnel.h b/include/ccnx-tunnel.h
index 61ed964..af49a9b 100644
--- a/include/ccnx-tunnel.h
+++ b/include/ccnx-tunnel.h
@@ -2,7 +2,6 @@
#define CCNX_TUNNEL_H
#include <boost/variant.hpp>
-#include <boost/thread/locks.hpp>
#include "ccnx-common.h"
#include "ccnx-wrapper.h"
@@ -27,9 +26,6 @@
public:
typedef multimap<string, InterestCallback> RegisteredInterestTable;
typedef multimap<string, InterestCallback>::iterator RitIter;
- typedef boost::shared_mutex Lock;
- typedef boost::unique_lock<Lock> WriteLock;
- typedef boost::shared_lock<Lock> ReadLock;
CcnxTunnel();
@@ -39,6 +35,9 @@
virtual int
publishData(const string &name, const unsigned char *buf, size_t len, int freshness) _OVERRIDE;
+ // Publishes an already-built content object through the tunnel. The
+ // definition in src/ccnx-tunnel.cpp prepends m_localPrefix to `name`, wraps
+ // the bytes of `contentObject` in a new content object created with
+ // `freshness`, and returns the result of putToCcnd().
+ int
+ publishContentObject(const string &name, const Bytes &contentObject, int freshness);
+
virtual int
sendInterest (const Interest &interest, Closure *closure);
diff --git a/include/ccnx-wrapper.h b/include/ccnx-wrapper.h
index 8d24b19..e5754da 100644
--- a/include/ccnx-wrapper.h
+++ b/include/ccnx-wrapper.h
@@ -10,6 +10,7 @@
#include <ccn/signing.h>
}
+#include <boost/thread/locks.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/thread/thread.hpp>
@@ -26,6 +27,13 @@
class CcnxWrapper
{
public:
+ // Reader/writer lock helpers: WriteLock takes exclusive ownership of a Lock,
+ // ReadLock takes shared ownership.
+ typedef boost::shared_mutex Lock;
+ typedef boost::unique_lock<Lock> WriteLock;
+ typedef boost::shared_lock<Lock> ReadLock;
+
+ // Guard type for the recursive m_mutex. The guard has to be a named object,
+ // e.g. `UniqueRecLock lock(m_mutex);` -- the statement `UniqueRecLock(m_mutex);`
+ // is parsed as the declaration of a new, empty lock called m_mutex and
+ // therefore locks nothing.
+ typedef boost::recursive_mutex RecLock;
+ typedef boost::unique_lock<RecLock> UniqueRecLock;
+
typedef boost::function<void (const string &)> InterestCallback;
CcnxWrapper();
@@ -89,7 +97,7 @@
ccn_keystore *m_keyStore;
ccn_charbuf *m_keyLoactor;
// to lock, use "boost::recursive_mutex::scoped_lock scoped_lock(mutex);
- boost::recursive_mutex m_mutex;
+ RecLock m_mutex;
boost::thread m_thread;
bool m_running;
bool m_connected;
diff --git a/include/object-db-file.h b/include/object-db-file.h
new file mode 100644
index 0000000..fb9ca91
--- /dev/null
+++ b/include/object-db-file.h
@@ -0,0 +1,143 @@
+#ifndef OBJECT_DB_FILE_H
+#define OBJECT_DB_FILE_H
+
+#include "object-db.h"
+#include <stdio.h>
+#include <fstream>
+// std::ifstream / std::ofstream are declared by <fstream>; the standard
+// library has no <ifstream> or <ofstream> header, so the stream base headers
+// are included instead.
+#include <istream>
+#include <ostream>
+#include <sstream>
+#include <string>
+#include <vector>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/recursive_mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/lexical_cast.hpp>
+
+// _OVERRIDE expands to the C++11 `override` specifier when building with
+// GCC 4.7 or newer in C++11 mode, and to nothing otherwise.
+// (The GCC major version macro is __GNUC__; there is no __GNUC_MAJOR.)
+// The #ifndef keeps a definition made by an earlier header from being redefined.
+#ifndef _OVERRIDE
+#if defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 7)) && (__cplusplus >= 201103L || defined(__GXX_EXPERIMENTAL_CXX0X__))
+  #define _OVERRIDE override
+#else
+  #define _OVERRIDE
+#endif // GCC >= 4.7 in C++11 mode
+#endif // _OVERRIDE
+
+using namespace std;
+
+// This is a file based ObjectDB implementation.
+// The assumption is that the Content Objects (COs) are stored sequentially.
+
+// To provide random access, there is a table holding the "address" of each
+// ContentObject at the beginning of the file.
+// This requires another assumption: the number of COs must be known
+// a priori. This requirement is reasonable for our dropbox-like system, as
+// the file we publish is a static file and we can easily know the number of
+// COs before we store them into ObjectDB.
+
+/* What the file looks like:
+ * |MAGIC_NUM|capacity|size|pos for each CO ...|1st CO|2nd CO| ... |
+ */
+
+class ObjectDBFile : public ObjectDB
+{
+public:
+  typedef boost::shared_mutex Lock;
+  typedef boost::unique_lock<Lock> WriteLock;
+  typedef boost::shared_lock<Lock> ReadLock;
+
+  // Opens `filename` for reading; the object counts as initialized only if
+  // the file already starts with MAGIC_NUM.
+  ObjectDBFile(const string &filename);
+
+  // Only declared here: the definition lives in object-db-file.cpp, and a
+  // second inline definition in the header would be a redefinition.
+  virtual ~ObjectDBFile();
+
+  // reserve the "address" table for n COs; must reserve before
+  // writing anything (unless the reserved quota has not been consumed yet)
+  void
+  init(int capacity);
+
+  bool
+  initialized() const { return m_initialized; }
+
+  // assume sequential
+  virtual void
+  append(const Bytes &co) _OVERRIDE;
+
+  // get next CO
+  virtual Bytes
+  next() _OVERRIDE;
+
+  // get n COs; if the remaining number of COs < n, return all;
+  virtual void
+  read(vector<Bytes> &vco, int n) _OVERRIDE;
+
+  // size in terms of number of COs
+  // This is the lazy form of size, i.e. it returns the size cached in this object,
+  // which is not necessarily equal to the actual size kept in the file.
+  // This is enough if the caller knows for sure that no other thread is changing the
+  // file, or the caller does not care about the new size.
+  virtual int
+  size() const _OVERRIDE;
+
+  // this returns the actual size (and also updates the size cache in this object),
+  // but it is more costly and requires file IO
+  int
+  fSize();
+
+  // the index of the CO to be read
+  int
+  index();
+
+  // set the pos to be the desired CO
+  // return true if success
+  bool
+  seek(int index);
+
+  // reset pos to be zero
+  void
+  rewind();
+
+protected:
+  // m_lock should already be held for writing before the call
+  void
+  prepareWrite();
+
+  // m_lock should already be held (read or write) before the call;
+  // throws an ObjectDBException carrying msg if the object is not initialized
+  void
+  checkInit(const string &msg);
+
+  // m_lock should already be held before the call;
+  // re-reads the size field of the file header into m_size
+  void
+  updateSize();
+
+  // first int of every ObjectDBFile file
+  #define MAGIC_NUM 0xAAAAAAAA
+
+protected:
+  string m_filename;
+  ifstream m_istream;
+  ofstream m_ostream;
+  Lock m_lock;
+  bool m_writable;
+  bool m_initialized;
+  // capacity in terms of number of COs
+  int m_cap;
+  int m_size;
+  // the index (or seq) of the CO to be read
+  int m_index;
+};
+
+// Writes the in-memory representation of x (sizeof(int) bytes) to out.
+inline void
+writeInt(ostream &out, const int &x)
+{
+  const char *raw = reinterpret_cast<const char *>(&x);
+  out.write(raw, sizeof x);
+}
+
+// Reads sizeof(int) raw bytes from in and stores them in x.
+inline void
+readInt(istream &in, int &x)
+{
+  char *raw = reinterpret_cast<char *>(&x);
+  in.read(raw, sizeof x);
+}
+
+// Serializes bytes to out as an int element count followed by the raw bytes.
+// operator<< is deliberately not overloaded, to avoid confusion.
+void
+writeBytes(ostream &out, const Bytes &bytes);
+
+// Counterpart of writeBytes: reads an int element count from in and then
+// the bytes that follow it into bytes.
+void
+readBytes(istream &in, Bytes &bytes);
+
+#endif
diff --git a/include/object-db.h b/include/object-db.h
new file mode 100644
index 0000000..4ad8504
--- /dev/null
+++ b/include/object-db.h
@@ -0,0 +1,46 @@
+#ifndef OBJECT_DB_H
+#define OBJECT_DB_H
+
+#include <boost/exception/all.hpp>
+#include <vector>
+
+using namespace std;
+
+// Exception type raised through throwException(); the message is attached
+// to it as an error_info_str.
+struct ObjectDBException : virtual boost::exception, virtual exception { };
+typedef boost::error_info<struct tag_errmsg, std::string> error_info_str;
+
+// Throws an ObjectDBException carrying msg.
+// The function is defined in a header, so it has to be inline: a non-inline
+// definition would be emitted by every translation unit that includes this
+// file and break the link with "multiple definition" errors.
+inline void throwException(const string &msg) { boost::throw_exception(ObjectDBException() << error_info_str(msg)); }
+
+// Raw storage for one encoded content object.
+typedef unsigned char Byte;
+typedef vector<Byte> Bytes;
+
+// OK. This name is a bit misleading, but this ObjectDB is really some storage
+// that stores the Ccnx ContentObjects of a file. So unlike a normal database,
+// this DB is per file.
+
+// The assumption is that the ContentObjects will be written to ObjectDB sequentially.
+// This guarantees that, when read, the Nth ContentObject read has the sequence number N as the last component of its name.
+// Abstract interface of a per-file store of content objects (COs).
+class ObjectDB
+{
+public:
+  virtual ~ObjectDB(){}
+
+  // assume sequential
+  virtual void
+  append(const Bytes &co) = 0;
+
+  // get next CO
+  virtual Bytes
+  next() = 0;
+
+  // get n COs; if the remaining number of COs < n, return all;
+  virtual void
+  read(vector<Bytes> &vco, int n) = 0;
+
+  // size in terms of number of COs
+  // (the cv-qualifier has to precede the pure-specifier: `const = 0`)
+  virtual int
+  size() const = 0;
+
+};
+
+#endif
diff --git a/src/ccnx-tunnel.cpp b/src/ccnx-tunnel.cpp
index b36b030..281079f 100644
--- a/src/ccnx-tunnel.cpp
+++ b/src/ccnx-tunnel.cpp
@@ -49,9 +49,14 @@
Bytes content = createContentObject(name, buf, len, freshness);
storeContentObject(name, content);
- string tunneledName = m_localPrefix + name;
- Bytes tunneledCo = createContentObject(tunneledName, head(content), content.size(), freshness);
+ return publishContentObject(name, content, freshness);
+}
+// Publishes an existing content object under the tunnel prefix: the tunneled
+// name is m_localPrefix + name, and the bytes of contentObject become the
+// payload of a new content object built by createContentObject() with the
+// given freshness. Returns whatever putToCcnd() returns for that object.
+int
+CcnxTunnel::publishContentObject(const string &name, const Bytes &contentObject, int freshness)
+{
+ string tunneledName = m_localPrefix + name;
+ Bytes tunneledCo = createContentObject(tunneledName, head(contentObject), contentObject.size(), freshness);
return putToCcnd(tunneledCo);
}
diff --git a/src/ccnx-wrapper.cpp b/src/ccnx-wrapper.cpp
index 1b791c9..041665e 100644
--- a/src/ccnx-wrapper.cpp
+++ b/src/ccnx-wrapper.cpp
@@ -59,8 +59,7 @@
void
CcnxWrapper::connectCcnd()
{
- recursive_mutex::scoped_lock lock (m_mutex);
-
+ // The guard needs a name: `UniqueRecLock(m_mutex);` only declares an empty
+ // local lock called m_mutex and leaves the member mutex unlocked.
+ UniqueRecLock lock(m_mutex);
if (m_handle != 0) {
ccn_disconnect (m_handle);
ccn_destroy (&m_handle);
@@ -86,7 +85,7 @@
CcnxWrapper::~CcnxWrapper()
{
{
- recursive_mutex::scoped_lock lock(m_mutex);
+ // Named guard: `UniqueRecLock(m_mutex);` would declare an empty local lock
+ // called m_mutex instead of locking the member mutex.
+ UniqueRecLock lock(m_mutex);
m_running = false;
}
@@ -150,7 +149,7 @@
{
int res = 0;
{
- recursive_mutex::scoped_lock lock (m_mutex);
+ // Named guard: `UniqueRecLock(m_mutex);` would declare an empty local lock
+ // called m_mutex, so ccn_run() would run without holding the mutex.
+ UniqueRecLock lock(m_mutex);
res = ccn_run (m_handle, 0);
}
@@ -163,7 +162,7 @@
pollfd pfds[1];
{
- recursive_mutex::scoped_lock lock (m_mutex);
+ // Named guard: `UniqueRecLock(m_mutex);` would declare an empty local lock
+ // called m_mutex instead of locking the member mutex.
+ UniqueRecLock lock(m_mutex);
pfds[0].fd = ccn_get_connection_fd (m_handle);
pfds[0].events = POLLIN;
@@ -258,7 +257,7 @@
int
CcnxWrapper::putToCcnd (const Bytes &contentObject)
{
- recursive_mutex::scoped_lock lock(m_mutex);
+ // Named guard: `UniqueRecLock(m_mutex);` would declare an empty local lock
+ // called m_mutex instead of locking the member mutex.
+ UniqueRecLock lock(m_mutex);
if (!m_running || !m_connected)
return -1;
@@ -389,7 +388,7 @@
int CcnxWrapper::sendInterest (const Interest &interest, Closure *closure)
{
- recursive_mutex::scoped_lock lock(m_mutex);
+ // Named guard: `UniqueRecLock(m_mutex);` would declare an empty local lock
+ // called m_mutex instead of locking the member mutex.
+ UniqueRecLock lock(m_mutex);
if (!m_running || !m_connected)
return -1;
@@ -410,7 +409,7 @@
int CcnxWrapper::setInterestFilter (const string &prefix, const InterestCallback &interestCallback)
{
- recursive_mutex::scoped_lock lock(m_mutex);
+ // Named guard: `UniqueRecLock(m_mutex);` would declare an empty local lock
+ // called m_mutex instead of locking the member mutex.
+ UniqueRecLock lock(m_mutex);
if (!m_running || !m_connected)
return -1;
@@ -432,7 +431,7 @@
void
CcnxWrapper::clearInterestFilter (const std::string &prefix)
{
- recursive_mutex::scoped_lock lock(m_mutex);
+ // Named guard: `UniqueRecLock(m_mutex);` would declare an empty local lock
+ // called m_mutex instead of locking the member mutex.
+ UniqueRecLock lock(m_mutex);
if (!m_running || !m_connected)
return;
diff --git a/src/object-db-file.cpp b/src/object-db-file.cpp
new file mode 100644
index 0000000..ae13d91
--- /dev/null
+++ b/src/object-db-file.cpp
@@ -0,0 +1,222 @@
+#include "object-db-file.h"
+#include <assert.h>
+
+// Writes bytes to out as an int element count followed by the raw bytes
+// (the layout readBytes() consumes).
+void
+writeBytes(ostream &out, const Bytes &bytes)
+{
+  const int size = static_cast<int>(bytes.size());
+  writeInt(out, size);
+  // &bytes[0] is not valid for an empty vector, so the storage is only
+  // touched when there is something to write. ostream::write() takes a
+  // const char *, hence the cast from the unsigned char storage.
+  if (size > 0)
+    {
+      out.write(reinterpret_cast<const char *>(&bytes[0]), size);
+    }
+}
+
+// Reads an int element count from in and then that many raw bytes into
+// bytes. If the count cannot be read or is negative, bytes is emptied and
+// failbit is set on in.
+void
+readBytes(istream &in, Bytes &bytes)
+{
+  int size = 0;
+  readInt(in, size);
+  if (!in || size < 0)
+    {
+      bytes.clear();
+      in.setstate(ios::failbit);
+      return;
+    }
+
+  // resize(), not reserve(): reserve() leaves size() at zero, so writing
+  // into the storage would be out of bounds and the data would be lost.
+  bytes.resize(size);
+  if (size > 0)
+    {
+      in.read(reinterpret_cast<char *>(&bytes[0]), size);
+    }
+}
+
+// Opens filename for reading. If the file starts with MAGIC_NUM and carries
+// a consistent header, the capacity and size are cached and the read stream
+// is positioned on the first content object; otherwise the object stays
+// uninitialized until init() is called.
+ObjectDBFile::ObjectDBFile(const string &filename)
+  // initializers are listed in member declaration order
+  : m_filename(filename)
+  , m_writable(false)
+  , m_initialized(false)
+  , m_cap(0)
+  , m_size(0)
+  , m_index(0)
+{
+  // the guard must be a named object; `ReadLock(m_lock);` locks nothing
+  ReadLock lock(m_lock);
+  m_istream.open(m_filename.c_str(), ios_base::binary);
+
+  // magic stays 0 when the file is missing or shorter than one int
+  int magic = 0;
+  readInt(m_istream, magic);
+  if (m_istream && static_cast<unsigned int>(magic) == MAGIC_NUM)
+    {
+      int cap = 0;
+      int size = 0;
+      readInt(m_istream, cap);
+      readInt(m_istream, size);
+      if (m_istream && cap >= 0 && size >= 0 && size <= cap)
+        {
+          m_initialized = true;
+          m_cap = cap;
+          m_size = size;
+          m_istream.seekg( (3 + m_cap) * sizeof(int), ios::beg);
+        }
+    }
+
+  if (!m_initialized)
+    {
+      // drop eof/fail bits so the stream is usable again after init()
+      m_istream.clear();
+    }
+}
+
+// Closes the write stream if it was opened for writing, then the read stream.
+ObjectDBFile::~ObjectDBFile()
+{
+  if (m_writable)
+    {
+      m_ostream.close();
+    }
+  m_istream.close();
+}
+
+// Writes a fresh header (MAGIC_NUM, capacity, size 0) followed by a zeroed
+// position table with one entry per CO, then places the read position at
+// the start of the CO area.
+// Throws ObjectDBException if the object is already initialized, if
+// capacity is negative, or if the file cannot be opened for writing.
+void
+ObjectDBFile::init(int capacity)
+{
+  // the guard must be a named object; `WriteLock(m_lock);` locks nothing
+  WriteLock lock(m_lock);
+  if (m_initialized)
+    {
+      throwException("Trying to init already initialized ObjectDBFile object: " + m_filename);
+    }
+
+  if (capacity < 0)
+    {
+      throwException("Invalid capacity: " + boost::lexical_cast<string>(capacity));
+    }
+
+  if (!m_writable)
+    {
+      prepareWrite();
+    }
+
+  m_cap = capacity;
+  m_size = 0;
+
+  const int magic = static_cast<int>(MAGIC_NUM);
+  writeInt(m_ostream, magic);
+  writeInt(m_ostream, m_cap);
+  writeInt(m_ostream, m_size);
+
+  // one (still empty) position entry per CO the file may hold
+  const int offset = 0;
+  for (int i = 0; i < m_cap; ++i)
+    {
+      writeInt(m_ostream, offset);
+    }
+  m_ostream.flush();
+  m_initialized = true;
+
+  const streamoff dataStart = static_cast<streamoff>((3 + m_cap) * sizeof(int));
+
+  // DEBUG
+  assert(static_cast<streamoff>(m_ostream.tellp()) == dataStart);
+
+  // prepare read pos; the read stream is not open yet if the file did not
+  // exist when this object was constructed
+  m_istream.clear();
+  if (!m_istream.is_open())
+    {
+      m_istream.open(m_filename.c_str(), ios_base::binary);
+    }
+  m_istream.seekg(dataStart, ios::beg);
+}
+
+// Appends one content object to the end of the file and records its
+// position in the table at the beginning of the file.
+// Append is not super efficient, as it needs to seek and update the pos for
+// the content object. However, in our app these objects are written once
+// and read multiple times, so it's not a big problem.
+// Throws ObjectDBException if the object is not initialized, if the file
+// cannot be opened for writing, or if the capacity is exhausted.
+void
+ObjectDBFile::append(const Bytes &co)
+{
+  // the guard must be a named object; `WriteLock(m_lock);` locks nothing
+  WriteLock lock(m_lock);
+  checkInit("Trying to append to un-initialized ObjectDBFile: " + m_filename);
+
+  if (!m_writable)
+    {
+      prepareWrite();
+    }
+
+  if (m_size >= m_cap)
+    {
+      throwException("Exceed Maximum capacity: " + boost::lexical_cast<string>(m_cap));
+    }
+
+  // pos for this CO: it always goes to the end of the file, so make sure
+  // the write position really is there before asking for it
+  m_ostream.seekp(0, ios::end);
+  const int coPos = static_cast<int>(static_cast<streamoff>(m_ostream.tellp()));
+  // index field for this CO
+  const int indexPos = (3 + m_size) * sizeof(int);
+
+  m_size++;
+
+  // Update size (is it necessary?) We'll do it for now anyway
+  m_ostream.seekp( 2 * sizeof(int), ios::beg);
+  writeInt(m_ostream, m_size);
+
+  // Write the pos for the CO
+  m_ostream.seekp(indexPos, ios::beg);
+  writeInt(m_ostream, coPos);
+
+  // write the content object
+  m_ostream.seekp(coPos, ios::beg);
+  writeBytes(m_ostream, co);
+
+  // By the end, the write pos is at the end of the file; flush so that the
+  // read stream of this object can see the new CO
+  m_ostream.flush();
+}
+
+// Returns the CO at the current read position and advances to the next one.
+// Returns an empty Bytes when no CO is left to read.
+// Throws ObjectDBException if the object is not initialized.
+// NOTE(review): relies on the read stream already being positioned on CO
+// number m_index, which is what the constructor, init(), seek() and
+// rewind() arrange -- confirm this is the intended contract.
+Bytes
+ObjectDBFile::next()
+{
+  // exclusive lock: both m_index and the stream position are modified
+  WriteLock lock(m_lock);
+  checkInit("Trying to read from un-initialized ObjectDBFile: " + m_filename);
+
+  Bytes co;
+  if (m_index >= m_size)
+    {
+      // the cached size may be stale if COs were appended in the meantime
+      updateSize();
+    }
+
+  if (m_index < m_size)
+    {
+      readBytes(m_istream, co);
+      m_index++;
+    }
+  return co;
+}
+
+// Appends up to n COs, starting at the current read position, to vco and
+// advances the read position past them. Fewer than n COs are delivered if
+// fewer remain or if the stream fails.
+// Throws ObjectDBException if the object is not initialized.
+// NOTE(review): relies on the read stream already being positioned on CO
+// number m_index -- confirm this is the intended contract.
+void
+ObjectDBFile::read(vector<Bytes> &vco, int n)
+{
+  // exclusive lock: both m_index and the stream position are modified
+  WriteLock lock(m_lock);
+  checkInit("Trying to read from un-initialized ObjectDBFile: " + m_filename);
+
+  // pick up COs appended since the size was last cached
+  updateSize();
+
+  for (int remaining = n; remaining > 0 && m_index < m_size; --remaining)
+    {
+      Bytes co;
+      readBytes(m_istream, co);
+      if (!m_istream)
+        {
+          break;
+        }
+      vco.push_back(co);
+      m_index++;
+    }
+}
+
+// Returns the cached number of COs; neither the file nor m_lock is touched.
+int
+ObjectDBFile::size() const
+{
+  const int cachedSize = m_size;
+  return cachedSize;
+}
+
+// Re-reads the size field (third int of the header) into m_size and then
+// restores the previous read position. The caller must already hold m_lock.
+// (The definition was misspelled "updateSzie"; the header declares, and the
+// other methods call, updateSize().)
+void
+ObjectDBFile::updateSize()
+{
+  // keep the position as a streampos rather than squeezing it into an int
+  const streampos pos = m_istream.tellg();
+  m_istream.seekg(2 * sizeof(int), ios::beg);
+  readInt(m_istream, m_size);
+  // recover the original pos
+  m_istream.seekg(pos);
+}
+
+// Returns the number of COs as recorded in the file, refreshing the cached
+// m_size on the way.
+int
+ObjectDBFile::fSize()
+{
+  // The guard must be a named object (`ReadLock(m_lock);` locks nothing).
+  // An exclusive lock is taken because updateSize() writes m_size and moves
+  // the shared read stream.
+  WriteLock lock(m_lock);
+  updateSize();
+  return m_size;
+}
+
+// Returns the index of the CO that will be read next.
+int
+ObjectDBFile::index()
+{
+  // the guard must be a named object; `ReadLock(m_lock);` locks nothing
+  ReadLock lock(m_lock);
+  return m_index;
+}
+
+// Positions the read stream on CO number `index`, using the position table
+// at the beginning of the file.
+// Returns false (leaving m_index unchanged) if index is negative, is not
+// below the size recorded in the file, or if the table entry cannot be read.
+bool
+ObjectDBFile::seek(int index)
+{
+  // The guard must be a named object (`ReadLock(m_lock);` locks nothing).
+  // An exclusive lock is taken because m_index, m_size and the shared read
+  // stream are all modified here.
+  WriteLock lock(m_lock);
+  updateSize();
+  if (index < 0 || m_size <= index)
+    {
+      return false;
+    }
+
+  m_istream.seekg( (3 + index) * sizeof(int), ios::beg);
+  int pos = 0;
+  readInt(m_istream, pos);
+  if (!m_istream)
+    {
+      // do not leave the stream unusable for later calls
+      m_istream.clear();
+      return false;
+    }
+
+  m_index = index;
+  m_istream.seekg(pos, ios::beg);
+  return true;
+}
+
+// Resets the read position to the first CO.
+// (The definition needs the ObjectDBFile:: qualifier; without it this is a
+// free function with no access to m_lock, m_index, m_istream or m_cap.)
+void
+ObjectDBFile::rewind()
+{
+  // The guard must be a named object (`ReadLock(m_lock);` locks nothing).
+  // An exclusive lock is taken because m_index and the shared read stream
+  // are modified here.
+  WriteLock lock(m_lock);
+  m_index = 0;
+  // point to the start of the CO fields
+  m_istream.seekg( (3 + m_cap) * sizeof(int), ios::beg);
+}
+
+// Guard used by operations that need an initialized file: passes msg to
+// throwException() when m_initialized is false, otherwise does nothing.
+void
+ObjectDBFile::checkInit(const string &msg)
+{
+  if (m_initialized)
+    {
+      return;
+    }
+  throwException(msg);
+}
+
+// Opens m_ostream on m_filename and leaves the write position at the end of
+// the file. The caller must already hold m_lock for writing.
+// Throws ObjectDBException if the file cannot be opened.
+void
+ObjectDBFile::prepareWrite()
+{
+  // ios_base::app must not be used here: in append mode every write goes to
+  // the end of the file no matter what seekp() says, so the header and the
+  // position table could never be updated in place. app | trunc is not a
+  // valid combination either, and makes open() fail.
+  ios_base::openmode mode = ios_base::out | ios_base::binary;
+  if (m_initialized)
+    {
+      // adding `in` opens the existing file for update without truncating it
+      mode |= ios_base::in;
+    }
+  else
+    {
+      // discard any content if the object is considered uninitialized
+      mode |= ios_base::trunc;
+    }
+
+  m_ostream.open(m_filename.c_str(), mode);
+  m_writable = m_ostream.is_open();
+  if (!m_writable)
+    {
+      throwException("Unable to open file for write: " + m_filename);
+    }
+
+  // new content objects are always added behind the existing ones
+  m_ostream.seekp(0, ios::end);
+}