Merge branch 'master' of git.irl.cs.ucla.edu:ndn/chronoshare
Conflicts:
.gitignore
waf
wscript
diff --git a/.gitignore b/.gitignore
index a5b87d9..2182d4e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
-.lock-waf_darwin_build
-.waf*
+.waf-*
+*.pyc
build/
+.lock*
chronoshare.db
diff --git a/include/ccnx-closure.h b/include/ccnx-closure.h
new file mode 100644
index 0000000..54306d6
--- /dev/null
+++ b/include/ccnx-closure.h
@@ -0,0 +1,41 @@
+#ifndef CCNX_CLOSURE_H
+#define CCNX_CLOSURE_H
+
+#include "ccnx-common.h"
+
+using namespace std;
+
+namespace Ccnx {
+
+/**
+ * Bundles the callbacks for one expressed interest with a retry counter.
+ *
+ * The object holds heap-allocated copies of both callbacks through raw pointers,
+ * so it is copy-constructible (deep copy) but deliberately not assignable.
+ */
+class Closure
+{
+public:
+  /// Called with the data name and the content bytes.
+  typedef boost::function<void (const string &, const Bytes &)> DataCallback;
+
+  /// Value a timeout callback returns to say what should happen next.
+  typedef enum
+  {
+    RESULT_OK,
+    RESULT_REEXPRESS
+  } TimeoutCallbackReturnValue;
+
+  /// Called with the interest name.
+  typedef boost::function<TimeoutCallbackReturnValue (const string &)> TimeoutCallback;
+
+  Closure(int retry, const DataCallback &dataCallback, const TimeoutCallback &timeoutCallback);
+  Closure(const Closure &other);
+  /// Current value of the retry counter.
+  int getRetry() const {return m_retry;}
+  /// Decrements the retry counter by one.
+  void decRetry() { m_retry--;}
+  virtual ~Closure();
+  virtual void
+  runDataCallback(const string &name, const Bytes &content);
+  virtual TimeoutCallbackReturnValue
+  runTimeoutCallback(const string &interest);
+
+private:
+  // Declared but intentionally not defined: the implicit member-wise assignment
+  // would leave two objects owning (and deleting) the same callback pointers.
+  Closure &
+  operator=(const Closure &other);
+
+protected:
+  int m_retry;
+  TimeoutCallback *m_timeoutCallback;
+  DataCallback *m_dataCallback;
+};
+
+} // Ccnx
+
+#endif
diff --git a/include/ccnx-common.h b/include/ccnx-common.h
new file mode 100644
index 0000000..7394c03
--- /dev/null
+++ b/include/ccnx-common.h
@@ -0,0 +1,24 @@
+#ifndef CCNX_COMMON_H
+#define CCNX_COMMON_H
+
+#include <vector>
+#include <boost/shared_ptr.hpp>
+#include <boost/exception/all.hpp>
+#include <boost/function.hpp>
+#include <string>
+#include <sstream>
+#include <map>
+#include <utility>
+
+using namespace std;
+namespace Ccnx {
+typedef vector<unsigned char> Bytes;
+
+void
+readRaw(Bytes &bytes, const unsigned char *src, size_t len);
+
+const unsigned char *
+head(const Bytes &bytes);
+
+} // Ccnx
+#endif // CCNX_COMMON_H
diff --git a/include/ccnx-pco.h b/include/ccnx-pco.h
new file mode 100644
index 0000000..1357333
--- /dev/null
+++ b/include/ccnx-pco.h
@@ -0,0 +1,37 @@
+#ifndef CCNX_CONTENT_OBJECT_H
+#define CCNX_CONTENT_OBJECT_H
+
+#include "ccnx-wrapper.h"
+#include "ccnx-common.h"
+
+using namespace std;
+
+namespace Ccnx {
+
+struct MisformedContentObjectException : virtual boost::exception, virtual exception { };
+
+/**
+ * A ContentObject kept as raw bytes together with its parse result.
+ *
+ * m_comps is a raw pointer to an index buffer owned by this object, so the
+ * class is copy-constructible but deliberately not assignable.
+ */
+class ParsedContentObject
+{
+public:
+  ParsedContentObject(const unsigned char *data, size_t len);
+  ParsedContentObject(const Bytes &bytes);
+  ParsedContentObject(const ParsedContentObject &other);
+  virtual ~ParsedContentObject();
+
+  /// Copy of the content payload.
+  Bytes
+  content() const;
+
+  /// Name of the ContentObject as a '/'-separated string.
+  string
+  name() const;
+
+private:
+  // Declared but intentionally not defined: member-wise assignment would leave
+  // two objects destroying the same m_comps.
+  ParsedContentObject &
+  operator=(const ParsedContentObject &other);
+
+protected:
+  ccn_parsed_ContentObject m_pco;
+  ccn_indexbuf *m_comps;
+  Bytes m_bytes;
+};
+
+typedef boost::shared_ptr<ParsedContentObject> PcoPtr;
+
+}
+
+#endif // CCNX_CONTENT_OBJECT_H
diff --git a/include/ccnx-tunnel.h b/include/ccnx-tunnel.h
index cddfcd5..4e1eade 100644
--- a/include/ccnx-tunnel.h
+++ b/include/ccnx-tunnel.h
@@ -1,15 +1,20 @@
#ifndef CCNX_TUNNEL_H
#define CCNX_TUNNEL_H
-#include <sync/sync-ccnx-wrapper.h>
-#include <string>
-#include <boost/function.hpp>
#include <boost/variant.hpp>
#include <boost/thread/locks.hpp>
-#include <utility>
-#include <map>
-using namespace Sync;
+#include "ccnx-common.h"
+#include "ccnx-wrapper.h"
+
+// _OVERRIDE expands to `override` when GCC >= 4.7 compiles in C++11 mode, and to nothing otherwise.
+#define _OVERRIDE
+#ifdef __GNUC__
+// GCC's major version macro is __GNUC__ (there is no __GNUC_MAJOR); major and minor are
+// compared as a pair so that 5.0 and later qualify as well.
+#if ((__GNUC__ > 4) || (__GNUC__ == 4 && __GNUC_MINOR__ >= 7)) && __cplusplus >= 201103L
+  #undef _OVERRIDE
+  #define _OVERRIDE override
+#endif // __GNUC__ version
+#endif // __GNUC__
+
using namespace std;
// Eventually, Sync::CcnxWrapper should be moved to this namespace.
@@ -20,11 +25,8 @@
class CcnxTunnel : public CcnxWrapper
{
public:
- typedef boost::variant<StringDataCallback, RawDataCallback> DataCallback;
- typedef multimap<string, DataCallback> PendingInterestTable;
typedef multimap<string, InterestCallback> RegisteredInterestTable;
- typedef multimap<string, DataCallback>iterator PitIter;
- typedef multimap<string, InterestCallback>iterator RitIter;
+ typedef multimap<string, InterestCallback>::iterator RitIter;
typedef boost::shared_mutex Lock;
typedef boost::unique_lock<Lock> WriteLock;
typedef boost::shared_lock<Lock> ReadLock;
@@ -35,50 +37,69 @@
// name is topology-independent
virtual int
- publishRawData(const string &name, const char *buf, size_t len, int freshness);
+ publishData(const string &name, const unsigned char *buf, size_t len, int freshness) _OVERRIDE;
+
+ virtual int
+ sendInterest (const std::string &interest, Closure *closure);
+
// prefix is topology-independent
virtual int
- setInterestFilter(const string &prefix, const InterestCallback &interestCallback);
+ setInterestFilter(const string &prefix, const InterestCallback &interestCallback) _OVERRIDE;
// prefix is topology-independent
// this clears all entries with key equal to prefix
virtual void
- clearInterestFilter(const string &prefix);
+ clearInterestFilter(const string &prefix) _OVERRIDE;
// subclass should provide translation service from topology-independent name
// to routable name
virtual string
- queryRoutableName(string &name) = 0;
+ queryRoutableName(const string &name) = 0;
// subclass should implement the function to store ContentObject with topoloy-independent
// name to the permanent storage; default does nothing
virtual void
- storeContentObject(string &name, ContentObjectPtr co) {}
+ storeContentObject(const string &name, const Bytes &content) {}
// should be called when connect to a different network
void
refreshLocalPrefix();
static bool
- isPrefix(string &prefix, string &name);
+ isPrefix(const string &prefix, const string &name);
-protected:
void
- handleTunneledInterest(string tunneldInterest);
-
- virtual int
- sendInterest (const std::string &strInterest, void *dataPass);
+ handleTunneledInterest(const string &tunneldInterest);
+
+ void
+ handleTunneledData(const string &name, const Bytes &tunneledData, const Closure::DataCallback &originalDataCallback);
protected:
// need a way to update local prefix, perhaps using macports trick, but eventually we need something more portable
string m_localPrefix;
- PendingInterestTable m_pit;
RegisteredInterestTable m_rit;
- Lock m_pitLock;
Lock m_ritLock;
};
+class TunnelClosure : public Closure
+{
+public:
+ TunnelClosure(int retry, const DataCallback &dataCallback, const TimeoutCallback &timeoutCallback, CcnxTunnel *tunnel, const string &originalInterest);
+
+ TunnelClosure(const Closure *closure, CcnxTunnel *tunnel, const string &originalInterest);
+
+ virtual void
+ runDataCallback(const string &name, const Bytes &content) _OVERRIDE;
+
+ virtual TimeoutCallbackReturnValue
+ runTimeoutCallback(const string &interest) _OVERRIDE;
+
+private:
+ CcnxTunnel *m_tunnel;
+ string m_originalInterest;
+};
+
};
#endif
diff --git a/include/ccnx-wrapper.h b/include/ccnx-wrapper.h
new file mode 100644
index 0000000..6ee5749
--- /dev/null
+++ b/include/ccnx-wrapper.h
@@ -0,0 +1,104 @@
+#ifndef CCNX_WRAPPER_H
+#define CCNX_WRAPPER_H
+
+extern "C" {
+#include <ccn/ccn.h>
+#include <ccn/charbuf.h>
+#include <ccn/keystore.h>
+#include <ccn/uri.h>
+#include <ccn/bloom.h>
+#include <ccn/signing.h>
+}
+
+#include <boost/thread/recursive_mutex.hpp>
+#include <boost/thread/thread.hpp>
+
+#include "ccnx-common.h"
+#include "ccnx-closure.h"
+
+using namespace std;
+
+namespace Ccnx {
+
+struct CcnxOperationException : virtual boost::exception, virtual exception { };
+
+class CcnxWrapper
+{
+public:
+ typedef boost::function<void (const string &)> InterestCallback;
+
+ CcnxWrapper();
+ virtual ~CcnxWrapper();
+
+ virtual int
+ setInterestFilter (const string &prefix, const InterestCallback &interestCallback);
+
+ virtual void
+ clearInterestFilter (const string &prefix);
+
+ virtual int
+ sendInterest (const string &strInterest, Closure *closure);
+
+ virtual int
+ publishData (const string &name, const unsigned char *buf, size_t len, int freshness);
+
+ int
+ publishData (const string &name, const Bytes &content, int freshness);
+
+ static string
+ getLocalPrefix ();
+
+ static string
+ extractName(const unsigned char *data, const ccn_indexbuf *comps);
+
+protected:
+ Bytes
+ createContentObject(const string &name, const unsigned char *buf, size_t len, int freshness);
+
+ int
+ putToCcnd (const Bytes &contentObject);
+
+protected:
+ void
+ connectCcnd();
+
+ /// @cond include_hidden
+ void
+ createKeyLocator ();
+
+ void
+ initKeyStore ();
+
+ const ccn_pkey *
+ getPrivateKey ();
+
+ const unsigned char *
+ getPublicKeyDigest ();
+
+ ssize_t
+ getPublicKeyDigestLength ();
+
+ void
+ ccnLoop ();
+
+ /// @endcond
+
+protected:
+ ccn* m_handle;
+ ccn_keystore *m_keyStore;
+ ccn_charbuf *m_keyLoactor;
+ // to lock, use "boost::recursive_mutex::scoped_lock scoped_lock(mutex);
+ boost::recursive_mutex m_mutex;
+ boost::thread m_thread;
+ bool m_running;
+ bool m_connected;
+ map<string, InterestCallback> m_registeredInterests;
+ // std::list< std::pair<std::string, InterestCallback> > m_registeredInterests;
+};
+
+typedef boost::shared_ptr<CcnxWrapper> CcnxWrapperPtr;
+
+
+} // Ccnx
+
+#endif
diff --git a/src/ccnx-closure.cpp b/src/ccnx-closure.cpp
new file mode 100644
index 0000000..43b0d65
--- /dev/null
+++ b/src/ccnx-closure.cpp
@@ -0,0 +1,45 @@
+#include "ccnx-closure.h"
+
+namespace Ccnx {
+
+// Stores the retry counter and takes private heap copies of both callbacks.
+Closure::Closure(int retry, const DataCallback &dataCallback, const TimeoutCallback &timeoutCallback)
+  : m_retry(retry)
+  , m_timeoutCallback(new TimeoutCallback (timeoutCallback))
+  , m_dataCallback(new DataCallback (dataCallback))
+{
+}
+
+// Deep copy: same retry counter, fresh heap copies of both callbacks.
+// Writing `Closure(...)` as a statement in the body would only build and discard a
+// temporary, leaving this object's pointers uninitialized for the destructor to delete.
+Closure::Closure(const Closure &other)
+  : m_retry(other.m_retry)
+  , m_timeoutCallback(new TimeoutCallback (*other.m_timeoutCallback))
+  , m_dataCallback(new DataCallback (*other.m_dataCallback))
+{
+}
+
+// Frees the two callback copies held by this closure.
+Closure::~Closure ()
+{
+  delete m_timeoutCallback;
+  m_timeoutCallback = NULL;
+
+  delete m_dataCallback;
+  m_dataCallback = NULL;
+}
+
+// Forwards the timed-out interest to the stored callback; an empty callback yields RESULT_OK.
+Closure::TimeoutCallbackReturnValue
+Closure::runTimeoutCallback(const string &interest)
+{
+  TimeoutCallback &onTimeout = *m_timeoutCallback;
+  return onTimeout.empty() ? RESULT_OK : onTimeout(interest);
+}
+
+
+// Forwards received data to the stored callback, if there is one to call.
+void
+Closure::runDataCallback(const string &name, const Bytes &content)
+{
+  // Invoking an empty boost::function throws boost::bad_function_call, so an empty
+  // callback is skipped here just as runTimeoutCallback skips an empty timeout callback.
+  if (m_dataCallback != NULL && !m_dataCallback->empty()) {
+    (*m_dataCallback)(name, content);
+  }
+}
+
+} // Ccnx
diff --git a/src/ccnx-pco.cpp b/src/ccnx-pco.cpp
new file mode 100644
index 0000000..df3bd06
--- /dev/null
+++ b/src/ccnx-pco.cpp
@@ -0,0 +1,55 @@
+#include "ccnx-pco.h"
+
+namespace Ccnx {
+
+// Parses the `len` bytes at `data` as a ContentObject and keeps a private copy of them.
+// Throws MisformedContentObjectException when ccn_parse_ContentObject reports an error.
+ParsedContentObject::ParsedContentObject(const unsigned char *data, size_t len)
+  : m_comps(NULL)
+{
+  m_comps = ccn_indexbuf_create();
+  int res = ccn_parse_ContentObject(data, len, &m_pco, m_comps);
+  if (res < 0)
+    {
+      // the destructor does not run for a constructor that throws, so free the buffer here
+      ccn_indexbuf_destroy(&m_comps);
+      boost::throw_exception(MisformedContentObjectException());
+    }
+  readRaw(m_bytes, data, len);
+}
+
+// Parses the ContentObject held in `bytes`, keeping a private copy of the bytes.
+// Throws MisformedContentObjectException when parsing fails.
+// The work is done here because naming another constructor as a statement only
+// creates a temporary and initializes nothing in this object.
+ParsedContentObject::ParsedContentObject(const Bytes &bytes)
+  : m_comps(ccn_indexbuf_create())
+  , m_bytes(bytes)
+{
+  if (ccn_parse_ContentObject(head(m_bytes), m_bytes.size(), &m_pco, m_comps) < 0)
+    {
+      // the destructor does not run for a constructor that throws
+      ccn_indexbuf_destroy(&m_comps);
+      boost::throw_exception(MisformedContentObjectException());
+    }
+}
+
+// Copies the raw bytes of `other` and parses them again, so that this object gets
+// an index buffer of its own instead of sharing (or never initializing) m_comps.
+ParsedContentObject::ParsedContentObject(const ParsedContentObject &other)
+  : m_comps(ccn_indexbuf_create())
+  , m_bytes(other.m_bytes)
+{
+  if (ccn_parse_ContentObject(head(m_bytes), m_bytes.size(), &m_pco, m_comps) < 0)
+    {
+      // the destructor does not run for a constructor that throws
+      ccn_indexbuf_destroy(&m_comps);
+      boost::throw_exception(MisformedContentObjectException());
+    }
+}
+
+// Releases the component index buffer (m_comps).
+ParsedContentObject::~ParsedContentObject()
+{
+ ccn_indexbuf_destroy(&m_comps);
+ m_comps = NULL;
+}
+
+// Returns a copy of the content payload.
+// Throws MisformedContentObjectException when ccn_content_get_value reports an error.
+Bytes
+ParsedContentObject::content() const
+{
+  const unsigned char *payload;
+  size_t payloadLen;
+  if (ccn_content_get_value(head(m_bytes), m_pco.offset[CCN_PCO_E], &m_pco, &payload, &payloadLen) < 0)
+    {
+      boost::throw_exception(MisformedContentObjectException());
+    }
+
+  Bytes result;
+  readRaw(result, payload, payloadLen);
+  return result;
+}
+
+string
+ParsedContentObject::name() const
+{
+ return CcnxWrapper::extractName(head(m_bytes), m_comps);
+}
+
+}
diff --git a/src/ccnx-tunnel.cpp b/src/ccnx-tunnel.cpp
index 2ac604a..467f7a1 100644
--- a/src/ccnx-tunnel.cpp
+++ b/src/ccnx-tunnel.cpp
@@ -1,4 +1,5 @@
#include "ccnx-tunnel.h"
+#include "ccnx-pco.h"
namespace Ccnx
{
@@ -14,6 +15,7 @@
{
}
+void
CcnxTunnel::refreshLocalPrefix()
{
string newPrefix = getLocalPrefix();
@@ -26,24 +28,39 @@
}
int
-CccnxTunnel::publishRawData(const string &name, const char *buf, size_t len, int freshness)
+CcnxTunnel::sendInterest (const std::string &interest, Closure *closure)
{
- ContentObjectPtr co = createContentObject(name, buf, len, freshness);
- storeContentObject(name, co);
+  // translate the topology-independent name and wrap the caller's closure for the tunnel
+  string tunneledInterest = queryRoutableName(interest);
+  Closure *cp = new TunnelClosure(closure, this, interest);
+  // Qualify the call: an unqualified sendInterest() resolves to this very override and
+  // recurses without end. The result is returned because the function is declared int.
+  return CcnxWrapper::sendInterest(tunneledInterest, cp);
+}
+
+void
+CcnxTunnel::handleTunneledData(const string &name, const Bytes &tunneledData, const Closure::DataCallback &originalDataCallback)
+{
+ ParsedContentObject pco(tunneledData);
+ originalDataCallback(pco.name(), pco.content());
+}
+
+int
+CcnxTunnel::publishData(const string &name, const unsigned char *buf, size_t len, int freshness)
+{
+ Bytes content = createContentObject(name, buf, len, freshness);
+ storeContentObject(name, content);
- string tunneledName = queryRoutableName(name);
- ContentObjectPtr tunneledCo = createContentObject(tunneledName, co->m_data, co->m_len, freshness);
+ string tunneledName = m_localPrefix + name;
+ Bytes tunneledCo = createContentObject(tunneledName, head(content), content.size(), freshness);
return putToCcnd(tunneledCo);
}
void
-CcnxTunnel::handleTunneledInterest(string tunneledInterest)
+CcnxTunnel::handleTunneledInterest(const string &tunneledInterest)
{
// The interest must have m_localPrefix as a prefix (component-wise), otherwise ccnd would not deliver it to us
string interest = (m_localPrefix == "/")
? tunneledInterest
- : tunneldInterest.substr(m_localPrefix.size());
+ : tunneledInterest.substr(m_localPrefix.size());
ReadLock(m_ritLock);
@@ -51,15 +68,15 @@
for (RitIter it = m_rit.begin(); it != m_rit.end(); it++)
{
// evoke callback for any prefix that is the prefix of the interest
- if (isPrefix(it->first(), interest))
+ if (isPrefix(it->first, interest))
{
- (it->second())(interest);
+ (it->second)(interest);
}
}
}
bool
-CcnxTunnel::isPrefix(string &prefix, string &name)
+CcnxTunnel::isPrefix(const string &prefix, const string &name)
{
// prefix is literally prefix of name
if (name.find(prefix) == 0)
@@ -78,7 +95,8 @@
CcnxTunnel::setInterestFilter(const string &prefix, const InterestCallback &interestCallback)
{
WriteLock(m_ritLock);
- m_rit.insert(make_pair(prefix, new InterestCallback(interestCallback)));
+ // make sure copy constructor for boost::function works properly
+ m_rit.insert(make_pair(prefix, interestCallback));
return 0;
}
@@ -90,5 +108,32 @@
m_rit.erase(prefix);
}
+TunnelClosure::TunnelClosure(int retry, const DataCallback &dataCallback, const TimeoutCallback &timeoutCallback, CcnxTunnel *tunnel, const string &originalInterest)
+ : Closure(retry, dataCallback, timeoutCallback)
+ , m_tunnel(tunnel)
+ , m_originalInterest(originalInterest)
+{
}
+// Builds a tunnel closure from a copy of an existing closure.
+TunnelClosure::TunnelClosure(const Closure *closure, CcnxTunnel *tunnel, const string &originalInterest)
+  : Closure(*closure)
+  , m_tunnel(tunnel)
+  , m_originalInterest(originalInterest) // runTimeoutCallback passes this name on; it was left empty before
+{
+}
+
+void
+TunnelClosure::runDataCallback(const string &name, const Bytes &content)
+{
+ if (m_tunnel != NULL)
+ {
+ m_tunnel->handleTunneledData(name, content, (*m_dataCallback));
+ }
+}
+
+Closure::TimeoutCallbackReturnValue
+TunnelClosure::runTimeoutCallback(const string &interest)
+{
+ return Closure::runTimeoutCallback(m_originalInterest);
+}
+
+} // Ccnx
diff --git a/src/ccnx-wrapper.cpp b/src/ccnx-wrapper.cpp
new file mode 100644
index 0000000..0710c67
--- /dev/null
+++ b/src/ccnx-wrapper.cpp
@@ -0,0 +1,522 @@
+#include "ccnx-wrapper.h"
+extern "C" {
+#include <ccn/fetch.h>
+}
+#include <poll.h>
+#include <boost/throw_exception.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/random.hpp>
+#include <sstream>
+
+typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
+typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
+
+using namespace std;
+using namespace boost;
+
+namespace Ccnx {
+
+// Replaces the contents of `bytes` with a copy of the `len` bytes starting at `src`.
+void
+readRaw(Bytes &bytes, const unsigned char *src, size_t len)
+{
+  if (len == 0)
+    {
+      // leave no stale bytes behind when the destination already held data
+      bytes.clear();
+      return;
+    }
+
+  bytes.assign(src, src + len);
+}
+
+// Returns a pointer to the first byte of `bytes`, or NULL when the buffer is empty.
+const unsigned char *
+head(const Bytes &bytes)
+{
+  // &bytes[0] is undefined for an empty vector
+  return bytes.empty() ? NULL : &bytes[0];
+}
+
+CcnxWrapper::CcnxWrapper()
+ : m_handle (0)
+ , m_keyStore (0)
+ , m_keyLoactor (0)
+ , m_running (true)
+ , m_connected (false)
+{
+ connectCcnd();
+ initKeyStore ();
+ createKeyLocator ();
+ m_thread = thread (&CcnxWrapper::ccnLoop, this);
+}
+
+void
+CcnxWrapper::connectCcnd()
+{
+ recursive_mutex::scoped_lock lock (m_mutex);
+
+ if (m_handle != 0) {
+ ccn_disconnect (m_handle);
+ ccn_destroy (&m_handle);
+ }
+
+ m_handle = ccn_create ();
+ if (ccn_connect(m_handle, NULL) < 0)
+ {
+ BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("connection to ccnd failed"));
+ }
+ m_connected = true;
+
+ if (!m_registeredInterests.empty())
+ {
+ for (map<std::string, InterestCallback>::const_iterator it = m_registeredInterests.begin(); it != m_registeredInterests.end(); ++it)
+ {
+ // clearInterestFilter(it->first);
+ setInterestFilter(it->first, it->second);
+ }
+ }
+}
+
+CcnxWrapper::~CcnxWrapper()
+{
+ {
+ recursive_mutex::scoped_lock lock(m_mutex);
+ m_running = false;
+ }
+
+ m_thread.join ();
+ ccn_disconnect (m_handle);
+ ccn_destroy (&m_handle);
+ ccn_charbuf_destroy (&m_keyLoactor);
+ ccn_keystore_destroy (&m_keyStore);
+}
+
+void
+CcnxWrapper::createKeyLocator ()
+{
+ m_keyLoactor = ccn_charbuf_create();
+ ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_KeyLocator, CCN_DTAG);
+ ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_Key, CCN_DTAG);
+ int res = ccn_append_pubkey_blob (m_keyLoactor, ccn_keystore_public_key(m_keyStore));
+ if (res >= 0)
+ {
+ ccn_charbuf_append_closer (m_keyLoactor); /* </Key> */
+ ccn_charbuf_append_closer (m_keyLoactor); /* </KeyLocator> */
+ }
+}
+
+const ccn_pkey*
+CcnxWrapper::getPrivateKey ()
+{
+ return ccn_keystore_private_key (m_keyStore);
+}
+
+const unsigned char*
+CcnxWrapper::getPublicKeyDigest ()
+{
+ return ccn_keystore_public_key_digest(m_keyStore);
+}
+
+ssize_t
+CcnxWrapper::getPublicKeyDigestLength ()
+{
+ return ccn_keystore_public_key_digest_length(m_keyStore);
+}
+
+// Loads the keystore from $HOME/.ccnx/.ccnx_keystore into m_keyStore.
+// Throws CcnxOperationException when HOME is not set or ccn_keystore_init fails.
+void
+CcnxWrapper::initKeyStore ()
+{
+  // getenv returns NULL for an unset variable, and building a string from NULL is undefined
+  const char *home = getenv("HOME");
+  if (home == NULL)
+    BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("HOME is not set, cannot locate the keystore"));
+
+  m_keyStore = ccn_keystore_create ();
+  string keyStoreFile = string(home) + string("/.ccnx/.ccnx_keystore");
+  if (ccn_keystore_init (m_keyStore, (char *)keyStoreFile.c_str(), (char*)"Th1s1sn0t8g00dp8ssw0rd.") < 0)
+    BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str(keyStoreFile.c_str()));
+}
+
+// Thread body started by the constructor: keeps servicing the ccn handle while m_running is set.
+// Each pass calls ccn_run and then polls the connection fd; the handle is only used under m_mutex.
+void
+CcnxWrapper::ccnLoop ()
+{
+  // these generators are referenced only by the disabled reconnect code further down
+  static boost::mt19937 randomGenerator (static_cast<unsigned int> (std::time (0)));
+  static boost::variate_generator<boost::mt19937&, boost::uniform_int<> > rangeUniformRandom (randomGenerator, uniform_int<> (0,1000));
+
+  while (m_running)
+    {
+      try
+        {
+          int res = 0;
+          {
+            recursive_mutex::scoped_lock lock (m_mutex);
+            res = ccn_run (m_handle, 0);
+          }
+
+          if (!m_running) break;
+
+          if (res < 0)
+            BOOST_THROW_EXCEPTION (CcnxOperationException()
+                                   << errmsg_info_str("ccn_run returned error"));
+
+
+          pollfd pfds[1];
+          {
+            recursive_mutex::scoped_lock lock (m_mutex);
+
+            pfds[0].fd = ccn_get_connection_fd (m_handle);
+            pfds[0].events = POLLIN;
+            if (ccn_output_is_pending (m_handle))
+              pfds[0].events |= POLLOUT;
+          }
+
+          int ret = poll (pfds, 1, 1);
+          if (ret < 0)
+            {
+              BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("ccnd socket failed (probably ccnd got stopped)"));
+            }
+        }
+      catch (CcnxOperationException &)
+        {
+          // do not try reconnect for now
+          // `throw;` re-raises the exception object that is in flight; `throw e;` would
+          // raise a copy made from the static type of the handler parameter
+          throw;
+          /*
+          m_connected = false;
+          // probably ccnd has been stopped
+          // try reconnect with sleep
+          int interval = 1;
+          int maxInterval = 32;
+          while (m_running)
+          {
+            try
+            {
+              this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
+
+              connectCcnd();
+              _LOG_DEBUG("reconnect to ccnd succeeded");
+              break;
+            }
+            catch (CcnxOperationException &e)
+            {
+              this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
+
+              // do exponential backoff for reconnect interval
+              if (interval < maxInterval)
+              {
+                interval *= 2;
+              }
+            }
+          }
+          */
+        }
+      catch (const std::exception &exc)
+        {
+          // catch anything thrown within try block that derives from std::exception
+          std::cerr << exc.what();
+        }
+      catch (...)
+        {
+          cout << "UNKNOWN EXCEPTION !!!" << endl;
+        }
+    }
+}
+
+Bytes
+CcnxWrapper::createContentObject(const std::string &name, const unsigned char *buf, size_t len, int freshness)
+{
+ ccn_charbuf *pname = ccn_charbuf_create();
+ ccn_charbuf *signed_info = ccn_charbuf_create();
+ ccn_charbuf *content = ccn_charbuf_create();
+
+ ccn_name_from_uri(pname, name.c_str());
+ ccn_signed_info_create(signed_info,
+ getPublicKeyDigest(),
+ getPublicKeyDigestLength(),
+ NULL,
+ CCN_CONTENT_DATA,
+ freshness,
+ NULL,
+ m_keyLoactor);
+ if(ccn_encode_ContentObject(content, pname, signed_info,
+ buf, len,
+ NULL, getPrivateKey()) < 0)
+ {
+ // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("encode content failed"));
+ }
+
+ Bytes bytes;
+ readRaw(bytes, content->buf, content->length);
+
+ ccn_charbuf_destroy (&pname);
+ ccn_charbuf_destroy (&signed_info);
+ ccn_charbuf_destroy (&content);
+
+ return bytes;
+}
+
+// Hands an already encoded ContentObject to ccnd through ccn_put.
+// Returns 0 on success and -1 when the wrapper is not running, not connected, or ccn_put fails.
+int
+CcnxWrapper::putToCcnd (const Bytes &contentObject)
+{
+  recursive_mutex::scoped_lock lock(m_mutex);
+  if (!m_running || !m_connected)
+    return -1;
+
+  if (ccn_put(m_handle, head(contentObject), contentObject.size()) < 0)
+    {
+      // report the failure to the caller instead of claiming success
+      return -1;
+    }
+
+  return 0;
+}
+
+// Encodes `len` bytes at `buf` as a ContentObject named `name` and passes it to putToCcnd.
+int
+CcnxWrapper::publishData (const string &name, const unsigned char *buf, size_t len, int freshness)
+{
+  const Bytes encoded = createContentObject(name, buf, len, freshness);
+  return putToCcnd(encoded);
+}
+
+// Convenience overload: publishes the bytes of `content` under `name`.
+int
+CcnxWrapper::publishData (const string &name, const Bytes &content, int freshness)
+{
+  // the result has to be returned: flowing off the end of a non-void function is undefined behavior
+  return publishData(name, head(content), content.size(), freshness);
+}
+
+// Builds "/comp1/comp2/..." from the name components that `comps` indexes inside `data`.
+// The last entry of `comps` is not emitted (the loop stops at n - 1).
+string
+CcnxWrapper::extractName(const unsigned char *data, const ccn_indexbuf *comps)
+{
+  ostringstream name (ostringstream::out);
+
+  // With an unsigned count (size_t in ccn/indexbuf.h), n == 0 would turn n - 1 into a huge
+  // bound; `i + 1 < n` cannot wrap that way.
+  if (comps == NULL)
+    return name.str();
+
+  for (size_t i = 0; i + 1 < comps->n; i++)
+    {
+      const unsigned char *comp = NULL;
+      size_t size = 0;
+      // stop at a component that cannot be decoded; comp and size hold nothing usable then
+      if (ccn_name_comp_get(data, comps, static_cast<unsigned int> (i), &comp, &size) < 0)
+        break;
+
+      name << "/";
+      name << string(reinterpret_cast<const char *> (comp), size);
+    }
+
+  return name.str();
+}
+
+
+// Upcall installed by setInterestFilter: passes the name of an incoming interest to the
+// InterestCallback stored in selfp->data.
+static ccn_upcall_res
+incomingInterest(ccn_closure *selfp,
+                 ccn_upcall_kind kind,
+                 ccn_upcall_info *info)
+{
+  CcnxWrapper::InterestCallback *callback = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
+
+  if (kind == CCN_UPCALL_FINAL) // effective in unit tests
+    {
+      // last upcall for this closure: free the callback copy and the closure itself
+      delete callback;
+      delete selfp;
+      return CCN_UPCALL_RESULT_OK;
+    }
+
+  if (kind != CCN_UPCALL_INTEREST)
+    {
+      return CCN_UPCALL_RESULT_OK;
+    }
+
+  string interestName = CcnxWrapper::extractName(info->interest_ccnb, info->interest_comps);
+  (*callback) (interestName);
+  return CCN_UPCALL_RESULT_OK;
+}
+
+// Upcall for interests expressed through CcnxWrapper::sendInterest; selfp->data is the Closure.
+// Content is copied and handed to runDataCallback. A timeout is re-expressed while the closure
+// has retries left and reported through runTimeoutCallback afterwards.
+static ccn_upcall_res
+incomingData(ccn_closure *selfp,
+             ccn_upcall_kind kind,
+             ccn_upcall_info *info)
+{
+  Closure *cp = static_cast<Closure *> (selfp->data);
+
+  switch (kind)
+    {
+    case CCN_UPCALL_FINAL:  // effective in unit tests
+      delete cp;
+      cp = NULL;
+      delete selfp;
+      return CCN_UPCALL_RESULT_OK;
+
+    case CCN_UPCALL_CONTENT:
+      break;
+
+    case CCN_UPCALL_INTEREST_TIMED_OUT: {
+      // without a closure there is nobody to notify; do not dereference NULL below
+      if (cp == NULL)
+        return CCN_UPCALL_RESULT_OK;
+
+      if (cp->getRetry() > 0) {
+        cp->decRetry();
+        return CCN_UPCALL_RESULT_REEXPRESS;
+      }
+
+      string interest = CcnxWrapper::extractName(info->interest_ccnb, info->interest_comps);
+      Closure::TimeoutCallbackReturnValue rv = cp->runTimeoutCallback(interest);
+      switch(rv)
+        {
+        case Closure::RESULT_OK : return CCN_UPCALL_RESULT_OK;
+        case Closure::RESULT_REEXPRESS : return CCN_UPCALL_RESULT_REEXPRESS;
+        default : break;
+        }
+      return CCN_UPCALL_RESULT_OK;
+    }
+
+    default:
+      return CCN_UPCALL_RESULT_OK;
+    }
+
+  if (cp == NULL)
+    return CCN_UPCALL_RESULT_OK;
+
+  const unsigned char *pcontent = NULL;
+  size_t len = 0;
+  if (ccn_content_get_value(info->content_ccnb, info->pco->offset[CCN_PCO_E], info->pco, &pcontent, &len) < 0)
+    {
+      // decoding failed, so pcontent and len hold nothing usable; skip the callback
+      return CCN_UPCALL_RESULT_OK;
+    }
+
+  string name = CcnxWrapper::extractName(info->content_ccnb, info->content_comps);
+
+  Bytes content;
+  // copy content and do processing on the copy
+  // otherwise the pointed memory may have been changed during the processing
+  readRaw(content, pcontent, len);
+
+  cp->runDataCallback(name, content);
+
+  return CCN_UPCALL_RESULT_OK;
+}
+
+// Expresses an interest for `strInterest`; incomingData receives the upcalls with `closure`
+// as its data pointer and deletes it on CCN_UPCALL_FINAL.
+// Returns -1 when the wrapper is not running or not connected and 0 otherwise; the result of
+// ccn_express_interest is not reported.
+int CcnxWrapper::sendInterest (const string &strInterest, Closure *closure)
+{
+  recursive_mutex::scoped_lock lock(m_mutex);
+  if (!m_running || !m_connected)
+    return -1;
+
+  ccn_charbuf *pname = ccn_charbuf_create();
+  // value-initialize: a plain `new ccn_closure` leaves every field that is not assigned
+  // below with an indeterminate value
+  ccn_closure *dataClosure = new ccn_closure ();
+
+  ccn_name_from_uri (pname, strInterest.c_str());
+  dataClosure->data = (void *)closure;
+
+  dataClosure->p = &incomingData;
+  if (ccn_express_interest (m_handle, pname, dataClosure, NULL) < 0)
+    {
+      // failure is not reported to the caller
+    }
+
+  ccn_charbuf_destroy (&pname);
+  return 0;
+}
+
+// Registers `interestCallback` for interests under `prefix` and records the pair in
+// m_registeredInterests.
+// Returns -1 when the wrapper is not running, not connected, or ccn_set_interest_filter
+// fails; 0 otherwise.
+int CcnxWrapper::setInterestFilter (const string &prefix, const InterestCallback &interestCallback)
+{
+  recursive_mutex::scoped_lock lock(m_mutex);
+  if (!m_running || !m_connected)
+    return -1;
+
+  ccn_charbuf *pname = ccn_charbuf_create();
+  // value-initialize: a plain `new ccn_closure` leaves every field that is not assigned
+  // below with an indeterminate value
+  ccn_closure *interestClosure = new ccn_closure ();
+
+  ccn_name_from_uri (pname, prefix.c_str());
+  interestClosure->data = new InterestCallback (interestCallback); // deleted by incomingInterest on CCN_UPCALL_FINAL
+  interestClosure->p = &incomingInterest;
+  int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
+
+  m_registeredInterests.insert(pair<std::string, InterestCallback>(prefix, interestCallback));
+  ccn_charbuf_destroy(&pname);
+
+  // the function is declared int: flowing off the end without a return is undefined behavior
+  return ret < 0 ? -1 : 0;
+}
+
+// Removes the interest filter for `prefix` and forgets the prefix in m_registeredInterests.
+// Does nothing while the wrapper is not running or not connected.
+void
+CcnxWrapper::clearInterestFilter (const std::string &prefix)
+{
+  recursive_mutex::scoped_lock lock(m_mutex);
+  if (m_running && m_connected)
+    {
+      ccn_charbuf *prefixName = ccn_charbuf_create();
+      ccn_name_from_uri (prefixName, prefix.c_str());
+
+      // a null closure clears the filter; the result is not inspected
+      ccn_set_interest_filter (m_handle, prefixName, 0);
+
+      m_registeredInterests.erase(prefix);
+      ccn_charbuf_destroy(&prefixName);
+    }
+}
+
+// Fetches the content published under /local/ndn/prefix over a temporary ccnd connection
+// and returns it as a string. Returns "" when ccnd cannot be reached or nothing was read.
+string
+CcnxWrapper::getLocalPrefix ()
+{
+  struct ccn * tmp_handle = ccn_create ();
+  int res = ccn_connect (tmp_handle, NULL);
+  if (res < 0)
+    {
+      // release the handle on the failure path as well
+      ccn_destroy (&tmp_handle);
+      return "";
+    }
+
+  string retval = "";
+
+  // NOTE(review): this interest template is built but never handed to ccn_fetch_open,
+  // which receives NULL below - confirm whether it was meant to be used
+  struct ccn_charbuf *templ = ccn_charbuf_create();
+  ccn_charbuf_append_tt(templ, CCN_DTAG_Interest, CCN_DTAG);
+  ccn_charbuf_append_tt(templ, CCN_DTAG_Name, CCN_DTAG);
+  ccn_charbuf_append_closer(templ); /* </Name> */
+  // XXX - use pubid if possible
+  ccn_charbuf_append_tt(templ, CCN_DTAG_MaxSuffixComponents, CCN_DTAG);
+  ccnb_append_number(templ, 1);
+  ccn_charbuf_append_closer(templ); /* </MaxSuffixComponents> */
+  ccnb_tagged_putf(templ, CCN_DTAG_Scope, "%d", 2);
+  ccn_charbuf_append_closer(templ); /* </Interest> */
+
+  struct ccn_charbuf *name = ccn_charbuf_create ();
+  res = ccn_name_from_uri (name, "/local/ndn/prefix");
+  if (res >= 0)
+    {
+      struct ccn_fetch *fetch = ccn_fetch_new (tmp_handle);
+
+      struct ccn_fetch_stream *stream = ccn_fetch_open (fetch, name, "/local/ndn/prefix",
+                                                        NULL, 4, CCN_V_HIGHEST, 0);
+      if (stream != NULL)
+        {
+          ostringstream os;
+
+          int counter = 0;
+          char buf[256];
+          while (true) {
+            res = ccn_fetch_read (stream, buf, sizeof(buf));
+
+            if (res > 0) {
+              os << string(buf, res);
+            } else if (res == CCN_FETCH_READ_NONE && counter < 2) {
+              // nothing available yet: run the handle for up to a second, at most twice
+              ccn_run(tmp_handle, 1000);
+              counter ++;
+            } else {
+              // 0, end of stream, timeout, any other error, or too many empty reads
+              break;
+            }
+          }
+          retval = os.str ();
+          stream = ccn_fetch_close(stream);
+        }
+      fetch = ccn_fetch_destroy(fetch);
+    }
+
+  ccn_charbuf_destroy (&name);
+  ccn_charbuf_destroy (&templ); // was never released before
+
+  ccn_disconnect (tmp_handle);
+  ccn_destroy (&tmp_handle);
+
+  return retval;
+}
+
+}
diff --git a/waf b/waf
index 7195638..ef85684 100755
--- a/waf
+++ b/waf
Binary files differ
diff --git a/waf-tools/ccnx.py b/waf-tools/ccnx.py
new file mode 100644
index 0000000..5338b5f
--- /dev/null
+++ b/waf-tools/ccnx.py
@@ -0,0 +1,77 @@
+#! /usr/bin/env python
+# encoding: utf-8
+
+'''
+
+When using this tool, the wscript will look like:
+
+ def options(opt):
+ opt.tool_options('ccnx', tooldir=["waf-tools"])
+
+ def configure(conf):
+ conf.load('compiler_cxx ccnx')
+
+ def build(bld):
+ bld(source='main.cpp', target='app', use='CCNX')
+
+Options are generated, in order to specify the location of ccnx includes/libraries.
+
+
+'''
+import sys
+import re
+from waflib import Utils,Logs,Errors
+from waflib.Configure import conf
+CCNX_DIR=['/usr','/usr/local','/opt/local','/sw']
+CCNX_VERSION_FILE='ccn/ccn.h'
+CCNX_VERSION_CODE='''
+#include <iostream>
+#include <ccn/ccn.h>
+int main() { std::cout << ((CCN_API_VERSION/100000) % 100) << "." << ((CCN_API_VERSION/1000) % 100) << "." << (CCN_API_VERSION % 1000); }
+'''
+
+def options(opt):
+ opt.add_option('--ccnx',type='string',default='',dest='ccnx_dir',help='''path to where CCNx is installed, e.g. /usr/local''')
+@conf
+def __ccnx_get_version_file(self,dir):
+    """Look up include/ccn/ccn.h below `dir`; return None when the lookup fails."""
+    # Logs.pprint ('CYAN', '  + %s/%s/%s' % (dir, 'include', CCNX_VERSION_FILE))
+    try:
+        return self.root.find_dir(dir).find_node('%s/%s' % ('include', CCNX_VERSION_FILE))
+    except Exception:
+        # a bare `except:` would also swallow KeyboardInterrupt and SystemExit
+        return None
+@conf
+def ccnx_get_version(self,dir):
+ val=self.check_cxx(fragment=CCNX_VERSION_CODE,includes=['%s/%s' % (dir, 'include')],execute=True,define_ret = True, mandatory=True)
+ return val
+@conf
+def ccnx_get_root(self,*k,**kw):
+ root=k and k[0]or kw.get('path',None)
+ # Logs.pprint ('RED', ' %s' %root)
+ if root and self.__ccnx_get_version_file(root):
+ return root
+ for dir in CCNX_DIR:
+ if self.__ccnx_get_version_file(dir):
+ return dir
+ if root:
+ self.fatal('CCNx not found in %s'%root)
+ else:
+ self.fatal('CCNx not found, please provide a --ccnx argument (see help)')
+@conf
+def check_ccnx(self,*k,**kw):
+    """Find the CCNx root, record its version and set INCLUDES/LIB/LIBPATH for `uselib_store`."""
+    if not self.env['CXX']:
+        self.fatal('load a c++ compiler first, conf.load("compiler_cxx")')
+
+    var = kw.get('uselib_store', 'CCNX')
+    includes_key = 'INCLUDES_%s' % var
+    lib_key = 'LIB_%s' % var
+    libpath_key = 'LIBPATH_%s' % var
+
+    self.start_msg('Checking CCNx')
+    root = self.ccnx_get_root(*k, **kw)
+    self.env.CCNX_VERSION = self.ccnx_get_version(root)
+
+    self.env[includes_key] = '%s/%s' % (root, "include")
+    self.env[lib_key] = "ccn"
+    self.env[libpath_key] = '%s/%s' % (root, "lib")
+
+    self.end_msg(self.env.CCNX_VERSION)
+    if Logs.verbose:
+        Logs.pprint('CYAN',' ccnx include : %s' % self.env[includes_key])
+        Logs.pprint('CYAN',' ccnx lib : %s' % self.env[lib_key])
+        Logs.pprint('CYAN',' ccnx libpath : %s' % self.env[libpath_key])
diff --git a/waf-tools/protobuf.py b/waf-tools/protobuf.py
new file mode 100644
index 0000000..684a945
--- /dev/null
+++ b/waf-tools/protobuf.py
@@ -0,0 +1,72 @@
+# -*- Mode: python; py-indent-offset: 4; indent-tabs-mode: nil; coding: utf-8; -*-
+
+#! /usr/bin/env python
+# encoding: utf-8
+
+'''
+
+When using this tool, the wscript will look like:
+
+def options(opt):
+ opt.tool_options("protobuf", tooldir=["waf-tools"])
+
+def configure(conf):
+ conf.load("compiler_cxx protobuf")
+
+def build(bld):
+ bld(source="main.cpp", target="app", use="PROTOBUF")
+
+Options are generated, in order to specify the location of protobuf includes/libraries.
+
+'''
+
+from waflib import Utils, TaskGen, Task, Logs
+
+from waflib import Utils,Logs,Errors
+from waflib.Configure import conf
+
+def options(opt):
+ pass
+
+def configure(conf):
+ """
+ """
+ conf.check_cfg(package='protobuf', args=['--cflags', '--libs'], uselib_store='PROTOBUF', mandatory=True)
+
+ conf.find_program ('protoc', var='PROTOC', path_list = conf.env['PATH'], mandatory = True)
+
+@TaskGen.extension('.proto')
+def add_proto(self, node):
+ """
+ Compile PROTOBUF protocol specifications
+ """
+ prototask = self.create_task ("protobuf", node, node.change_ext (".pb"))
+ try:
+ self.compiled_tasks.append (prototask)
+ except AttributeError:
+ self.compiled_tasks = [prototask]
+
+class protobuf(Task.Task):
+ """
+ Task for compiling PROTOBUF protocol specifications
+ """
+ run_str = '${PROTOC} --cpp_out ${TGT[0].parent.abspath()} --proto_path ${SRC[0].parent.abspath()} ${SRC[0].abspath()} -o${TGT[0].abspath()}'
+ color = 'BLUE'
+
+@TaskGen.feature('cxxshlib')
+@TaskGen.before_method('process_source')
+def dynamic_post(self):
+ if not getattr(self, 'dynamic_source', None):
+ return
+ self.source = Utils.to_list(self.source)
+
+ src = self.bld.path.get_bld().ant_glob (Utils.to_list(self.dynamic_source))
+
+ for cc in src:
+ # Signature for the source
+ cc.sig = Utils.h_file (cc.abspath())
+ # Signature for the header
+ h = cc.change_ext (".h")
+ h.sig = Utils.h_file (h.abspath())
+
+ self.source.extend (src)
diff --git a/wscript b/wscript
index 8300acc..fecf6f4 100644
--- a/wscript
+++ b/wscript
@@ -2,31 +2,37 @@
VERSION='0.1'
APPNAME='chronoshare'
+CCNXLIB='ccnx'
from waflib import Build, Logs
def options(opt):
- opt.load('compiler_cxx boost ccnx')
opt.add_option('--debug',action='store_true',default=False,dest='debug',help='''debugging mode''')
+ opt.add_option('--test', action='store_true',default=False,dest='_test',help='''build unit tests''')
opt.add_option('--yes',action='store_true',default=False) # for autoconf/automake/make compatibility
+ opt.load('compiler_c')
+ opt.load('compiler_cxx')
+ opt.load('boost')
+ opt.load('gnu_dirs')
+ opt.load('ccnx protobuf', tooldir=["waf-tools"])
def configure(conf):
- conf.load("compiler_cxx boost ccnx")
+ conf.load("compiler_cxx")
+ conf.load('gnu_dirs')
conf.check_cfg(package='sqlite3', args=['--cflags', '--libs'], uselib_store='SQLITE3', mandatory=True)
if not conf.check_cfg(package='openssl', args=['--cflags', '--libs'], uselib_store='SSL', mandatory=False):
- Logs.error ("bla")
- conf.check_cc(lib='crypto',
- header_name='openssl/crypto.h',
- define_name='HAVE_SSL',
- uselib_store='SSL')
- else:
- conf.define ("HAVE_SSL", 1)
-
+ libcrypto = conf.check_cc(lib='crypto',
+ header_name='openssl/crypto.h',
+ define_name='HAVE_SSL',
+ uselib_store='SSL')
if not conf.get_define ("HAVE_SSL"):
conf.fatal ("Cannot find SSL libraries")
+ conf.load ('ccnx')
+ conf.load('boost')
+
conf.check_boost(lib='system iostreams regex')
boost_version = conf.env.BOOST_VERSION.split('_')
@@ -42,9 +48,53 @@
else:
conf.env.append_value('CXXFLAGS', ['-O3', '-g'])
+ if conf.options._test:
+ conf.define('_TEST', 1)
+
+ conf.load('protobuf')
+
conf.write_config_header('src/config.h')
def build (bld):
+ bld.post_mode = Build.POST_LAZY
+
+ bld.add_group ("protobuf")
+
+# x = bld (
+# features = ["protobuf"],
+# source = ["model/sync-state.proto"],
+# target = ["model/sync-state.pb"],
+# )
+
+ bld.add_group ("code")
+
+ libccnx = bld (
+ target=CCNXLIB,
+ features=['cxx', 'cxxshlib'],
+ source = [
+ 'src/ccnx-wrapper.cpp',
+ 'src/ccnx-pco.cpp',
+ 'src/ccnx-closure.cpp',
+ 'src/ccnx-tunnel.cpp',
+ ],
+ use = 'BOOST BOOST_THREAD SSL CCNX',
+ includes = ['include', ],
+ )
+
+ # Unit tests
+ if bld.get_define("_TEST"):
+ unittests = bld.program (
+ target="unit-tests",
+ source = bld.path.ant_glob(['test/**/*.cc']),
+ features=['cxx', 'cxxprogram'],
+ use = 'BOOST_TEST',
+ includes = ['include', ],
+ )
+
+
+
+
+
chronoshare = bld (
target=APPNAME,
features=['cxx', 'cxxprogram'],