Merge branch 'master' of git.irl.cs.ucla.edu:ndn/chronoshare
diff --git a/include/ccnx-tunnel.h b/include/ccnx-tunnel.h
index 61ed964..af49a9b 100644
--- a/include/ccnx-tunnel.h
+++ b/include/ccnx-tunnel.h
@@ -2,7 +2,6 @@
 #define CCNX_TUNNEL_H
 
 #include <boost/variant.hpp>
-#include <boost/thread/locks.hpp>
 
 #include "ccnx-common.h"
 #include "ccnx-wrapper.h"
@@ -27,9 +26,6 @@
 public: 
   typedef multimap<string, InterestCallback> RegisteredInterestTable;
   typedef multimap<string, InterestCallback>::iterator RitIter;
-  typedef boost::shared_mutex Lock;
-  typedef boost::unique_lock<Lock> WriteLock;
-  typedef boost::shared_lock<Lock> ReadLock;
 
 
   CcnxTunnel(); 
@@ -39,6 +35,9 @@
   virtual int
   publishData(const string &name, const unsigned char *buf, size_t len, int freshness) _OVERRIDE;
 
+  int
+  publishContentObject(const string &name, const Bytes &contentObject, int freshness);
+
   virtual int
   sendInterest (const Interest &interest, Closure *closure);
 
diff --git a/include/ccnx-wrapper.h b/include/ccnx-wrapper.h
index 8d24b19..e5754da 100644
--- a/include/ccnx-wrapper.h
+++ b/include/ccnx-wrapper.h
@@ -10,6 +10,7 @@
 #include <ccn/signing.h>
 }
 
+#include <boost/thread/locks.hpp>
 #include <boost/thread/recursive_mutex.hpp>
 #include <boost/thread/thread.hpp>
 
@@ -26,6 +27,13 @@
 class CcnxWrapper 
 {
 public:
+  typedef boost::shared_mutex Lock;
+  typedef boost::unique_lock<Lock> WriteLock;
+  typedef boost::shared_lock<Lock> ReadLock;
+
+  typedef boost::recursive_mutex RecLock;
+  typedef boost::unique_lock<RecLock> UniqueRecLock;
+
   typedef boost::function<void (const string &)> InterestCallback;
 
   CcnxWrapper();
@@ -89,7 +97,7 @@
   ccn_keystore *m_keyStore;
   ccn_charbuf *m_keyLoactor;
   // to lock, use "boost::recursive_mutex::scoped_lock scoped_lock(mutex);
-  boost::recursive_mutex m_mutex;
+  RecLock m_mutex;
   boost::thread m_thread;
   bool m_running;
   bool m_connected;
diff --git a/include/object-db-file.h b/include/object-db-file.h
new file mode 100644
index 0000000..fb9ca91
--- /dev/null
+++ b/include/object-db-file.h
@@ -0,0 +1,143 @@
+#ifndef OBJECT_DB_FILE_H
+#define OBJECT_DB_FILE_H
+
+#include "object-db.h"
+#include <stdio.h>
+#include <fstream>
+#include <sstream>
+#include <string>
+#include <boost/thread/locks.hpp>
+#include <boost/thread/recursive_mutex.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/lexical_cast.hpp>
+
+#ifndef _OVERRIDE // keep a definition that another project header may already provide
+#if defined(__GNUC__) && (__GNUC__ > 4 || (__GNUC__ == 4 && __GNUC_MINOR__ >= 7))
+  #define _OVERRIDE override
+#else
+  #define _OVERRIDE
+#endif // GCC >= 4.7
+#endif // _OVERRIDE
+
+using namespace std;
+
+// This is a file based ObjectDB implementation
+// The assumption is, the Content Objects will be stored sequentially
+
+// To provide random access, we will have a table of "address" for each
+// ContentObject at the beginning of the file.
+// This also requires another assumption, that is the number of COs must
+// be known a priori. This requirement is reasonable for our dropbox-like
+// system, as the file we publish is a static file and we can easily know
+// the number of COs before we store them into ObjectDB.
+
+/* What the file looks like:
+ * |MAGIC_NUM|capacity|size|pos for each CO ...|1st CO|2nd CO| ... |
+ */
+
+class ObjectDBFile : public ObjectDB
+{
+public:
+  typedef boost::shared_mutex Lock;
+  typedef boost::unique_lock<Lock> WriteLock;
+  typedef boost::shared_lock<Lock> ReadLock;
+
+  ObjectDBFile(const string &filename);
+  virtual ~ObjectDBFile(); // defined out of line in object-db-file.cpp
+
+  // reserve the "address" table for `capacity` COs; must reserve before
+  // writing anything (unless the reserved quota has not been consumed yet)
+  void
+  init(int capacity);
+
+  bool
+  initialized() const { return m_initialized; }
+
+  // append one CO; COs are assumed to arrive sequentially
+  virtual void
+  append(const Bytes &co) _OVERRIDE;
+
+  // get next CO
+  virtual Bytes
+  next() _OVERRIDE;
+
+  // get n COs; if the remaining number of COs < n, return all;
+  virtual void
+  read(vector<Bytes> &vco, int n) _OVERRIDE;
+
+  // size in terms of number of COs
+  // This is the lazy form of size, i.e. it returns the size cached in this object,
+  // which may not necessarily equal the actual size kept in the file.
+  // This is enough if the caller knows for sure that no other thread is changing the
+  // file or the caller does not care about the new size.
+  virtual int
+  size() const _OVERRIDE;
+
+  // this returns the actual size (and also updates the size cache in this object), but it is more costly, as it requires file IO
+  int
+  fSize();
+
+  // the index of the CO to be read
+  int
+  index();
+
+  // set the pos to be the desired CO
+  // return true on success
+  bool
+  seek(int index);
+
+  // reset pos to be zero
+  void
+  rewind();
+
+protected:
+  // write lock should have been grabbed already before the call
+  void
+  prepareWrite();
+
+  // read or write lock should have been grabbed already before the call; throws with msg if not initialized
+  void
+  checkInit(const string &msg);
+
+  // m_lock should have been grabbed already before the call (this re-reads the size field via m_istream and updates m_size)
+  void
+  updateSize();
+
+  #define MAGIC_NUM 0xAAAAAAAA
+
+protected:
+  string m_filename;
+  ifstream m_istream;
+  ofstream m_ostream;
+  Lock m_lock;
+  bool m_writable;
+  bool m_initialized;
+  // capacity in terms of number of COs
+  int m_cap;
+  int m_size;
+  // the index (or seq) of the CO to be read
+  int m_index;
+};
+
+// Emit the object representation of x: sizeof(int) raw bytes, no byte-order conversion.
+inline void writeInt(ostream &out, const int &x)
+{
+  out.write(reinterpret_cast<const char *>(&x), sizeof x);
+}
+
+// Fill x with the next sizeof(int) raw bytes of the stream, taken as-is (no byte-order conversion).
+inline void readInt(istream &in, int &x)
+{
+  in.read(reinterpret_cast<char *>(&x), sizeof x);
+}
+
+// write size and then the actual bytes
+// operator << overloading is not used to avoid confusion
+void
+writeBytes(ostream &out, const Bytes &bytes);
+
+// read size and then the actual bytes
+void
+readBytes(istream &in, Bytes &bytes);
+
+#endif
diff --git a/include/object-db.h b/include/object-db.h
new file mode 100644
index 0000000..4ad8504
--- /dev/null
+++ b/include/object-db.h
@@ -0,0 +1,46 @@
+#ifndef OBJECT_DB_H
+#define OBJECT_DB_H
+
+#include <boost/exception/all.hpp>
+#include <vector>
+
+using namespace std;
+
+struct ObjectDBException : virtual boost::exception, virtual exception { };
+typedef boost::error_info<struct tag_errmsg, std::string> error_info_str;
+// Throws an ObjectDBException carrying msg. inline: a header-defined function must not yield one definition per including .cpp.
+inline void throwException(const string &msg) { boost::throw_exception(ObjectDBException() << error_info_str(msg)); }
+
+typedef unsigned char Byte;
+typedef vector<Byte> Bytes;
+
+// OK. This name is a bit misleading, but this ObjectDB is really some storage
+// that stores the Ccnx ContentObjects of a file. So unlike a normal database,
+// this DB is per file.
+
+// The assumption is, the ContentObjects will be written to ObjectDB sequentially
+// This guarantees that when read, the Nth ContentObject read has the sequence number N as the last component of its name
+class ObjectDB
+{
+public:
+  virtual ~ObjectDB(){}
+
+  // Store co as the next ContentObject; callers are assumed to append sequentially.
+  virtual void
+  append(const Bytes &co) = 0;
+
+  // Return the next ContentObject.
+  virtual Bytes
+  next() = 0;
+
+  // Read n ContentObjects into vco; if fewer than n remain, return all of them.
+  virtual void
+  read(vector<Bytes> &vco, int n) = 0;
+
+  // Size in terms of number of ContentObjects.
+  // const, so it can be queried through a const ObjectDB.
+  virtual int
+  size() const = 0;
+};
+
+#endif
diff --git a/src/ccnx-tunnel.cpp b/src/ccnx-tunnel.cpp
index b36b030..281079f 100644
--- a/src/ccnx-tunnel.cpp
+++ b/src/ccnx-tunnel.cpp
@@ -49,9 +49,14 @@
   Bytes content = createContentObject(name, buf, len, freshness);
   storeContentObject(name, content);
   
-  string tunneledName = m_localPrefix + name;
-  Bytes tunneledCo = createContentObject(tunneledName, head(content), content.size(), freshness);
+  return publishContentObject(name, content, freshness);
+}
 
+int
+CcnxTunnel::publishContentObject(const string &name, const Bytes &contentObject, int freshness)
+{ // Wrap the bytes of contentObject as the payload of a new content object named m_localPrefix + name, then hand it to putToCcnd.
+  const string wrappedName = m_localPrefix + name;
+  Bytes tunneledCo = createContentObject(wrappedName, head(contentObject), contentObject.size(), freshness);
   return putToCcnd(tunneledCo);
 }
 
diff --git a/src/ccnx-wrapper.cpp b/src/ccnx-wrapper.cpp
index 1b791c9..041665e 100644
--- a/src/ccnx-wrapper.cpp
+++ b/src/ccnx-wrapper.cpp
@@ -59,8 +59,7 @@
 void
 CcnxWrapper::connectCcnd()
 {
-  recursive_mutex::scoped_lock lock (m_mutex);
-
+  UniqueRecLock lock(m_mutex); // must be a named object: "UniqueRecLock(m_mutex);" declares an unlocked local called m_mutex
   if (m_handle != 0) {
     ccn_disconnect (m_handle);
     ccn_destroy (&m_handle);
@@ -86,7 +85,7 @@
 CcnxWrapper::~CcnxWrapper()
 {
   {
-    recursive_mutex::scoped_lock lock(m_mutex);
+    UniqueRecLock lock(m_mutex); // named, so the member mutex is really held while m_running is cleared
     m_running = false;
   }
   
@@ -150,7 +149,7 @@
         {
           int res = 0;
           {
-            recursive_mutex::scoped_lock lock (m_mutex);
+            UniqueRecLock lock(m_mutex); // named, so the member mutex is really held around ccn_run
             res = ccn_run (m_handle, 0);
           }
 
@@ -163,7 +162,7 @@
 
           pollfd pfds[1];
           {
-            recursive_mutex::scoped_lock lock (m_mutex);
+            UniqueRecLock lock(m_mutex); // named, so the member mutex is really held while m_handle is queried
           
             pfds[0].fd = ccn_get_connection_fd (m_handle);
             pfds[0].events = POLLIN;
@@ -258,7 +257,7 @@
 int
 CcnxWrapper::putToCcnd (const Bytes &contentObject)
 {
-  recursive_mutex::scoped_lock lock(m_mutex);
+  UniqueRecLock lock(m_mutex); // must be named: "UniqueRecLock(m_mutex);" only declares an unlocked local called m_mutex
   if (!m_running || !m_connected)
     return -1;
   
@@ -389,7 +388,7 @@
 
 int CcnxWrapper::sendInterest (const Interest &interest, Closure *closure)
 {
-  recursive_mutex::scoped_lock lock(m_mutex);
+  UniqueRecLock lock(m_mutex); // must be named: "UniqueRecLock(m_mutex);" only declares an unlocked local called m_mutex
   if (!m_running || !m_connected)
     return -1;
   
@@ -410,7 +409,7 @@
 
 int CcnxWrapper::setInterestFilter (const string &prefix, const InterestCallback &interestCallback)
 {
-  recursive_mutex::scoped_lock lock(m_mutex);
+  UniqueRecLock lock(m_mutex); // must be named: "UniqueRecLock(m_mutex);" only declares an unlocked local called m_mutex
   if (!m_running || !m_connected)
     return -1;
 
@@ -432,7 +431,7 @@
 void
 CcnxWrapper::clearInterestFilter (const std::string &prefix)
 {
-  recursive_mutex::scoped_lock lock(m_mutex);
+  UniqueRecLock lock(m_mutex); // must be named: "UniqueRecLock(m_mutex);" only declares an unlocked local called m_mutex
   if (!m_running || !m_connected)
     return;
 
diff --git a/src/object-db-file.cpp b/src/object-db-file.cpp
new file mode 100644
index 0000000..ae13d91
--- /dev/null
+++ b/src/object-db-file.cpp
@@ -0,0 +1,222 @@
+#include "object-db-file.h"
+#include <assert.h>
+
+void
+writeBytes(ostream &out, const Bytes &bytes)
+{
+  const int size = static_cast<int>(bytes.size());
+  writeInt(out, size); // length prefix, consumed by readBytes
+  // ostream::write wants const char*; an empty vector has no first element to take the address of
+  if (size > 0) out.write(reinterpret_cast<const char *>(&bytes[0]), size);
+}
+
+void
+readBytes(istream &in, Bytes &bytes)
+{
+  int size = 0; // stays 0 if the length prefix cannot be read
+  readInt(in, size);
+  bytes.resize(size > 0 ? size : 0); // resize, not reserve: reserve() leaves the vector empty
+  if (!bytes.empty()) in.read(reinterpret_cast<char *>(&bytes[0]), bytes.size());
+}
+
+ObjectDBFile::ObjectDBFile(const string &filename)
+                : m_filename(filename) // initializers listed in member declaration order
+                , m_writable(false)
+                , m_initialized(false)
+                , m_cap(0)
+                , m_size(0)
+                , m_index(0)
+{
+  // Open the file for reading and, if it starts with MAGIC_NUM, load capacity and size from its header.
+  m_istream.open(m_filename.c_str(), ios_base::binary);
+  int magic = 0; // stays 0 (never MAGIC_NUM) when the file is missing or too short
+  readInt(m_istream, magic);
+  if (static_cast<unsigned int>(magic) == MAGIC_NUM)
+  {
+    m_initialized = true;
+    readInt(m_istream, m_cap);
+    readInt(m_istream, m_size);
+    m_istream.seekg( (3 + m_cap) * sizeof(int), ios::beg);
+  }
+}
+
+ObjectDBFile::~ObjectDBFile()
+{
+  // Release the read handle first; the write handle is closed only
+  // if prepareWrite() managed to open it (m_writable is set there).
+  m_istream.close();
+  if (m_writable)
+    m_ostream.close();
+}
+
+void
+ObjectDBFile::init(int capacity)
+{
+  // Lays out an empty DB: header plus a zeroed address table for `capacity` COs. Throws if already initialized.
+  // The lock needs a name: "WriteLock(m_lock);" merely declares an unlocked local called m_lock.
+  WriteLock lock(m_lock);
+  if (m_initialized)
+  {
+    throwException("Trying to init already initialized ObjectDBFile object" + m_filename);
+  }
+  if (!m_writable)
+  {
+    prepareWrite();
+  }
+
+  m_cap = capacity;
+  m_size = 0;
+
+  int magic = MAGIC_NUM;
+  writeInt(m_ostream, magic);
+  writeInt(m_ostream, m_cap);
+  writeInt(m_ostream, m_size);
+  m_initialized = true;
+
+  // one zeroed address slot per reserved CO (m_cap of them)
+  for (int slot = 0; slot < m_cap; ++slot)
+  {
+    writeInt(m_ostream, 0);
+  }
+  assert(m_ostream.tellp() == static_cast<streamoff>((3 + m_cap) * sizeof(int)));
+  m_ostream.flush(); // the read stream is a separate handle on the same file
+  // (re)attach the read stream: the constructor's open may have failed or hit EOF before the DB existed
+  m_istream.close();
+  m_istream.clear();
+  m_istream.open(m_filename.c_str(), ios_base::binary);
+  m_istream.seekg(m_ostream.tellp(), ios::beg);
+}
+
+// Append is not super efficient as it needs to seek and update the pos for the
+// Content object. However, in our app, these objects are written once and
+// read multiple times, so it's not a big problem.
+void
+ObjectDBFile::append(const Bytes &co)
+{
+  // The lock object needs a name; "WriteLock(m_lock);" only declares an unlocked local called m_lock.
+  WriteLock lock(m_lock);
+  checkInit("Trying to append to un-initialized ObjectDBFile: " + m_filename);
+  if (!m_writable)
+  {
+    prepareWrite();
+  }
+
+  if (m_size >= m_cap)
+  {
+    throwException("Exceed Maximum capacity: " + boost::lexical_cast<string>(m_cap));
+  }
+
+  // pos for this CO
+  int coPos = m_ostream.tellp();
+  // index field for this CO
+  int indexPos = (3 + m_size) * sizeof(int);
+
+  m_size++;
+
+  // Update size (is it necessary?) We'll do it for now anyway
+  m_ostream.seekp( 2 * sizeof(int), ios::beg);
+  writeInt(m_ostream, m_size);
+
+  // Write the pos for the CO
+  m_ostream.seekp(indexPos, ios::beg);
+  writeInt(m_ostream, coPos);
+
+  // write the content object
+  m_ostream.seekp(coPos, ios::beg);
+  writeBytes(m_ostream, co);
+  m_ostream.flush(); // m_istream is a separate handle; it cannot see bytes still buffered here
+  // By the end, the write pos is at the end of the file
+}
+
+Bytes ObjectDBFile::next()
+{
+  return Bytes(); // TODO: reading is not implemented yet; return an empty CO instead of flowing off the end (undefined behavior)
+}
+
+void
+ObjectDBFile::read(vector<Bytes> &vco, int n)
+{ // TODO: not implemented yet -- the body is empty, so vco is left untouched and n is ignored
+}
+
+// Cached number of COs (m_size); does no file IO, so it can lag behind the size stored in the file.
+int ObjectDBFile::size() const
+{
+  return m_size;
+}
+
+void
+ObjectDBFile::updateSize()
+{
+  // Re-reads the size field (third int of the header) into m_size; the caller must hold m_lock.
+  const streampos pos = m_istream.tellg();
+  m_istream.seekg(2 * sizeof(int), ios::beg);
+  readInt(m_istream, m_size);
+  m_istream.seekg(pos); // recover the original pos
+}
+
+int
+ObjectDBFile::fSize()
+{
+  WriteLock lock(m_lock); // named lock; exclusive because updateSize() seeks m_istream and rewrites m_size
+  updateSize();
+  return m_size;
+}
+
+int
+ObjectDBFile::index()
+{
+  ReadLock lock(m_lock); // must be named: "ReadLock(m_lock);" only declares an unlocked local called m_lock
+  return m_index;
+}
+
+bool
+ObjectDBFile::seek(int index)
+{
+  // Positions the read stream at the start of CO number `index`; returns false if there is no such CO.
+  // Named, exclusive lock: this call moves the shared read position and refreshes m_size.
+  WriteLock lock(m_lock);
+  updateSize();
+  if (index < 0 || m_size <= index)
+    return false; // a negative index would otherwise address the file header
+  m_index = index;
+  m_istream.seekg( (3 + m_index) * sizeof(int), ios::beg);
+  int pos = 0;
+  readInt(m_istream, pos);
+  m_istream.seekg(pos, ios::beg);
+  return true;
+}
+
+void
+ObjectDBFile::rewind()
+{
+  WriteLock lock(m_lock); // named and exclusive: this resets the shared read position
+  m_index = 0;
+  // point to the start of the CO fields
+  m_istream.seekg( (3 + m_cap) * sizeof(int), ios::beg);
+}
+
+// Guard used before touching the DB: raises via throwException(msg) unless m_initialized is set.
+void ObjectDBFile::checkInit(const string &msg)
+{
+  if (m_initialized)
+    return;
+
+  throwException(msg);
+}
+
+void
+ObjectDBFile::prepareWrite()
+{
+  // Opens m_ostream for writing; throws if that fails. ios_base::app is avoided on purpose:
+  // it forces every write to the end of the file (defeating the seekp()-based header updates
+  // in append()), and app|trunc is not a valid open mode, so that open always fails.
+  // Discard any content if the object is considered uninitialized, otherwise open for update.
+  ios_base::openmode mode = ios_base::out | ios_base::binary;
+  mode |= m_initialized ? ios_base::in : ios_base::trunc;
+  m_ostream.open(m_filename.c_str(), mode);
+  m_writable = m_ostream.is_open();
+  if (!m_writable)
+  {
+    throwException("Unable to open file for write: " + m_filename);
+  }
+  m_ostream.seekp(0, ios_base::end); // new COs go after the existing content
+}