Adapt sync to use new ndn.cxx api
diff --git a/src/ccnx/sync-app-socket-c.cc b/src/ccnx/sync-app-socket-c.cc
deleted file mode 100644
index e23c8d1..0000000
--- a/src/ccnx/sync-app-socket-c.cc
+++ /dev/null
@@ -1,146 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012 University of California, Los Angeles
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Chaoyi Bian <bcy@pku.edu.cn>
- * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
- */
-
-#include "sync-app-socket.h"
-#include <boost/shared_array.hpp>
-using namespace std;
-using namespace Sync;
-
-struct SyncAppSocketStruct;
-struct MissingDataInfoC
-{
- const char *prefix;
- int session;
- int low;
- int high;
-};
-
-class CallbackHolder {
-private:
- void (*m_callback)(const char *, const char *);
-
-public:
- CallbackHolder(void (*callback)(const char*, const char*))
- :m_callback(callback){};
-
- void callbackWrapper(const string &name, const string &data) {
- if (m_callback != NULL)
- (*m_callback)(name.c_str(), data.c_str());
- }
-};
-
-class UpdateCallbackHolder {
-private:
- void (*m_callback)(const struct MissingDataInfoC*, const int, const SyncAppSocketStruct*);
-
-public:
- UpdateCallbackHolder(void (*callback)(
- const MissingDataInfoC*,
- const int,
- const SyncAppSocketStruct*))
- :m_callback(callback){};
-
- void callbackWrapper(const vector<MissingDataInfo> &mdi, SyncAppSocket* socket) {
- boost::shared_array<MissingDataInfoC> mdic(new MissingDataInfoC[mdi.size()]);
- int i;
-
- for (i = 0; i < mdi.size(); i++)
- {
- mdic[i].prefix = mdi[i].prefix.c_str();
- mdic[i].session = mdi[i].high.getSession();
- if (mdi[i].low.getSession() != mdi[i].high.getSession())
- mdic[i].low = 0;
- else
- mdic[i].low = mdi[i].low.getSeq();
- mdic[i].high = mdi[i].high.getSeq();
- }
- if (m_callback != NULL)
- (*m_callback)(mdic.get(), i, (const SyncAppSocketStruct*) socket);
- }
-};
-
-class RemoveCallbackHolder {
-private:
- void (*m_callback)(const char*);
-
-public:
- RemoveCallbackHolder(void (*callback)(const char*)):m_callback(callback){};
- void callbackWrapper(const string &prefix) {
- if (m_callback != NULL)
- (*m_callback)(prefix.c_str());
- }
-};
-
-extern "C"
-SyncAppSocketStruct *
-create_sync_app_socket(const char *prefix,
- void (*updatecallback)(const MissingDataInfoC*, const int, const SyncAppSocketStruct*),
- void (*removecallback)(const char *))
-{
- boost::shared_ptr<UpdateCallbackHolder> uh(new UpdateCallbackHolder(updatecallback));
- boost::shared_ptr<RemoveCallbackHolder> rh(new RemoveCallbackHolder(removecallback));
- boost::function<void (const vector<MissingDataInfo>&, SyncAppSocket*)> ucb = bind(&UpdateCallbackHolder::callbackWrapper, uh, _1, _2);
- boost::function<void (const string&)> rcb = bind(&RemoveCallbackHolder::callbackWrapper, rh, _1);
- SyncAppSocket *sock = new SyncAppSocket(prefix, ucb, rcb);
- return (SyncAppSocketStruct *) sock;
-}
-
-extern "C"
-void
-delete_sync_app_socket(SyncAppSocketStruct **sock) {
- SyncAppSocket *temp = *((SyncAppSocket **)sock);
- delete temp;
- temp = NULL;
-}
-
-// assume char *buf ends with '\0', or otherwise it's going to crash;
-// should fix this "feature"
-extern "C"
-int
-sync_app_socket_publish(const SyncAppSocketStruct *sock, const char *prefix, int session, const char *buf, int freshness)
-{
- SyncAppSocket *temp = (SyncAppSocket*) sock;
- return temp->publishString(prefix, session, buf, freshness);
-}
-
-extern "C"
-void
-sync_app_socket_remove(const SyncAppSocketStruct *sock, const char *prefix)
-{
- SyncAppSocket *temp = (SyncAppSocket*) sock;
- temp->remove(prefix);
-}
-
-extern "C"
-void
-sync_app_socket_fetch(const SyncAppSocketStruct *sock, const char *prefix, int session, int sequence,
- void (*callback)(const char*, const char*), int retry)
-{
- SyncAppSocket *temp = (SyncAppSocket*) sock;
- string s(prefix);
- SeqNo seq(session, sequence);
- boost::shared_ptr<CallbackHolder> h(new CallbackHolder(callback));
- boost::function<void (const string&, const string&)> cb = bind(&CallbackHolder::callbackWrapper, h, _1, _2);
-
- temp->fetchString(s, seq, cb, retry);
-}
-
diff --git a/src/ccnx/sync-app-socket-c.h b/src/ccnx/sync-app-socket-c.h
deleted file mode 100644
index 7ab9e70..0000000
--- a/src/ccnx/sync-app-socket-c.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012 University of California, Los Angeles
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Chaoyi Bian <bcy@pku.edu.cn>
- * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
- */
-
-#ifndef SYNC_APP_SOCKET_C_H
-#define SYNC_APP_SOCKET_C_H
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
- typedef struct SyncAppSocketStruct SyncAppSocketStruct;
-
- struct MissingDataInfoC
- {
- const char *prefix;
- int session;
- int low;
- int high;
- };
-
- SyncAppSocketStruct *create_sync_app_socket(const char *prefix,
- void (*updatecallback)(const struct MissingDataInfoC*, const int, const SyncAppSocketStruct*),
- void (*removecallback)(const char *));
- void delete_sync_app_socket(SyncAppSocketStruct **sock);
- int sync_app_socket_publish(const SyncAppSocketStruct *sock, const char *prefix, int session, const char *buf, int freshness);
- void sync_app_socket_remove(const SyncAppSocketStruct *sock, const char *prefix);
- void sync_app_socket_fetch(const SyncAppSocketStruct *sock, const char *prefix, int session, int seq,
- void (*callback)(const char*, const char*), int retry);
-#ifdef __cplusplus
-}
-#endif
-
-#endif // SYNC_APP_SOCKET_C_H
diff --git a/src/ccnx/sync-app-socket.cc b/src/ccnx/sync-app-socket.cc
deleted file mode 100644
index 0e4e829..0000000
--- a/src/ccnx/sync-app-socket.cc
+++ /dev/null
@@ -1,115 +0,0 @@
-/* -*- Mode: C32++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012 University of California, Los Angeles
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Chaoyi Bian <bcy@pku.edu.cn>
- * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
- */
-
-#include "sync-app-socket.h"
-
-using namespace std;
-using namespace boost;
-
-namespace Sync
-{
-
-SyncAppSocket::SyncAppSocket (const string &syncPrefix, NewDataCallback dataCallback, RemoveCallback rmCallback )
- : m_newDataCallback(dataCallback)
- , m_ccnxHandle (new CcnxWrapper())
- , m_syncLogic (syncPrefix,
- bind(&SyncAppSocket::passCallback, this, _1),
- rmCallback)
-{
-}
-
-SyncAppSocket::~SyncAppSocket()
-{
-}
-
-std::string
-SyncAppSocket::GetLocalPrefix()
-{
- // this handle is supposed to be short lived
- CcnxWrapperPtr handle( new CcnxWrapper());
- return handle->getLocalPrefix();
-}
-
-bool
-SyncAppSocket::publishString (const string &prefix, uint32_t session, const string &dataBuffer, int freshness)
-{
- uint32_t sequence = getNextSeq(prefix, session);
- ostringstream contentNameWithSeqno;
- contentNameWithSeqno << prefix << "/" << session << "/" << sequence;
- m_ccnxHandle->publishStringData (contentNameWithSeqno.str (), dataBuffer, freshness);
-
- SeqNo s(session, sequence + 1);
- m_sequenceLog[prefix] = s;
-
- m_syncLogic.addLocalNames (prefix, session, sequence);
- return true;
-}
-
-bool
-SyncAppSocket::publishRaw(const std::string &prefix, uint32_t session, const char *buf, size_t len, int freshness)
-{
- uint32_t sequence = getNextSeq(prefix, session);
- ostringstream contentNameWithSeqno;
- contentNameWithSeqno << prefix << "/" << session << "/" << sequence;
-
- m_ccnxHandle->publishRawData (contentNameWithSeqno.str (), buf, len, freshness);
-
- SeqNo s(session, sequence + 1);
- m_sequenceLog[prefix] = s;
- m_syncLogic.addLocalNames (prefix, session, sequence);
- return true;
-}
-
-
-void
-SyncAppSocket::fetchString(const std::string &prefix, const SeqNo &seq, CcnxWrapper::StringDataCallback callback, int retry)
-{
- ostringstream interestName;
- interestName << prefix << "/" << seq.getSession() << "/" << seq.getSeq();
- m_ccnxHandle->sendInterestForString(interestName.str(), callback, retry);
-}
-
-void
-SyncAppSocket::fetchRaw(const std::string &prefix, const SeqNo &seq, CcnxWrapper::RawDataCallback callback, int retry)
-{
- ostringstream interestName;
- interestName << prefix << "/" << seq.getSession() << "/" << seq.getSeq();
- //std::cout << "Socket " << this << " Send Interest <" << interestName.str() << "> for raw data " << endl;
- m_ccnxHandle->sendInterest(interestName.str(), callback, retry);
-}
-
-uint32_t
-SyncAppSocket::getNextSeq (const string &prefix, uint32_t session)
-{
- SequenceLog::iterator i = m_sequenceLog.find (prefix);
-
- if (i != m_sequenceLog.end ())
- {
- SeqNo s = i->second;
- if (s.getSession() == session)
- return s.getSeq();
- }
- return 0;
-}
-
-}
-
diff --git a/src/ccnx/sync-app-socket.h b/src/ccnx/sync-app-socket.h
deleted file mode 100644
index ceeb5ce..0000000
--- a/src/ccnx/sync-app-socket.h
+++ /dev/null
@@ -1,112 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012 University of California, Los Angeles
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Chaoyi Bian <bcy@pku.edu.cn>
- * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
- */
-
-#ifndef SYNC_APP_SOCKET_H
-#define SYNC_APP_SOCKET_H
-
-#include "sync-logic.h"
-#include <boost/function.hpp>
-#include <boost/unordered_map.hpp>
-#include "sync-seq-no.h"
-#include "sync-ccnx-wrapper.h"
-#include <utility>
-#include <map>
-#include <vector>
-#include <sstream>
-
-namespace Sync {
-
-/**
- * \ingroup sync
- * @brief A simple interface to interact with client code
- */
-class SyncAppSocket
-{
-public:
- typedef boost::function< void (const std::vector<MissingDataInfo> &, SyncAppSocket * ) > NewDataCallback;
- typedef boost::function< void ( const std::string &/*prefix*/ ) > RemoveCallback;
- /**
- * @brief the constructor for SyncAppSocket; the parameter syncPrefix
- * should be passed to the constructor of m_syncAppWrapper; the other
- * parameter should be passed to the constructor of m_fetcher; furthermore,
- * the fetch function of m_fetcher should be a second paramter passed to
- * the constructor of m_syncAppWrapper, so that m_syncAppWrapper can tell
- * m_fetcher to fetch the actual app data after it learns the names
- *
- * @param syncPrefix the name prefix for Sync Interest
- * @param dataCallback the callback to process data
- */
- SyncAppSocket (const std::string &syncPrefix, NewDataCallback dataCallback, RemoveCallback rmCallback);
- ~SyncAppSocket ();
-
- /**
- * @brief publish data from local client and tell SyncLogic to update
- * the sync tree by adding the local names
- *
- * @param prefix the name prefix for the data
- * @param session session to which data is published
- * @param dataBuffer the data itself
- * @param freshness the freshness time for the data (in seconds)
- */
- bool publishString (const std::string &prefix, uint32_t session, const std::string &dataBuffer, int freshness);
-
- bool publishRaw(const std::string &prefix, uint32_t session, const char *buf, size_t len, int freshness);
-
- /**
- * @brief delete a participant's subtree from the sync tree; SyncLogic will do the work
- * this is just a wrapper
- *
- * @param prefix the prefix for the participant
- */
- void remove (const std::string &prefix) {m_syncLogic.remove(prefix);}
-
- void fetchString(const std::string &prefix, const SeqNo &seq, CcnxWrapper::StringDataCallback callback, int retry = 0);
- void fetchRaw(const std::string &prefix, const SeqNo &seq, CcnxWrapper::RawDataCallback callback, int retry = 0);
-
- // for sync-demo
- std::string getRootDigest() {return m_syncLogic.getRootDigest();}
- uint32_t
- getNextSeq (const std::string &prefix, uint32_t session);
-
- SyncLogic &
- getLogic () { return m_syncLogic; }
-
- // make this a static function so we don't have to create socket instance without
- // knowing the local prefix. it's a wrong place for this function anyway
- static std::string
- GetLocalPrefix ();
-
-private:
- void
- passCallback(const std::vector<MissingDataInfo> &v) {m_newDataCallback(v, this);}
-
-private:
- typedef boost::unordered_map<std::string, SeqNo> SequenceLog;
- NewDataCallback m_newDataCallback;
- SequenceLog m_sequenceLog;
- CcnxWrapperPtr m_ccnxHandle;
- SyncLogic m_syncLogic;
-};
-
-} // Sync
-
-#endif // SYNC_APP_SOCKET_H
diff --git a/src/ccnx/sync-ccnx-wrapper.cc b/src/ccnx/sync-ccnx-wrapper.cc
deleted file mode 100644
index c824e72..0000000
--- a/src/ccnx/sync-ccnx-wrapper.cc
+++ /dev/null
@@ -1,597 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012 University of California, Los Angeles
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Chaoyi Bian <bcy@pku.edu.cn>
- * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
- */
-
-#include "sync-ccnx-wrapper.h"
-
-extern "C" {
-#include <ccn/fetch.h>
-}
-
-#include "sync-logging.h"
-#include <poll.h>
-#include <boost/throw_exception.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/random.hpp>
-
-#include "sync-scheduler.h"
-
-typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
-typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
-
-
-using namespace std;
-using namespace boost;
-
-INIT_LOGGER ("CcnxWrapper");
-
-namespace Sync {
-
-#ifdef _DEBUG_WRAPPER_
-CcnxWrapper::CcnxWrapper(char c)
-#else
-CcnxWrapper::CcnxWrapper()
-#endif
- : m_handle (0)
- , m_keyStore (0)
- , m_keyLoactor (0)
- , m_running (true)
- , m_connected (false)
-{
-#ifdef _DEBUG_WRAPPER_
- m_c = c;
-#endif
- connectCcnd();
- initKeyStore ();
- createKeyLocator ();
- m_thread = thread (&CcnxWrapper::ccnLoop, this);
-}
-
-void
-CcnxWrapper::connectCcnd()
-{
- recursive_mutex::scoped_lock lock (m_mutex);
-
- if (m_handle != 0) {
- ccn_disconnect (m_handle);
- ccn_destroy (&m_handle);
- }
-
- m_handle = ccn_create ();
- _LOG_DEBUG("<<< connecting to ccnd");
- if (ccn_connect(m_handle, NULL) < 0)
- {
- _LOG_DEBUG("<<< connecting to ccnd failed");
- BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("connection to ccnd failed"));
- }
- m_connected = true;
-
- if (!m_registeredInterests.empty())
- {
- for (map<std::string, InterestCallback>::const_iterator it = m_registeredInterests.begin(); it != m_registeredInterests.end(); ++it)
- {
- // clearInterestFilter(it->first);
- setInterestFilter(it->first, it->second);
- _LOG_DEBUG("<<< registering interest filter for: " << it->first);
- }
- }
-}
-
-CcnxWrapper::~CcnxWrapper()
-{
- // std::cout << "CcnxWrapper::~CcnxWrapper()" << std::endl;
- {
- recursive_mutex::scoped_lock lock(m_mutex);
- m_running = false;
- }
-
- m_thread.join ();
- ccn_disconnect (m_handle);
- ccn_destroy (&m_handle);
- ccn_charbuf_destroy (&m_keyLoactor);
- ccn_keystore_destroy (&m_keyStore);
-}
-
-/// @cond include_hidden
-
-void
-CcnxWrapper::createKeyLocator ()
-{
- m_keyLoactor = ccn_charbuf_create();
- ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_KeyLocator, CCN_DTAG);
- ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_Key, CCN_DTAG);
- int res = ccn_append_pubkey_blob (m_keyLoactor, ccn_keystore_public_key(m_keyStore));
- if (res >= 0)
- {
- ccn_charbuf_append_closer (m_keyLoactor); /* </Key> */
- ccn_charbuf_append_closer (m_keyLoactor); /* </KeyLocator> */
- }
-}
-
-const ccn_pkey*
-CcnxWrapper::getPrivateKey ()
-{
- return ccn_keystore_private_key (m_keyStore);
-}
-
-const unsigned char*
-CcnxWrapper::getPublicKeyDigest ()
-{
- return ccn_keystore_public_key_digest(m_keyStore);
-}
-
-ssize_t
-CcnxWrapper::getPublicKeyDigestLength ()
-{
- return ccn_keystore_public_key_digest_length(m_keyStore);
-}
-
-void
-CcnxWrapper::initKeyStore ()
-{
- m_keyStore = ccn_keystore_create ();
- string keyStoreFile = string(getenv("HOME")) + string("/.ccnx/.ccnx_keystore");
- if (ccn_keystore_init (m_keyStore, (char *)keyStoreFile.c_str(), (char*)"Th1s1sn0t8g00dp8ssw0rd.") < 0)
- BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str(keyStoreFile.c_str()));
-}
-
-void
-CcnxWrapper::ccnLoop ()
-{
- _LOG_FUNCTION (this);
- static boost::mt19937 randomGenerator (static_cast<unsigned int> (std::time (0)));
- static boost::variate_generator<boost::mt19937&, boost::uniform_int<> > rangeUniformRandom (randomGenerator, uniform_int<> (0,1000));
-
- while (m_running)
- {
- try
- {
-#ifdef _DEBUG_WRAPPER_
- std::cout << m_c << flush;
-#endif
- int res = 0;
- {
- recursive_mutex::scoped_lock lock (m_mutex);
- res = ccn_run (m_handle, 0);
- }
-
- if (!m_running) break;
-
- if (res < 0)
- BOOST_THROW_EXCEPTION (CcnxOperationException()
- << errmsg_info_str("ccn_run returned error"));
-
-
- pollfd pfds[1];
- {
- recursive_mutex::scoped_lock lock (m_mutex);
-
- pfds[0].fd = ccn_get_connection_fd (m_handle);
- pfds[0].events = POLLIN;
- if (ccn_output_is_pending (m_handle))
- pfds[0].events |= POLLOUT;
- }
-
- int ret = poll (pfds, 1, 1);
- if (ret < 0)
- {
- BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("ccnd socket failed (probably ccnd got stopped)"));
- }
- }
- catch (CcnxOperationException &e)
- {
- // do not try reconnect for now
- throw e;
- /*
- m_connected = false;
- // probably ccnd has been stopped
- // try reconnect with sleep
- int interval = 1;
- int maxInterval = 32;
- while (m_running)
- {
- try
- {
- this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
-
- connectCcnd();
- _LOG_DEBUG("reconnect to ccnd succeeded");
- break;
- }
- catch (CcnxOperationException &e)
- {
- this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
-
- // do exponential backup for reconnect interval
- if (interval < maxInterval)
- {
- interval *= 2;
- }
- }
- }
- */
- }
- catch (const std::exception &exc)
- {
- // catch anything thrown within try block that derives from std::exception
- std::cerr << exc.what();
- }
- catch (...)
- {
- cout << "UNKNOWN EXCEPTION !!!" << endl;
- }
-
- }
-}
-
-/// @endcond
-int
-CcnxWrapper::publishStringData (const string &name, const string &dataBuffer, int freshness) {
- return publishRawData(name, dataBuffer.c_str(), dataBuffer.length(), freshness);
-}
-
-int
-CcnxWrapper::publishRawData (const string &name, const char *buf, size_t len, int freshness)
-{
-
- recursive_mutex::scoped_lock lock(m_mutex);
- if (!m_running || !m_connected)
- return -1;
-
- // cout << "Publish: " << name << endl;
- ccn_charbuf *pname = ccn_charbuf_create();
- ccn_charbuf *signed_info = ccn_charbuf_create();
- ccn_charbuf *content = ccn_charbuf_create();
-
- ccn_name_from_uri(pname, name.c_str());
- ccn_signed_info_create(signed_info,
- getPublicKeyDigest(),
- getPublicKeyDigestLength(),
- NULL,
- CCN_CONTENT_DATA,
- freshness,
- NULL,
- m_keyLoactor);
- if(ccn_encode_ContentObject(content, pname, signed_info,
- (const unsigned char *)buf, len,
- NULL, getPrivateKey()) < 0)
- {
- // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("encode content failed"));
- _LOG_ERROR("<<< Encode content failed " << name);
- }
-
- if (ccn_put(m_handle, content->buf, content->length) < 0)
- {
- // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("ccnput failed"));
- _LOG_ERROR("<<< ccnput content failed " << name);
- }
-
- ccn_charbuf_destroy (&pname);
- ccn_charbuf_destroy (&signed_info);
- ccn_charbuf_destroy (&content);
- return 0;
-}
-
-
-static ccn_upcall_res
-incomingInterest(ccn_closure *selfp,
- ccn_upcall_kind kind,
- ccn_upcall_info *info)
-{
- CcnxWrapper::InterestCallback *f = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
-
- switch (kind)
- {
- case CCN_UPCALL_FINAL: // effective in unit tests
- delete f;
- delete selfp;
- return CCN_UPCALL_RESULT_OK;
-
- case CCN_UPCALL_INTEREST:
- break;
-
- default:
- return CCN_UPCALL_RESULT_OK;
- }
-
- string interest;
- for (int i = 0; i < info->interest_comps->n - 1; i++)
- {
- char *comp;
- size_t size;
- interest += "/";
- ccn_name_comp_get(info->interest_ccnb, info->interest_comps, i, (const unsigned char **)&comp, &size);
- string compStr(comp, size);
- interest += compStr;
- }
- (*f) (interest);
- _LOG_DEBUG("<<< processed interest: " << interest);
- return CCN_UPCALL_RESULT_OK;
-}
-
-static ccn_upcall_res
-incomingData(ccn_closure *selfp,
- ccn_upcall_kind kind,
- ccn_upcall_info *info)
-{
- ClosurePass *cp = static_cast<ClosurePass *> (selfp->data);
-
- switch (kind)
- {
- case CCN_UPCALL_FINAL: // effecitve in unit tests
- delete cp;
- cp = NULL;
- delete selfp;
- return CCN_UPCALL_RESULT_OK;
-
- case CCN_UPCALL_CONTENT:
- break;
-
- case CCN_UPCALL_INTEREST_TIMED_OUT: {
- if (cp != NULL && cp->getRetry() > 0) {
- cp->decRetry();
- return CCN_UPCALL_RESULT_REEXPRESS;
- }
- return CCN_UPCALL_RESULT_OK;
- }
-
- default:
- return CCN_UPCALL_RESULT_OK;
- }
-
- char *pcontent;
- size_t len;
- if (ccn_content_get_value(info->content_ccnb, info->pco->offset[CCN_PCO_E], info->pco, (const unsigned char **)&pcontent, &len) < 0)
- {
- // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("decode ContentObject failed"));
- _LOG_ERROR("<<< Decode content failed ");
- }
-
- string name;
- for (int i = 0; i < info->content_comps->n - 1; i++)
- {
- char *comp;
- size_t size;
- name += "/";
- ccn_name_comp_get(info->content_ccnb, info->content_comps, i, (const unsigned char **)&comp, &size);
- string compStr(comp, size);
- name += compStr;
- }
-
- cp->runCallback(name, pcontent, len);
-
- return CCN_UPCALL_RESULT_OK;
-}
-
-int CcnxWrapper::sendInterestForString (const string &strInterest, const StringDataCallback &strDataCallback, int retry)
-{
- DataClosurePass * pass = new DataClosurePass(STRING_FORM, retry, strDataCallback);
- return sendInterest(strInterest, pass);
-}
-
-int CcnxWrapper::sendInterest (const string &strInterest, const RawDataCallback &rawDataCallback, int retry)
-{
- RawDataClosurePass * pass = new RawDataClosurePass(RAW_DATA, retry, rawDataCallback);
- return sendInterest(strInterest, pass);
-}
-
-int CcnxWrapper::sendInterest (const string &strInterest, void *dataPass)
-{
- recursive_mutex::scoped_lock lock(m_mutex);
- if (!m_running || !m_connected)
- return -1;
-
- // std::cout << "Send interests for " << strInterest << std::endl;
- ccn_charbuf *pname = ccn_charbuf_create();
- ccn_closure *dataClosure = new ccn_closure;
-
- ccn_name_from_uri (pname, strInterest.c_str());
- dataClosure->data = dataPass;
-
- dataClosure->p = &incomingData;
- if (ccn_express_interest (m_handle, pname, dataClosure, NULL) < 0)
- {
- // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("express interest failed"));
- _LOG_ERROR("<<< Express interest failed: " << strInterest);
- }
-
- _LOG_DEBUG("<<< Sending interest: " << strInterest);
-
- ccn_charbuf_destroy (&pname);
- return 0;
-}
-
-int CcnxWrapper::setInterestFilter (const string &prefix, const InterestCallback &interestCallback)
-{
- recursive_mutex::scoped_lock lock(m_mutex);
- if (!m_running || !m_connected)
- return -1;
-
- ccn_charbuf *pname = ccn_charbuf_create();
- ccn_closure *interestClosure = new ccn_closure;
-
- ccn_name_from_uri (pname, prefix.c_str());
- interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
- interestClosure->p = &incomingInterest;
- int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
- if (ret < 0)
- {
- BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("set interest filter failed") << errmsg_info_int (ret));
- }
-
- m_registeredInterests.insert(pair<std::string, InterestCallback>(prefix, interestCallback));
- ccn_charbuf_destroy(&pname);
-
- return 0;
-}
-
-void
-CcnxWrapper::clearInterestFilter (const std::string &prefix)
-{
- recursive_mutex::scoped_lock lock(m_mutex);
- if (!m_running || !m_connected)
- return;
-
- std::cout << "clearInterestFilter" << std::endl;
- ccn_charbuf *pname = ccn_charbuf_create();
-
- ccn_name_from_uri (pname, prefix.c_str());
- int ret = ccn_set_interest_filter (m_handle, pname, 0);
- if (ret < 0)
- {
- BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("set interest filter failed") << errmsg_info_int (ret));
- }
-
- m_registeredInterests.erase(prefix);
- ccn_charbuf_destroy(&pname);
-}
-
-string
-CcnxWrapper::getLocalPrefix ()
-{
- struct ccn * tmp_handle = ccn_create ();
- int res = ccn_connect (tmp_handle, NULL);
- if (res < 0)
- {
- _LOG_ERROR ("connecting to ccnd failed");
- return "";
- }
-
- string retval = "";
-
- struct ccn_charbuf *templ = ccn_charbuf_create();
- ccn_charbuf_append_tt(templ, CCN_DTAG_Interest, CCN_DTAG);
- ccn_charbuf_append_tt(templ, CCN_DTAG_Name, CCN_DTAG);
- ccn_charbuf_append_closer(templ); /* </Name> */
- // XXX - use pubid if possible
- ccn_charbuf_append_tt(templ, CCN_DTAG_MaxSuffixComponents, CCN_DTAG);
- ccnb_append_number(templ, 1);
- ccn_charbuf_append_closer(templ); /* </MaxSuffixComponents> */
- ccnb_tagged_putf(templ, CCN_DTAG_Scope, "%d", 2);
- ccn_charbuf_append_closer(templ); /* </Interest> */
-
- struct ccn_charbuf *name = ccn_charbuf_create ();
- res = ccn_name_from_uri (name, "/local/ndn/prefix");
- if (res < 0) {
- _LOG_ERROR ("Something wrong with `ccn_name_from_uri` call");
- }
- else
- {
- struct ccn_fetch *fetch = ccn_fetch_new (tmp_handle);
-
- struct ccn_fetch_stream *stream = ccn_fetch_open (fetch, name, "/local/ndn/prefix",
- NULL, 4, CCN_V_HIGHEST, 0);
- if (stream == NULL) {
- _LOG_ERROR ("Cannot create ccn_fetch_stream");
- }
- else
- {
- ostringstream os;
-
- int counter = 0;
- char buf[256];
- while (true) {
- res = ccn_fetch_read (stream, buf, sizeof(buf));
-
- if (res == 0) {
- break;
- }
-
- if (res > 0) {
- os << string(buf, res);
- } else if (res == CCN_FETCH_READ_NONE) {
- if (counter < 2)
- {
- ccn_run(tmp_handle, 1000);
- counter ++;
- }
- else
- {
- break;
- }
- } else if (res == CCN_FETCH_READ_END) {
- break;
- } else if (res == CCN_FETCH_READ_TIMEOUT) {
- break;
- } else {
- _LOG_ERROR ("fatal error. should be reported");
- break;
- }
- }
- retval = os.str ();
- stream = ccn_fetch_close(stream);
- }
- fetch = ccn_fetch_destroy(fetch);
- }
-
- ccn_charbuf_destroy (&name);
-
- ccn_disconnect (tmp_handle);
- ccn_destroy (&tmp_handle);
-
- return retval;
-}
-
-
-
-
-
-DataClosurePass::DataClosurePass (CallbackType type, int retry, const CcnxWrapper::StringDataCallback &strDataCallback): ClosurePass(type, retry), m_callback(NULL)
-{
- m_callback = new CcnxWrapper::StringDataCallback (strDataCallback);
-}
-
-DataClosurePass::~DataClosurePass ()
-{
- delete m_callback;
- m_callback = NULL;
-}
-
-void
-DataClosurePass::runCallback(std::string name, const char *data, size_t len)
-{
- string content(data, len);
- if (m_callback != NULL) {
- (*m_callback)(name, content);
- }
-}
-
-
-RawDataClosurePass::RawDataClosurePass (CallbackType type, int retry, const CcnxWrapper::RawDataCallback &rawDataCallback): ClosurePass(type, retry), m_callback(NULL)
-{
- m_callback = new CcnxWrapper::RawDataCallback (rawDataCallback);
-}
-
-RawDataClosurePass::~RawDataClosurePass ()
-{
- delete m_callback;
- m_callback = NULL;
-}
-
-void
-RawDataClosurePass::runCallback(std::string name, const char *data, size_t len)
-{
- if (m_callback != NULL) {
- (*m_callback)(name, data, len);
- }
-}
-
-}
diff --git a/src/ccnx/sync-ccnx-wrapper.h b/src/ccnx/sync-ccnx-wrapper.h
deleted file mode 100644
index f0f8d8c..0000000
--- a/src/ccnx/sync-ccnx-wrapper.h
+++ /dev/null
@@ -1,198 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012 University of California, Los Angeles
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Chaoyi Bian <bcy@pku.edu.cn>
- * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
- */
-
-#ifndef SYNC_CCNX_WRAPPER_H
-#define SYNC_CCNX_WRAPPER_H
-
-extern "C" {
-#include <ccn/ccn.h>
-#include <ccn/charbuf.h>
-#include <ccn/keystore.h>
-#include <ccn/uri.h>
-#include <ccn/bloom.h>
-#include <ccn/signing.h>
-}
-
-#include <boost/exception/all.hpp>
-#include <boost/thread/recursive_mutex.hpp>
-#include <boost/thread/thread.hpp>
-#include <boost/function.hpp>
-#include <string>
-#include <sstream>
-#include <map>
-
-/**
- * \defgroup sync SYNC protocol
- *
- * Implementation of SYNC protocol
- */
-namespace Sync {
-
-struct CcnxOperationException : virtual boost::exception, virtual std::exception { };
-/**
- * \ingroup sync
- * @brief A wrapper for ccnx library; clients of this code do not need to deal
- * with ccnx library
- */
-class CcnxWrapper {
-public:
- typedef boost::function<void (std::string, std::string)> StringDataCallback;
- typedef boost::function<void (std::string, const char *buf, size_t len)> RawDataCallback;
- typedef boost::function<void (std::string)> InterestCallback;
-
-public:
-
-#ifdef _DEBUG_WRAPPER_
- CcnxWrapper(char c='.');
- char m_c;
-#else
- CcnxWrapper();
-#endif
-
- ~CcnxWrapper();
-
- /**
- * @brief send Interest; need to grab lock m_mutex first
- *
- * @param strInterest the Interest name
- * @param dataCallback the callback function to deal with the returned data
- * @return the return code of ccn_express_interest
- */
- int
- sendInterestForString (const std::string &strInterest, const StringDataCallback &strDataCallback, int retry = 0);
-
- int
- sendInterest (const std::string &strInterest, const RawDataCallback &rawDataCallback, int retry = 0);
-
- /**
- * @brief set Interest filter (specify what interest you want to receive)
- *
- * @param prefix the prefix of Interest
- * @param interestCallback the callback function to deal with the returned data
- * @return the return code of ccn_set_interest_filter
- */
- int
- setInterestFilter (const std::string &prefix, const InterestCallback &interestCallback);
-
- /**
- * @brief clear Interest filter
- * @param prefix the prefix of Interest
- */
- void
- clearInterestFilter (const std::string &prefix);
-
- /**
- * @brief publish data and put it to local ccn content store; need to grab
- * lock m_mutex first
- *
- * @param name the name for the data object
- * @param dataBuffer the data to be published
- * @param freshness the freshness time for the data object
- * @return code generated by ccnx library calls, >0 if success
- */
- int
- publishStringData (const std::string &name, const std::string &dataBuffer, int freshness);
-
- int
- publishRawData (const std::string &name, const char *buf, size_t len, int freshness);
-
- std::string
- getLocalPrefix ();
-
-protected:
- void
- connectCcnd();
-
- /// @cond include_hidden
- void
- createKeyLocator ();
-
- void
- initKeyStore ();
-
- const ccn_pkey *
- getPrivateKey ();
-
- const unsigned char *
- getPublicKeyDigest ();
-
- ssize_t
- getPublicKeyDigestLength ();
-
- void
- ccnLoop ();
-
- int
- sendInterest (const std::string &strInterest, void *dataPass);
- /// @endcond
-protected:
- ccn* m_handle;
- ccn_keystore *m_keyStore;
- ccn_charbuf *m_keyLoactor;
- // to lock, use "boost::recursive_mutex::scoped_lock scoped_lock(mutex);
- boost::recursive_mutex m_mutex;
- boost::thread m_thread;
- bool m_running;
- bool m_connected;
- std::map<std::string, InterestCallback> m_registeredInterests;
- // std::list< std::pair<std::string, InterestCallback> > m_registeredInterests;
-};
-
-typedef boost::shared_ptr<CcnxWrapper> CcnxWrapperPtr;
-
-enum CallbackType { STRING_FORM, RAW_DATA};
-
-class ClosurePass {
-public:
- ClosurePass(CallbackType type, int retry): m_retry(retry), m_type(type) {}
- int getRetry() {return m_retry;}
- void decRetry() { m_retry--;}
- CallbackType getCallbackType() {return m_type;}
- virtual ~ClosurePass(){}
- virtual void runCallback(std::string name, const char *data, size_t len) = 0;
-
-protected:
- int m_retry;
- CallbackType m_type;
-};
-
-class DataClosurePass: public ClosurePass {
-public:
- DataClosurePass(CallbackType type, int retry, const CcnxWrapper::StringDataCallback &strDataCallback);
- virtual ~DataClosurePass();
- virtual void runCallback(std::string name, const char *, size_t len);
-private:
- CcnxWrapper::StringDataCallback * m_callback;
-};
-
-class RawDataClosurePass: public ClosurePass {
-public:
- RawDataClosurePass(CallbackType type, int retry, const CcnxWrapper::RawDataCallback &RawDataCallback);
- virtual ~RawDataClosurePass();
- virtual void runCallback(std::string name, const char *, size_t len);
-private:
- CcnxWrapper::RawDataCallback * m_callback;
-};
-
-} // Sync
-
-#endif // SYNC_CCNX_WRAPPER_H
diff --git a/src/ccnx/sync-socket.cc b/src/ccnx/sync-socket.cc
new file mode 100644
index 0000000..48d4ad0
--- /dev/null
+++ b/src/ccnx/sync-socket.cc
@@ -0,0 +1,126 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Yingdi Yu <yingdi@cs.ucla.edu>
+ */
+
+#include "sync-socket.h"
+
+using namespace std;
+using namespace ndn;
+
+namespace Sync {
+
+SyncSocket::SyncSocket (const string &syncPrefix,
+ Ptr<SyncPolicyManager> syncPolicyManager,
+ NewDataCallback dataCallback,
+ RemoveCallback rmCallback )
+ : m_newDataCallback(dataCallback)
+ , m_syncPolicyManager(syncPolicyManager)
+ , m_syncLogic (syncPrefix,
+ syncPolicyManager,
+ bind(&SyncSocket::passCallback, this, _1),
+ rmCallback)
+{
+ Ptr<security::Keychain> keychain = Ptr<security::Keychain>(new security::Keychain(Ptr<security::IdentityManager>::Create(), m_syncPolicyManager, NULL));
+ m_handler = Ptr<Wrapper>(new Wrapper(keychain));
+}
+
+SyncSocket::~SyncSocket()
+{}
+
+bool
+SyncSocket::publishData(const std::string &prefix, uint32_t session, const char *buf, size_t len, int freshness)
+{
+ uint32_t sequence = getNextSeq(prefix, session);
+ ostringstream contentNameWithSeqno;
+ contentNameWithSeqno << prefix << "/" << session << "/" << sequence;
+
+ Name dataName(contentNameWithSeqno.str ());
+ Blob blob(buf, len);
+ Name signingIdentity = m_syncPolicyManager->inferSigningIdentity(dataName);
+
+ m_handler->publishDataByIdentity (dataName,
+ blob,
+ signingIdentity,
+ freshness);
+
+ SeqNo s(session, sequence + 1);
+ m_sequenceLog[prefix] = s;
+ m_syncLogic.addLocalNames (prefix, session, sequence);
+ return true;
+}
+
+void
+SyncSocket::fetchData(const string &prefix, const SeqNo &seq, const DataCallback& callback, int retry)
+{
+ ostringstream interestName;
+ interestName << prefix << "/" << seq.getSession() << "/" << seq.getSeq();
+ //std::cout << "Socket " << this << " Send Interest <" << interestName.str() << "> for raw data " << endl;
+
+
+ Ptr<ndn::Interest> interestPtr = Ptr<ndn::Interest>(new ndn::Interest(interestName.str()));
+ Ptr<Closure> closure = Ptr<Closure> (new Closure(callback,
+ boost::bind(&SyncSocket::onChatDataTimeout,
+ this,
+ _1,
+ _2,
+ retry),
+ boost::bind(&SyncSocket::onChatDataUnverified,
+ this,
+ _1)));
+ m_handler->sendInterest(interestPtr, closure);
+}
+
+void
+SyncSocket::onChatDataTimeout(Ptr<Closure> closure, Ptr<ndn::Interest> interest, int retry)
+{
+ if(retry > 0)
+ {
+ Ptr<Closure> newClosure = Ptr<Closure>(new Closure(closure->m_dataCallback,
+ boost::bind(&SyncSocket::onChatDataTimeout,
+ this,
+ _1,
+ _2,
+ retry - 1),
+ closure->m_unverifiedCallback,
+ closure->m_stepCount)
+ );
+ m_handler->sendInterest(interest, newClosure);
+ }
+}
+
+void
+SyncSocket::onChatDataUnverified(Ptr<Data> data)
+{}
+
+
+uint32_t
+SyncSocket::getNextSeq (const string &prefix, uint32_t session)
+{
+  // An unknown prefix, or a session other than the logged one, starts from sequence 0.
+  SequenceLog::iterator entry = m_sequenceLog.find (prefix);
+  if (entry == m_sequenceLog.end ())
+    return 0;
+
+  SeqNo logged = entry->second;
+  if (logged.getSession() != session)
+    return 0;
+  return logged.getSeq();
+}
+
+}//Sync
diff --git a/src/ccnx/sync-socket.h b/src/ccnx/sync-socket.h
new file mode 100644
index 0000000..a4530ba
--- /dev/null
+++ b/src/ccnx/sync-socket.h
@@ -0,0 +1,111 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Yingdi Yu <yingdi@cs.ucla.edu>
+ */
+
+#ifndef SYNC_SOCKET_H
+#define SYNC_SOCKET_H
+
+#include "sync-logic.h"
+#include <boost/function.hpp>
+#include <boost/unordered_map.hpp>
+#include "sync-seq-no.h"
+#include <ndn.cxx/wrapper/wrapper.h>
+#include <utility>
+#include <map>
+#include <vector>
+#include <sstream>
+
+namespace Sync {
+
+/**
+ * \ingroup sync
+ * @brief A simple interface to interact with client code
+ */
+class SyncSocket
+{
+public:
+ typedef boost::function< void (const std::vector<MissingDataInfo> &, SyncSocket * ) > NewDataCallback;
+ typedef boost::function< void ( const std::string &/*prefix*/ ) > RemoveCallback;
+  /**
+   * @brief the constructor for SyncSocket; syncPrefix, syncPolicyManager
+   * and rmCallback are passed to the constructor of m_syncLogic, together
+   * with an internal callback that hands the missing-data list to
+   * dataCallback along with a pointer to this socket
+   *
+   * @param syncPrefix the name prefix for Sync Interest
+   * @param syncPolicyManager policy manager for m_syncLogic and the keychain
+   * @param dataCallback the callback to process data
+   * @param rmCallback the remove callback passed to m_syncLogic
+   */
+ SyncSocket (const std::string &syncPrefix,
+ ndn::Ptr<SyncPolicyManager> syncPolicyManager,
+ NewDataCallback dataCallback,
+ RemoveCallback rmCallback);
+
+ ~SyncSocket ();
+
+ bool
+ publishData(const std::string &prefix, uint32_t session, const char *buf, size_t len, int freshness);
+
+ void
+ remove (const std::string &prefix)
+ { m_syncLogic.remove(prefix); }
+
+ void
+ fetchData(const std::string &prefix, const SeqNo &seq, const ndn::DataCallback& callback, int retry = 0);
+
+ std::string
+ getRootDigest()
+ { return m_syncLogic.getRootDigest(); }
+
+ uint32_t
+ getNextSeq (const std::string &prefix, uint32_t session);
+
+ SyncLogic &
+ getLogic ()
+ { return m_syncLogic; }
+
+ // make this a static function so we don't have to create socket instance without
+ // knowing the local prefix. it's a wrong place for this function anyway
+ static std::string
+ GetLocalPrefix ();
+
+private:
+ void
+ passCallback(const std::vector<MissingDataInfo> &v)
+ { m_newDataCallback(v, this); }
+
+ void
+ onChatDataTimeout(ndn::Ptr<ndn::Closure> closure, ndn::Ptr<ndn::Interest> interest, int retry);
+
+ void
+ onChatDataUnverified(ndn::Ptr<ndn::Data> data);
+
+private:
+ typedef boost::unordered_map<std::string, SeqNo> SequenceLog;
+ NewDataCallback m_newDataCallback;
+ SequenceLog m_sequenceLog;
+ ndn::Ptr<SyncPolicyManager> m_syncPolicyManager;
+ ndn::Ptr<ndn::Wrapper> m_handler;
+ SyncLogic m_syncLogic;
+};
+
+} // Sync
+
+#endif // SYNC_SOCKET_H
diff --git a/src/specific-policy-rule.cc b/src/specific-policy-rule.cc
new file mode 100644
index 0000000..9fbd23a
--- /dev/null
+++ b/src/specific-policy-rule.cc
@@ -0,0 +1,50 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013, Regents of the University of California
+ * Yingdi Yu
+ *
+ * BSD license, See the LICENSE file for more information
+ *
+ * Author: Yingdi Yu <yingdi@cs.ucla.edu>
+ */
+
+#include "specific-policy-rule.h"
+#include <ndn.cxx/fields/signature-sha256-with-rsa.h>
+
+using namespace ndn;
+using namespace std;
+using namespace ndn::security;
+
+
+SpecificPolicyRule::SpecificPolicyRule(Ptr<Regex> dataRegex,
+ Ptr<Regex> signerRegex)
+ : PolicyRule(PolicyRule::IDENTITY_POLICY, true)
+ , m_dataRegex(dataRegex)
+ , m_signerRegex(signerRegex)
+{}
+
+SpecificPolicyRule::SpecificPolicyRule(const SpecificPolicyRule& rule)
+ : PolicyRule(PolicyRule::IDENTITY_POLICY, true)
+ , m_dataRegex(rule.m_dataRegex)
+ , m_signerRegex(rule.m_signerRegex)
+{}
+
+bool
+SpecificPolicyRule::matchDataName(const Data & data)
+{ return m_dataRegex->match(data.getName()); }
+
+bool
+SpecificPolicyRule::matchSignerName(const Data & data)
+{ // Matches the key name found in the data's Sha256WithRsa signature against m_signerRegex.
+  Ptr<const signature::Sha256WithRsa> sigPtr = DynamicCast<const signature::Sha256WithRsa> (data.getSignature());
+  if (!sigPtr) return false; // NOTE(review): assumes DynamicCast yields a null Ptr for other signature types; never deref it
+  return m_signerRegex->match(sigPtr->getKeyLocator ().getKeyName ());
+}
+
+bool
+SpecificPolicyRule::satisfy(const Data & data)
+{ return matchDataName(data) && matchSignerName(data); } // both the data name and the signer name must match
+
+bool
+SpecificPolicyRule::satisfy(const Name & dataName, const Name & signerName)
+{ return (m_dataRegex->match(dataName) && m_signerRegex->match(signerName)); }
diff --git a/src/specific-policy-rule.h b/src/specific-policy-rule.h
new file mode 100644
index 0000000..e2b859b
--- /dev/null
+++ b/src/specific-policy-rule.h
@@ -0,0 +1,46 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013, Regents of the University of California
+ * Yingdi Yu
+ *
+ * BSD license, See the LICENSE file for more information
+ *
+ * Author: Yingdi Yu <yingdi@cs.ucla.edu>
+ */
+
+#ifndef SPECIFIC_POLICY_RULE_H
+#define SPECIFIC_POLICY_RULE_H
+
+#include <ndn.cxx/security/policy/policy-rule.h>
+#include <ndn.cxx/regex/regex.h>
+
+class SpecificPolicyRule : public ndn::security::PolicyRule
+{
+
+public:
+ SpecificPolicyRule(ndn::Ptr<ndn::Regex> dataRegex,
+ ndn::Ptr<ndn::Regex> signerRegex);
+
+ SpecificPolicyRule(const SpecificPolicyRule& rule);
+
+ virtual
+ ~SpecificPolicyRule() {};
+
+ bool
+ matchDataName(const ndn::Data & data);
+
+ bool
+ matchSignerName(const ndn::Data & data);
+
+ bool
+ satisfy(const ndn::Data & data);
+
+ bool
+ satisfy(const ndn::Name & dataName, const ndn::Name & signerName);
+
+private:
+ ndn::Ptr<ndn::Regex> m_dataRegex;
+ ndn::Ptr<ndn::Regex> m_signerRegex;
+};
+
+#endif // SPECIFIC_POLICY_RULE_H
diff --git a/src/sync-intro-certificate.cc b/src/sync-intro-certificate.cc
new file mode 100644
index 0000000..9d77cca
--- /dev/null
+++ b/src/sync-intro-certificate.cc
@@ -0,0 +1,213 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013, Regents of the University of California
+ * Yingdi Yu
+ *
+ * BSD license, See the LICENSE file for more information
+ *
+ * Author: Yingdi Yu <yingdi@cs.ucla.edu>
+ */
+
+#include "sync-intro-certificate.h"
+#include <ndn.cxx/security/exception.h>
+
+using namespace ndn;
+using namespace ndn::security;
+using namespace std;
+
+SyncIntroCertificate::SyncIntroCertificate ()
+ : Certificate()
+{}
+
+SyncIntroCertificate::SyncIntroCertificate (const Name& nameSpace,
+ const Name& keyName,
+ const Name& signerName,
+ const Time& notBefore,
+ const Time& notAfter,
+ const Publickey& key,
+ const IntroType& introType)
+ : m_keyName(keyName)
+ , m_introType(introType)
+{
+ Name certificateName = nameSpace;
+ certificateName.append("WOT").append(keyName).append("INTRO-CERT").append(signerName);
+ switch(introType)
+ {
+ case PRODUCER:
+ certificateName.append("PRODUCER");
+ break;
+ case INTRODUCER:
+ certificateName.append("INTRODUCER");
+ break;
+ default:
+ throw SecException("Wrong Introduction Type!");
+ }
+ certificateName.appendVersion();
+
+ setName(certificateName);
+ setNotBefore(notBefore);
+ setNotAfter(notAfter);
+ setPublicKeyInfo(key);
+ addSubjectDescription(CertificateSubDescrypt("2.5.4.41", keyName.toUri()));
+}
+
+SyncIntroCertificate::SyncIntroCertificate (const Name& nameSpace,
+ const IdentityCertificate& identityCertificate,
+ const Name& signerName,
+ const IntroType& introType)
+ : m_introType(introType)
+{
+ m_keyName = identityCertificate.getPublicKeyName();
+
+ Name certificateName = nameSpace;
+ certificateName.append("WOT").append(m_keyName).append("INTRO-CERT").append(signerName);
+ switch(introType)
+ {
+ case PRODUCER:
+ certificateName.append("PRODUCER");
+ break;
+ case INTRODUCER:
+ certificateName.append("INTRODUCER");
+ break;
+ default:
+ throw SecException("Wrong Introduction Type!");
+ }
+ certificateName.appendVersion();
+
+ setName(certificateName);
+ setNotBefore(identityCertificate.getNotBefore());
+ setNotAfter(identityCertificate.getNotAfter());
+ setPublicKeyInfo(identityCertificate.getPublicKeyInfo());
+ addSubjectDescription(CertificateSubDescrypt("2.5.4.41", m_keyName.toUri()));
+}
+
+SyncIntroCertificate::SyncIntroCertificate (const Data& data)
+  : Certificate(data)
+{ // Recovers m_keyName and m_introType from a name shaped like .../WOT/<key name>/INTRO-CERT/.../<type>/<last>
+  Name certificateName = getName();
+  int i = 0;
+  int keyNameStart = 0;
+  int keyNameEnd = 0;
+  for(; i < certificateName.size(); i++)
+    {
+      if(certificateName.get(i).toUri() == string("WOT"))
+        {
+          keyNameStart = i + 1; // the key name begins right after the "WOT" marker
+          break;
+        }
+    }
+
+  if(i >= certificateName.size())
+    throw SecException("Wrong SyncIntroCertificate Name!");
+
+  for(; i< certificateName.size(); i++)
+    {
+      if(certificateName.get(i).toUri() == string("INTRO-CERT"))
+        {
+          keyNameEnd = i; // index one past the last key name component
+          break;
+        }
+    }
+
+  if(i >= certificateName.size())
+    throw SecException("Wrong SyncIntroCertificate Name!");
+  // NOTE(review): ndn::Name::getSubName is (pos, len) in ndn.cxx, so pass a component count, not an end index
+  m_keyName = certificateName.getSubName(keyNameStart, keyNameEnd - keyNameStart);
+  // the introduction type is read from the second-to-last component
+  string typeComponent = certificateName.get(certificateName.size() - 2).toUri();
+  if(typeComponent == string("PRODUCER"))
+    m_introType = PRODUCER;
+  else if(typeComponent == string("INTRODUCER"))
+    m_introType = INTRODUCER;
+  else
+    throw SecException("Wrong SyncIntroCertificate Name!");
+}
+
+SyncIntroCertificate::SyncIntroCertificate (const SyncIntroCertificate& chronosIntroCertificate)
+ : Certificate(chronosIntroCertificate)
+ , m_keyName(chronosIntroCertificate.m_keyName)
+ , m_introType(chronosIntroCertificate.m_introType)
+{}
+
+Data &
+SyncIntroCertificate::setName (const Name& certificateName)
+{ // Validates a .../WOT/<key name>/INTRO-CERT/.../<type>/<last> name, caches key name and type, then stores the name
+  int i = 0;
+  int keyNameStart = 0;
+  int keyNameEnd = 0;
+  for(; i < certificateName.size(); i++)
+    {
+      if(certificateName.get(i).toUri() == string("WOT"))
+        {
+          keyNameStart = i + 1; // the key name begins right after the "WOT" marker
+          break;
+        }
+    }
+
+  if(i >= certificateName.size())
+    throw SecException("Wrong SyncIntroCertificate Name!");
+
+  for(; i< certificateName.size(); i++)
+    {
+      if(certificateName.get(i).toUri() == string("INTRO-CERT"))
+        {
+          keyNameEnd = i; // index one past the last key name component
+          break;
+        }
+    }
+
+  if(i >= certificateName.size())
+    throw SecException("Wrong SyncIntroCertificate Name!");
+  // NOTE(review): ndn::Name::getSubName is (pos, len) in ndn.cxx, so pass a component count, not an end index
+  m_keyName = certificateName.getSubName(keyNameStart, keyNameEnd - keyNameStart);
+  // the introduction type is read from the second-to-last component
+  string typeComponent = certificateName.get(certificateName.size() - 2).toUri();
+  if(typeComponent == string("PRODUCER"))
+    m_introType = PRODUCER;
+  else if(typeComponent == string("INTRODUCER"))
+    m_introType = INTRODUCER;
+  else
+    throw SecException("Wrong SyncIntroCertificate Name!");
+  Data::setName(certificateName); // this override hides the base setter; forward so the validated name is actually stored
+  return *this;
+}
+
+bool
+SyncIntroCertificate::isSyncIntroCertificate(const Certificate& certificate)
+{ // Checks the name shape .../WOT/.../INTRO-CERT/.../<PRODUCER|INTRODUCER>/<last component>
+  const Name& certificateName = certificate.getName();
+  if(certificateName.size() < 4) // cannot hold WOT, INTRO-CERT, type and last component; keeps size() - 2 in range
+    return false;
+
+  string introType = certificateName.get(certificateName.size() - 2).toUri();
+  if(introType != string("PRODUCER") && introType != string("INTRODUCER"))
+    return false;
+
+  int i = 0;
+  bool findWot = false;
+  bool findIntroCert = false;
+  for(; i < certificateName.size(); i++)
+    {
+      if(certificateName.get(i).toUri() == string("WOT"))
+        {
+          findWot = true;
+          break;
+        }
+    }
+
+  if(!findWot)
+    return false;
+
+  for(; i < certificateName.size(); i++)
+    {
+      if(certificateName.get(i).toUri() == string("INTRO-CERT"))
+        {
+          findIntroCert = true;
+          break;
+        }
+    }
+  if(!findIntroCert)
+    return false;
+
+  return i < certificateName.size() - 2; // INTRO-CERT must come before the type component
+}
diff --git a/src/sync-intro-certificate.h b/src/sync-intro-certificate.h
new file mode 100644
index 0000000..db41627
--- /dev/null
+++ b/src/sync-intro-certificate.h
@@ -0,0 +1,69 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013, Regents of the University of California
+ * Yingdi Yu
+ *
+ * BSD license, See the LICENSE file for more information
+ *
+ * Author: Yingdi Yu <yingdi@cs.ucla.edu>
+ */
+
+#ifndef SYNC_INTRO_CERTIFICATE_H
+#define SYNC_INTRO_CERTIFICATE_H
+
+#include <ndn.cxx/security/certificate/certificate.h>
+#include <ndn.cxx/security/certificate/identity-certificate.h>
+
+class SyncIntroCertificate : public ndn::security::Certificate
+{
+public:
+ enum IntroType{
+ PRODUCER,
+ INTRODUCER
+ };
+
+public:
+ SyncIntroCertificate ();
+
+ SyncIntroCertificate (const ndn::Name& nameSpace,
+ const ndn::Name& keyName,
+ const ndn::Name& signerName,
+ const ndn::Time& notBefore,
+ const ndn::Time& notAfter,
+ const ndn::security::Publickey& key,
+ const IntroType& introType = PRODUCER);
+
+ SyncIntroCertificate (const ndn::Name& nameSpace,
+ const ndn::security::IdentityCertificate& identityCertificate,
+ const ndn::Name& signerName,
+ const IntroType& introType);
+
+ SyncIntroCertificate (const ndn::Data& data);
+
+ SyncIntroCertificate (const SyncIntroCertificate& chronosIntroCertificate);
+
+
+ virtual
+ ~SyncIntroCertificate ()
+ {}
+
+ ndn::Data &
+ setName (const ndn::Name& name);
+
+ inline virtual ndn::Name
+ getPublicKeyName () const
+ { return m_keyName; }
+
+ inline IntroType
+ getIntroType()
+ { return m_introType; }
+
+ static bool
+ isSyncIntroCertificate(const ndn::security::Certificate& certificate);
+
+protected:
+ ndn::Name m_keyName;
+ IntroType m_introType;
+};
+
+#endif
diff --git a/src/sync-logic.cc b/src/sync-logic.cc
index 98160b5..ef96a86 100644
--- a/src/sync-logic.cc
+++ b/src/sync-logic.cc
@@ -38,6 +38,7 @@
using namespace std;
using namespace boost;
+using namespace ndn;
INIT_LOGGER ("SyncLogic");
@@ -57,31 +58,36 @@
namespace Sync
{
-SyncLogic::SyncLogic (const std::string &syncPrefix,
- LogicUpdateCallback onUpdate,
- LogicRemoveCallback onRemove)
- : m_state (new FullState)
- , m_syncInterestTable (TIME_SECONDS (m_syncInterestReexpress))
- , m_syncPrefix (syncPrefix)
- , m_onUpdate (onUpdate)
- , m_onRemove (onRemove)
- , m_perBranch (false)
- , m_ccnxHandle(new CcnxWrapper ())
+ SyncLogic::SyncLogic (const Name& syncPrefix,
+ Ptr<SyncPolicyManager> syncPolicyManager,
+ LogicUpdateCallback onUpdate,
+ LogicRemoveCallback onRemove)
+ : m_state (new FullState)
+ , m_syncInterestTable (TIME_SECONDS (m_syncInterestReexpress))
+ , m_syncPrefix (syncPrefix)
+ , m_onUpdate (onUpdate)
+ , m_onRemove (onRemove)
+ , m_perBranch (false)
+ , m_policyManager(syncPolicyManager)
#ifndef NS3_MODULE
- , m_randomGenerator (static_cast<unsigned int> (std::time (0)))
- , m_rangeUniformRandom (m_randomGenerator, uniform_int<> (200,1000))
- , m_reexpressionJitter (m_randomGenerator, uniform_int<> (100,500))
+ , m_randomGenerator (static_cast<unsigned int> (std::time (0)))
+ , m_rangeUniformRandom (m_randomGenerator, uniform_int<> (200,1000))
+ , m_reexpressionJitter (m_randomGenerator, uniform_int<> (100,500))
#else
- , m_rangeUniformRandom (200,1000)
- , m_reexpressionJitter (10,500)
+ , m_rangeUniformRandom (200,1000)
+ , m_reexpressionJitter (10,500)
#endif
- , m_recoveryRetransmissionInterval (m_defaultRecoveryRetransmitInterval)
+ , m_recoveryRetransmissionInterval (m_defaultRecoveryRetransmitInterval)
{
#ifndef NS3_MODULE
// In NS3 module these functions are moved to StartApplication method
-
- m_ccnxHandle->setInterestFilter (m_syncPrefix,
- bind (&SyncLogic::respondSyncInterest, this, _1));
+
+ Ptr<security::Keychain> keychain = Ptr<security::Keychain>(new security::Keychain(Ptr<security::IdentityManager>::Create(),
+ m_policyManager,
+ NULL));
+ m_handler = Ptr<Wrapper>(new Wrapper(keychain));
+ m_handler->setInterestFilter (m_syncPrefix,
+ bind (&SyncLogic::respondSyncInterest, this, _1));
m_scheduler.schedule (TIME_SECONDS (0), // no need to add jitter
bind (&SyncLogic::sendSyncInterest, this),
@@ -89,14 +95,15 @@
#endif
}
-SyncLogic::SyncLogic (const std::string &syncPrefix,
+SyncLogic::SyncLogic (const Name& syncPrefix,
+ Ptr<SyncPolicyManager> syncPolicyManager,
LogicPerBranchCallback onUpdateBranch)
: m_state (new FullState)
, m_syncInterestTable (TIME_SECONDS (m_syncInterestReexpress))
, m_syncPrefix (syncPrefix)
, m_onUpdateBranch (onUpdateBranch)
, m_perBranch(true)
- , m_ccnxHandle(new CcnxWrapper())
+ , m_policyManager(syncPolicyManager)
#ifndef NS3_MODULE
, m_randomGenerator (static_cast<unsigned int> (std::time (0)))
, m_rangeUniformRandom (m_randomGenerator, uniform_int<> (200,1000))
@@ -109,8 +116,13 @@
{
#ifndef NS3_MODULE
// In NS3 module these functions are moved to StartApplication method
+
+ Ptr<security::Keychain> keychain = Ptr<security::Keychain>(new security::Keychain(Ptr<security::IdentityManager>::Create(),
+ m_policyManager,
+ NULL));
+ m_handler = Ptr<Wrapper>(new Wrapper(keychain));
- m_ccnxHandle->setInterestFilter (m_syncPrefix,
+ m_handler->setInterestFilter (m_syncPrefix,
bind (&SyncLogic::respondSyncInterest, this, _1));
m_scheduler.schedule (TIME_SECONDS (0), // no need to add jitter
@@ -121,7 +133,7 @@
SyncLogic::~SyncLogic ()
{
- m_ccnxHandle->clearInterestFilter (m_syncPrefix);
+ m_handler->clearInterestFilter (Name(m_syncPrefix));
}
#ifdef NS3_MODULE
@@ -152,7 +164,7 @@
void
SyncLogic::stop()
{
- m_ccnxHandle->clearInterestFilter (m_syncPrefix);
+ m_handler->clearInterestFilter (Name(m_syncPrefix));
m_scheduler.cancel (REEXPRESSING_INTEREST);
m_scheduler.cancel (DELAYED_INTEREST_PROCESSING);
}
@@ -166,9 +178,9 @@
boost::tuple<DigestConstPtr, std::string>
SyncLogic::convertNameToDigestAndType (const std::string &name)
{
- BOOST_ASSERT (name.find (m_syncPrefix) == 0);
+ BOOST_ASSERT (name.find (m_syncPrefix.toUri()) == 0);
- string hash = name.substr (m_syncPrefix.size (), name.size ()-m_syncPrefix.size ());
+ string hash = name.substr (m_syncPrefix.toUri().size (), name.size ()-m_syncPrefix.toUri().size ());
if (hash[0] == '/')
hash = hash.substr (1, hash.size ()-1);
string interestType = "normal";
@@ -190,8 +202,10 @@
}
void
-SyncLogic::respondSyncInterest (const string &name)
+SyncLogic::respondSyncInterest (ndn::Ptr<ndn::Interest> interest)
{
+ _LOG_DEBUG("respondSyncInterest: " << interest->getName().toUri());
+ string name = interest->getName().toUri();
try
{
_LOG_TRACE ("<< I " << name);
@@ -218,8 +232,12 @@
}
void
-SyncLogic::respondSyncData (const std::string &name, const char *wireData, size_t len)
+SyncLogic::respondSyncData (Ptr<Data> data)
{
+ string name = data->getName().toUri();
+ const char *wireData = data->content().buf();
+ size_t len = data->content().size();
+
try
{
_LOG_TRACE ("<< D " << name);
@@ -247,10 +265,32 @@
}
}
+void
+SyncLogic::onSyncDataTimeout(Ptr<Closure> closure, Ptr<ndn::Interest> interest, int retry)
+{
+ if(retry > 0)
+ {
+ Ptr<Closure> newClosure = Ptr<Closure>(new Closure(closure->m_dataCallback,
+ boost::bind(&SyncLogic::onSyncDataTimeout,
+ this,
+ _1,
+ _2,
+ retry - 1),
+ closure->m_unverifiedCallback,
+ closure->m_stepCount)
+ );
+ m_handler->sendInterest(interest, newClosure);
+ }
+}
+
+void
+SyncLogic::onSyncDataUnverified()
+{}
void
SyncLogic::processSyncInterest (const std::string &name, DigestConstPtr digest, bool timedProcessing/*=false*/)
{
+ _LOG_DEBUG("processSyncInterest");
DigestConstPtr rootDigest;
{
recursive_mutex::scoped_lock lock (m_stateMutex);
@@ -435,7 +475,7 @@
void
SyncLogic::processSyncRecoveryInterest (const std::string &name, DigestConstPtr digest)
{
-
+ _LOG_DEBUG("processSyncRecoveryInterest");
DiffStateContainer::iterator stateInDiffLog = m_log.find (digest);
if (stateInDiffLog == m_log.end ())
@@ -564,6 +604,7 @@
void
SyncLogic::sendSyncInterest ()
{
+ _LOG_DEBUG("sendSyncInterest");
ostringstream os;
{
@@ -574,13 +615,25 @@
_LOG_TRACE (">> I " << os.str ());
}
+ _LOG_DEBUG("sendSyncInterest: " << os.str());
+
m_scheduler.cancel (REEXPRESSING_INTEREST);
m_scheduler.schedule (TIME_SECONDS_WITH_JITTER (m_syncInterestReexpress),
bind (&SyncLogic::sendSyncInterest, this),
REEXPRESSING_INTEREST);
-
- m_ccnxHandle->sendInterest (os.str (),
- bind (&SyncLogic::respondSyncData, this, _1, _2, _3));
+
+ Ptr<ndn::Interest> interestPtr = Ptr<ndn::Interest>(new ndn::Interest(os.str()));
+ Ptr<Closure> closure = Ptr<Closure> (new Closure(boost::bind(&SyncLogic::respondSyncData,
+ this,
+ _1),
+ boost::bind(&SyncLogic::onSyncDataTimeout,
+ this,
+ _1,
+ _2,
+ 0),
+ boost::bind(&SyncLogic::onSyncDataUnverified,
+ this)));
+ m_handler->sendInterest(interestPtr, closure);
}
void
@@ -601,8 +654,18 @@
REEXPRESSING_RECOVERY_INTEREST);
}
- m_ccnxHandle->sendInterest (os.str (),
- bind (&SyncLogic::respondSyncData, this, _1, _2, _3));
+ Ptr<ndn::Interest> interestPtr = Ptr<ndn::Interest>(new ndn::Interest(os.str()));
+ Ptr<Closure> closure = Ptr<Closure> (new Closure(boost::bind(&SyncLogic::respondSyncData,
+ this,
+ _1),
+ boost::bind(&SyncLogic::onSyncDataTimeout,
+ this,
+ _1,
+ _2,
+ 0),
+ boost::bind(&SyncLogic::onSyncDataUnverified,
+ this)));
+ m_handler->sendInterest (interestPtr, closure);
}
@@ -623,10 +686,13 @@
int size = ssm.ByteSize();
char *wireData = new char[size];
ssm.SerializeToArray(wireData, size);
- m_ccnxHandle->publishRawData (name,
- wireData,
- size,
- m_syncResponseFreshness); // in NS-3 it doesn't have any effect... yet
+ Blob blob(wireData, size);
+ Name dataName(name);
+ Name signingIdentity = m_policyManager->inferSigningIdentity(dataName);
+ m_handler->publishDataByIdentity (dataName,
+ blob,
+ signingIdentity,
+ m_syncResponseFreshness); // in NS-3 it doesn't have any effect... yet
delete []wireData;
// checking if our own interest got satisfied
diff --git a/src/sync-logic.h b/src/sync-logic.h
index 1bc5984..b7152cc 100644
--- a/src/sync-logic.h
+++ b/src/sync-logic.h
@@ -29,12 +29,13 @@
#include <memory>
#include <map>
-#include "ccnx/sync-ccnx-wrapper.h"
+#include <ndn.cxx/wrapper/wrapper.h>
#include "sync-interest-table.h"
#include "sync-diff-state.h"
#include "sync-full-state.h"
#include "sync-std-name-info.h"
#include "sync-scheduler.h"
+#include "sync-policy-manager.h"
#include "sync-diff-state-container.h"
@@ -82,11 +83,13 @@
* @param ccnxHandle ccnx handle
* the app data when new remote names are learned
*/
- SyncLogic (const std::string &syncPrefix,
+ SyncLogic (const ndn::Name& syncPrefix,
+ ndn::Ptr<SyncPolicyManager> syncPolicyManager,
LogicUpdateCallback onUpdate,
LogicRemoveCallback onRemove);
- SyncLogic (const std::string &syncPrefix,
+ SyncLogic (const ndn::Name& syncPrefix,
+ ndn::Ptr<SyncPolicyManager> syncPolicyManager,
LogicPerBranchCallback onUpdateBranch);
~SyncLogic ();
@@ -100,14 +103,14 @@
* @brief respond to the Sync Interest; a lot of logic needs to go in here
* @param interest the Sync Interest in string format
*/
- void respondSyncInterest (const std::string &interest);
+ void respondSyncInterest (ndn::Ptr<ndn::Interest>);
/**
* @brief process the fetched sync data
* @param name the data name
* @param dataBuffer the sync data
*/
- void respondSyncData (const std::string &name, const char *wireData, size_t len);
+ void respondSyncData (ndn::Ptr<ndn::Data> data);
/**
* @brief remove a participant's subtree from the sync tree
@@ -142,6 +145,12 @@
delayedChecksLoop ();
void
+ onSyncDataTimeout(ndn::Ptr<ndn::Closure> closure, ndn::Ptr<ndn::Interest> interest, int retry);
+
+ void
+ onSyncDataUnverified();
+
+ void
processSyncInterest (const std::string &name,
DigestConstPtr digest, bool timedProcessing=false);
@@ -187,12 +196,13 @@
std::string m_outstandingInterestName;
SyncInterestTable m_syncInterestTable;
- std::string m_syncPrefix;
+ ndn::Name m_syncPrefix;
LogicUpdateCallback m_onUpdate;
LogicRemoveCallback m_onRemove;
LogicPerBranchCallback m_onUpdateBranch;
bool m_perBranch;
- CcnxWrapperPtr m_ccnxHandle;
+ ndn::Ptr<SyncPolicyManager> m_policyManager;
+ ndn::Ptr<ndn::Wrapper> m_handler;
Scheduler m_scheduler;
diff --git a/src/sync-policy-manager.cc b/src/sync-policy-manager.cc
new file mode 100644
index 0000000..86726d3
--- /dev/null
+++ b/src/sync-policy-manager.cc
@@ -0,0 +1,341 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013, Regents of the University of California
+ * Yingdi Yu
+ *
+ * BSD license, See the LICENSE file for more information
+ *
+ * Author: Yingdi Yu <yingdi@cs.ucla.edu>
+ */
+
+#include "sync-policy-manager.h"
+
+#include "sync-intro-certificate.h"
+#include "sync-logging.h"
+
+using namespace ndn;
+using namespace ndn::security;
+using namespace std;
+
+INIT_LOGGER("SyncPolicyManager");
+
+// Stores the signing identity, getPrefix(size()-1) of the signing certificate
+// name, and the regexes built from syncPrefix and from syncPrefix + "WOT".
+SyncPolicyManager::SyncPolicyManager(const Name& signingIdentity,
+                                     const Name& signingCertificateName,
+                                     const Name& syncPrefix, int stepLimit)
+  : m_signingIdentity(signingIdentity)
+  , m_signingCertificateName(signingCertificateName.getPrefix(signingCertificateName.size()-1))
+  , m_syncPrefix(syncPrefix), m_stepLimit(stepLimit)
+  , m_handler(NULL) // stays NULL until setWrapper() supplies a wrapper
+{
+  Name wotPrefix = syncPrefix;
+  wotPrefix.append("WOT");
+  m_syncPrefixRegex = Regex::fromName(syncPrefix);
+  m_wotPrefixRegex = Regex::fromName(wotPrefix);
+  m_chatDataPolicy = Ptr<IdentityPolicyRule>(new IdentityPolicyRule("^[^<FH>]*<FH>([^<chronos>]*)<chronos><>",
+                                                                    "^(<>*)<KEY><DSK-.*><ID-CERT><>$", "==", "\\1", "\\1", true));
+}
+
+// Empty destructor; in particular the m_handler pointer is not deleted here.
+SyncPolicyManager::~SyncPolicyManager() {}
+
+// No packet is trusted without verification: unconditionally returns false.
+bool SyncPolicyManager::skipVerifyAndTrust (const Data& /*data*/)
+{ return false; }
+
+// Every packet has to go through verification: unconditionally returns true.
+bool SyncPolicyManager::requireVerify (const Data& /*data*/)
+{ return true; }
+
+// Decides how `data` is verified. When the decision can be made here, one of the
+// callbacks is invoked and NULL is returned; otherwise the result of prepareRequest
+// is returned. NOTE(review): _DEBUG builds accept every packet without verification.
+Ptr<ValidationRequest>
+SyncPolicyManager::checkVerificationPolicy(Ptr<Data> data, const int& stepCount,
+                                           const DataCallback& verifiedCallback,
+                                           const UnverifiedCallback& unverifiedCallback)
+{
+#ifdef _DEBUG
+  _LOG_DEBUG("checkVerificationPolicy");
+  verifiedCallback(data);
+  return NULL;
+#else
+  //TODO:
+  if(stepCount > m_stepLimit)
+    {
+      unverifiedCallback(data);
+      return NULL;
+    }
+
+  Ptr<const signature::Sha256WithRsa> sha256sig = DynamicCast<const signature::Sha256WithRsa> (data->getSignature());
+  // Presumably DynamicCast yields a null pointer for any other signature type; reject instead of dereferencing it.
+  if(!sha256sig || KeyLocator::KEYNAME != sha256sig->getKeyLocator().getType())
+    {
+      unverifiedCallback(data);
+      return NULL;
+    }
+  const Name& keyLocatorName = sha256sig->getKeyLocator().getKeyName();
+
+  // if data is intro cert
+  if(m_wotPrefixRegex->match(data->getName()))
+    {
+      Name keyName = IdentityCertificate::certificateNameToPublicKeyName(keyLocatorName);
+      map<Name, Publickey>::const_iterator it = m_trustedIntroducers.find(keyName);
+      if(m_trustedIntroducers.end() != it)
+        {
+          if(verifySignature(*data, it->second))
+            verifiedCallback(data);
+          else
+            unverifiedCallback(data);
+          return NULL;
+        }
+      else
+        return prepareRequest(keyName, true, data, stepCount, verifiedCallback, unverifiedCallback);
+    }
+
+  // if data is sync data or chat data
+  if(m_syncPrefixRegex->match(data->getName()) || m_chatDataPolicy->satisfy(*data))
+    {
+      Name keyName = IdentityCertificate::certificateNameToPublicKeyName(keyLocatorName);
+      map<Name, Publickey>::const_iterator it = m_trustedIntroducers.find(keyName);
+      if(m_trustedIntroducers.end() != it)
+        {
+          if(verifySignature(*data, it->second))
+            verifiedCallback(data);
+          else
+            unverifiedCallback(data);
+          return NULL;
+        }
+
+      it = m_trustedProducers.find(keyName);
+      if(m_trustedProducers.end() != it)
+        {
+          if(verifySignature(*data, it->second))
+            verifiedCallback(data);
+          else
+            unverifiedCallback(data);
+          return NULL;
+        }
+      return prepareRequest(keyName, false, data, stepCount, verifiedCallback, unverifiedCallback);
+    }
+
+  unverifiedCallback(data);
+  return NULL;
+#endif
+}
+
+// Signing is permitted when dataName matches m_syncPrefixRegex and certificateName
+// equals m_signingCertificateName; _DEBUG builds permit everything.
+bool
+SyncPolicyManager::checkSigningPolicy(const Name& dataName, const Name& certificateName)
+{
+#ifdef _DEBUG
+  _LOG_DEBUG("checkSigningPolicy");
+  return true;
+#else
+  return m_syncPrefixRegex->match(dataName) && certificateName == m_signingCertificateName;
+#endif
+}
+
+// Every data name maps to the single signing identity stored at construction.
+Name SyncPolicyManager::inferSigningIdentity(const ndn::Name& /*dataName*/)
+{ return m_signingIdentity; }
+
+// Records the certificate's public key as trusted: in the introducer map when
+// isIntroducer is set, in the producer map otherwise. map::insert keeps an existing entry.
+void
+SyncPolicyManager::addTrustAnchor(const IdentityCertificate& identityCertificate, bool isIntroducer)
+{
+  map<Name, Publickey>& anchors = isIntroducer ? m_trustedIntroducers : m_trustedProducers;
+  anchors.insert(pair <Name, Publickey > (identityCertificate.getPublicKeyName(), identityCertificate.getPublicKeyInfo()));
+}
+
+// Registers identityCertificate as a trust anchor. The per-prefix rule construction
+// below is commented out, so `prefix` is not used at the moment.
+void
+SyncPolicyManager::addChatDataRule(const Name& prefix,
+                                   const IdentityCertificate& identityCertificate, bool isIntroducer)
+{
+  // Name dataPrefix = prefix;
+  // dataPrefix.append("chronos").append(m_syncPrefix.get(-1));
+  // Ptr<Regex> dataRegex = Regex::fromName(prefix);
+  // Name certName = identityCertificate.getName();
+  // Name signerName = certName.getPrefix(certName.size()-1);
+  // Ptr<Regex> signerRegex = Regex::fromName(signerName, true);
+
+  // SpecificPolicyRule rule(dataRegex, signerRegex);
+  // map<Name, SpecificPolicyRule>::iterator it = m_chatDataRules.find(dataPrefix);
+  // if(it != m_chatDataRules.end())
+  //   it->second = rule;
+  // else
+  //   m_chatDataRules.insert(pair <Name, SpecificPolicyRule > (dataPrefix, rule));
+  addTrustAnchor(identityCertificate, isIntroducer);
+}
+
+
+// Returns a newly allocated list with the key name of every trusted introducer,
+// in the iteration order of m_trustedIntroducers.
+Ptr<const vector<Name> >
+SyncPolicyManager::getAllIntroducerName()
+{
+  Ptr<vector<Name> > introducerNames(new vector<Name>);
+  for(map<Name, Publickey>::iterator entry = m_trustedIntroducers.begin();
+      entry != m_trustedIntroducers.end(); ++entry)
+    introducerNames->push_back(entry->first);
+  return introducerNames;
+}
+
+// Builds the request for the intro certificate of keyName, addressed to the first
+// trusted introducer; on failure onIntroCertUnverified continues with introducer 1.
+Ptr<ValidationRequest>
+SyncPolicyManager::prepareRequest(const Name& keyName,
+                                  bool forIntroducer,
+                                  Ptr<Data> data,
+                                  const int & stepCount,
+                                  const DataCallback& verifiedCallback,
+                                  const UnverifiedCallback& unverifiedCallback)
+{
+  Ptr<const std::vector<ndn::Name> > nameList = getAllIntroducerName();
+  // With no trusted introducer there is nobody to ask, and at(0) below would throw std::out_of_range.
+  if(nameList->empty())
+    {
+      unverifiedCallback(data);
+      return NULL;
+    }
+
+  Ptr<Name> interestPrefixName = Ptr<Name>(new Name(m_syncPrefix));
+  interestPrefixName->append("WOT").append(keyName).append("INTRO-CERT");
+  Name interestName = *interestPrefixName;
+  interestName.append(nameList->at(0));
+  if(forIntroducer)
+    interestName.append("INTRODUCER");
+
+  Ptr<Interest> interest = Ptr<Interest>(new Interest(interestName));
+  interest->setChildSelector(Interest::CHILD_RIGHT);
+
+  DataCallback requestedCertVerifiedCallback = boost::bind(&SyncPolicyManager::onIntroCertVerified,
+                                                           this,
+                                                           _1,
+                                                           forIntroducer,
+                                                           data,
+                                                           verifiedCallback,
+                                                           unverifiedCallback);
+
+  UnverifiedCallback requestedCertUnverifiedCallback = boost::bind(&SyncPolicyManager::onIntroCertUnverified,
+                                                                   this,
+                                                                   _1,
+                                                                   interestPrefixName,
+                                                                   forIntroducer,
+                                                                   nameList,
+                                                                   1,
+                                                                   data,
+                                                                   verifiedCallback,
+                                                                   unverifiedCallback);
+
+  return Ptr<ValidationRequest>(new ValidationRequest(interest, requestedCertVerifiedCallback,
+                                                      requestedCertUnverifiedCallback, 1, m_stepLimit-1));
+}
+
+// Called once a requested intro certificate has been verified: stores its key as
+// trusted, then checks the signature of the original packet against that key.
+void
+SyncPolicyManager::onIntroCertVerified(Ptr<Data> introCertificateData,
+                                       bool forIntroducer,
+                                       Ptr<Data> originalData,
+                                       const DataCallback& verifiedCallback,
+                                       const UnverifiedCallback& unverifiedCallback)
+{
+  Ptr<SyncIntroCertificate> certificate(new SyncIntroCertificate(*introCertificateData));
+  map<Name, Publickey>& trustedKeys = forIntroducer ? m_trustedIntroducers : m_trustedProducers;
+  trustedKeys.insert(pair <Name, Publickey > (certificate->getPublicKeyName(), certificate->getPublicKeyInfo()));
+
+  if(!verifySignature(*originalData, certificate->getPublicKeyInfo()))
+    unverifiedCallback(originalData);
+  else
+    verifiedCallback(originalData);
+}
+
+// Called when the intro certificate requested via one introducer was not verified:
+// asks the next introducer of introNameList, or reports originalData as unverified.
+void
+SyncPolicyManager::onIntroCertUnverified(Ptr<Data> introCertificateData,
+                                         Ptr<Name> interestPrefixName,
+                                         bool forIntroducer,
+                                         Ptr<const std::vector<ndn::Name> > introNameList,
+                                         const int& nextIntroducerIndex,
+                                         Ptr<Data> originalData,
+                                         const DataCallback& verifiedCallback,
+                                         const UnverifiedCallback& unverifiedCallback)
+{
+  // Stop once every introducer was tried (or no wrapper is set). Without the return,
+  // an interest for the bare prefix would still go out after the failure was reported.
+  if(NULL == m_handler || static_cast<size_t>(nextIntroducerIndex) >= introNameList->size())
+    {
+      unverifiedCallback(originalData);
+      return;
+    }
+
+  Name interestName = *interestPrefixName;
+  interestName.append(introNameList->at(nextIntroducerIndex));
+  if(forIntroducer)
+    interestName.append("INTRODUCER");
+  Ptr<Interest> interest = Ptr<Interest>(new Interest(interestName));
+  interest->setChildSelector(Interest::CHILD_RIGHT);
+
+  DataCallback requestedCertVerifiedCallback = boost::bind(&SyncPolicyManager::onIntroCertVerified,
+                                                           this,
+                                                           _1,
+                                                           forIntroducer,
+                                                           originalData,
+                                                           verifiedCallback,
+                                                           unverifiedCallback);
+
+  UnverifiedCallback requestedCertUnverifiedCallback = boost::bind(&SyncPolicyManager::onIntroCertUnverified,
+                                                                   this,
+                                                                   _1,
+                                                                   interestPrefixName,
+                                                                   forIntroducer,
+                                                                   introNameList,
+                                                                   nextIntroducerIndex + 1,
+                                                                   originalData,
+                                                                   verifiedCallback,
+                                                                   unverifiedCallback);
+
+  TimeoutCallback requestedCertTimeoutCallback = boost::bind(&SyncPolicyManager::onIntroCertTimeOut,
+                                                             this,
+                                                             _1,
+                                                             _2,
+                                                             1,
+                                                             requestedCertUnverifiedCallback,
+                                                             originalData);
+  Ptr<Closure> closure(new Closure(requestedCertVerifiedCallback, requestedCertTimeoutCallback,
+                                   requestedCertUnverifiedCallback, m_stepLimit-1));
+  m_handler->sendInterest(interest, closure);
+}
+
+// Timeout handler for an intro-certificate interest: re-sends the interest while
+// `retry` is positive, otherwise hands `data` to unverifiedCallback.
+void
+SyncPolicyManager::onIntroCertTimeOut(Ptr<Closure> closure,
+                                      Ptr<Interest> interest,
+                                      int retry,
+                                      const UnverifiedCallback& unverifiedCallback,
+                                      Ptr<Data> data)
+{
+  if(retry <= 0)
+    {
+      unverifiedCallback(data);
+      return;
+    }
+
+  // Same closure as before, except that its timeout handler has one retry less.
+  TimeoutCallback retryOnTimeout = boost::bind(&SyncPolicyManager::onIntroCertTimeOut,
+                                               this,
+                                               _1, _2,
+                                               retry - 1,
+                                               unverifiedCallback, data);
+  Ptr<Closure> newClosure(new Closure(closure->m_dataCallback, retryOnTimeout,
+                                      closure->m_unverifiedCallback, closure->m_stepCount));
+  m_handler->sendInterest(interest, newClosure);
+}
diff --git a/src/sync-policy-manager.h b/src/sync-policy-manager.h
new file mode 100644
index 0000000..418abc7
--- /dev/null
+++ b/src/sync-policy-manager.h
@@ -0,0 +1,124 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013, Regents of the University of California
+ * Yingdi Yu
+ *
+ * BSD license, See the LICENSE file for more information
+ *
+ * Author: Yingdi Yu <yingdi@cs.ucla.edu>
+ */
+
+#ifndef SYNC_POLICY_MANAGER_H
+#define SYNC_POLICY_MANAGER_H
+
+#include <ndn.cxx/security/policy/policy-manager.h>
+#include <ndn.cxx/security/policy/identity-policy-rule.h>
+#include <ndn.cxx/security/certificate/identity-certificate.h>
+#include <ndn.cxx/regex/regex.h>
+#include <ndn.cxx/wrapper/wrapper.h>
+#include "specific-policy-rule.h"
+
+
+// Policy manager used by sync: decides how received Data is verified against the
+// trusted introducer/producer keys and which certificate may sign outgoing data.
+class SyncPolicyManager : public ndn::security::PolicyManager
+{
+public:
+  // stepLimit is the largest step count checkVerificationPolicy still accepts.
+  SyncPolicyManager(const ndn::Name& signingIdentity,
+                    const ndn::Name& signingCertificateName,
+                    const ndn::Name& syncPrefix,
+                    int stepLimit = 3);
+
+  virtual
+  ~SyncPolicyManager();
+
+  bool
+  skipVerifyAndTrust (const ndn::Data& data); // always false
+
+  bool
+  requireVerify (const ndn::Data& data); // always true
+
+  // Settles verification through one of the callbacks and returns NULL, or
+  // returns the request for the intro certificate that is still needed.
+  ndn::Ptr<ndn::security::ValidationRequest>
+  checkVerificationPolicy(ndn::Ptr<ndn::Data> data,
+                          const int& stepCount,
+                          const ndn::DataCallback& verifiedCallback,
+                          const ndn::UnverifiedCallback& unverifiedCallback);
+
+  // Tells whether certificateName may be used to sign dataName.
+  bool
+  checkSigningPolicy(const ndn::Name& dataName,
+                     const ndn::Name& certificateName);
+
+  ndn::Name
+  inferSigningIdentity(const ndn::Name& dataName); // the constructor's signingIdentity
+
+  // Adds the certificate's public key to the trusted introducers or producers.
+  void
+  addTrustAnchor(const ndn::security::IdentityCertificate& identityCertificate, bool isIntroducer);
+
+  // Currently equivalent to addTrustAnchor(identityCertificate, isIntroducer).
+  void
+  addChatDataRule(const ndn::Name& prefix,
+                  const ndn::security::IdentityCertificate& identityCertificate,
+                  bool isIntroducer);
+
+  // Sets the wrapper through which intro-certificate interests are sent.
+  inline void
+  setWrapper(ndn::Wrapper* handler)
+  { m_handler = handler; }
+
+private:
+  ndn::Ptr<const std::vector<ndn::Name> >
+  getAllIntroducerName();
+
+  ndn::Ptr<ndn::security::ValidationRequest>
+  prepareRequest(const ndn::Name& keyName,
+                 bool forIntroducer,
+                 ndn::Ptr<ndn::Data> data,
+                 const int & stepCount,
+                 const ndn::DataCallback& verifiedCallback,
+                 const ndn::UnverifiedCallback& unverifiedCallback);
+
+  void
+  onIntroCertVerified(ndn::Ptr<ndn::Data> introCertificateData,
+                      bool forIntroducer,
+                      ndn::Ptr<ndn::Data> originalData,
+                      const ndn::DataCallback& verifiedCallback,
+                      const ndn::UnverifiedCallback& unverifiedCallback);
+
+  void
+  onIntroCertUnverified(ndn::Ptr<ndn::Data> introCertificateData,
+                        ndn::Ptr<ndn::Name> interestPrefixName,
+                        bool forIntroducer,
+                        ndn::Ptr<const std::vector<ndn::Name> > introNameList,
+                        const int& nextIntroducerIndex,
+                        ndn::Ptr<ndn::Data> originalData,
+                        const ndn::DataCallback& verifiedCallback,
+                        const ndn::UnverifiedCallback& unverifiedCallback);
+
+  void
+  onIntroCertTimeOut(ndn::Ptr<ndn::Closure> closure,
+                     ndn::Ptr<ndn::Interest> interest,
+                     int retry,
+                     const ndn::UnverifiedCallback& unverifiedCallback,
+                     ndn::Ptr<ndn::Data> data);
+
+private:
+  ndn::Name m_signingIdentity;
+  ndn::Name m_signingCertificateName;
+  ndn::Name m_syncPrefix;
+  int m_stepLimit;
+  ndn::Ptr<ndn::Regex> m_syncPrefixRegex;
+  ndn::Ptr<ndn::Regex> m_wotPrefixRegex;
+  ndn::Ptr<ndn::security::IdentityPolicyRule> m_chatDataPolicy;
+  std::map<ndn::Name, ndn::security::Publickey> m_trustedIntroducers;
+  std::map<ndn::Name, ndn::security::Publickey> m_trustedProducers;
+  std::map<ndn::Name, SpecificPolicyRule> m_chatDataRules;
+
+  ndn::Wrapper* m_handler; // assigned by setWrapper()
+};
+
+#endif