get rid of publisher and fetcher
modify SyncAppSocket API
modify SyncLogic API
Tweak SeqNo
diff --git a/ccnx/sync-app-data-fetch.cc b/ccnx/sync-app-data-fetch.cc
deleted file mode 100644
index 5cfabce..0000000
--- a/ccnx/sync-app-data-fetch.cc
+++ /dev/null
@@ -1,67 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012 University of California, Los Angeles
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Chaoyi Bian <bcy@pku.edu.cn>
- * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
- */
-
-#include "sync-app-data-fetch.h"
-#include "sync-log.h"
-
-using namespace std;
-using namespace boost;
-
-namespace Sync
-{
-
-INIT_LOGGER ("AppDataFetch");
-
-void
-AppDataFetch::onUpdate (const std::string &prefix, const SeqNo &newSeq, const SeqNo &oldSeq)
-{
- _LOG_FUNCTION (this << ", " << prefix << ", " << newSeq << ", " << oldSeq);
-
- // sequence number logic here
- uint32_t start = 0;
- if (oldSeq.isValid () && oldSeq.getSession() == newSeq.getSession())
- {
- start = oldSeq.getSeq () + 1;
- }
- uint32_t end = newSeq.getSeq ();
-
- //
- // add logic for wrap around
- //
-
- for (uint32_t i = start; i <= end; i++)
- {
- ostringstream interestName;
- interestName << prefix << "/" << newSeq.getSession () << "/" << i;
- m_ccnxHandle->sendInterest (interestName.str (), m_dataCallback);
- }
-}
-
-void
-AppDataFetch::onRemove (const std::string &prefix)
-{
- _LOG_FUNCTION (this << ", " << prefix);
-
- // I guess this should be somewhere in app
-}
-
-}
diff --git a/ccnx/sync-app-data-fetch.h b/ccnx/sync-app-data-fetch.h
deleted file mode 100644
index 1e42860..0000000
--- a/ccnx/sync-app-data-fetch.h
+++ /dev/null
@@ -1,86 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012 University of California, Los Angeles
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Chaoyi Bian <bcy@pku.edu.cn>
- * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
- */
-
-#ifndef SYNC_APP_DATA_FETCH_H
-#define SYNC_APP_DATA_FETCH_H
-
-#include "sync-ccnx-wrapper.h"
-#include "sync-seq-no.h"
-
-namespace Sync {
-
-/**
- * \ingroup sync
- * @brief fetch application data, by default it will try to fetch every piece
- * of data
- */
-class AppDataFetch
-{
-public:
- /**
- * @brief Constructor
- * @param ccnxHandle handle for CCNx
- * @param dataCallback the callback function to process data
- */
- AppDataFetch (CcnxWrapperPtr ccnxHandle,
- CcnxWrapper::DataCallback dataCallback)
- : m_ccnxHandle (ccnxHandle)
- , m_dataCallback (dataCallback)
- { }
-
- /**
- * @brief Set data callback
- * @param dataCallback the callback function to process data
- */
- void
- setDataCallback(CcnxWrapper::DataCallback dataCallback)
- {
- m_dataCallback = dataCallback;
- }
-
- /**
- * @brief Fired from SyncLogic when new data is available
- *
- * @param prefix the prefix for the data
- * @param newSeq old session ID/sequence number
- * @param oldSeq new session ID/sequence number
- */
- void
- onUpdate (const std::string &prefix, const SeqNo &newSeq, const SeqNo &oldSeq);
-
- /**
- * @brief Fired from SyncLogic when data is removed
- *
- * @param prefix the prefix for the data
- */
- void
- onRemove (const std::string &prefix);
-
-private:
- CcnxWrapperPtr m_ccnxHandle;
- CcnxWrapper::DataCallback m_dataCallback;
-};
-
-
-} // Sync
-
-#endif // SYNC_APP_DATA_FETCH_H
diff --git a/ccnx/sync-app-data-publish.cc b/ccnx/sync-app-data-publish.cc
deleted file mode 100644
index e7bee97..0000000
--- a/ccnx/sync-app-data-publish.cc
+++ /dev/null
@@ -1,81 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012 University of California, Los Angeles
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Chaoyi Bian <bcy@pku.edu.cn>
- * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
- */
-
-#include "sync-app-data-publish.h"
-#include <boost/throw_exception.hpp>
-typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
-typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
-
-using namespace std;
-using namespace boost;
-
-namespace Sync
-{
-
-
-string
-AppDataPublish::getRecentData (const string &prefix, uint32_t session)
-{
- if (m_recentData.find(make_pair(prefix, session)) != m_recentData.end())
- return m_recentData[make_pair(prefix, session)];
- else
- return "";
-}
-
-uint32_t
-AppDataPublish::getNextSeq (const string &prefix, uint32_t session)
-{
- SequenceLog::iterator i = m_sequenceLog.find (prefix);
-
- if (i != m_sequenceLog.end ())
- {
- Seq s = i->second;
- if (s.session == session)
- return s.seq;
- }
- return 0;
-}
-
-uint32_t
-AppDataPublish::publishData (const string &name, uint32_t session, const string &dataBuffer, int freshness)
-{
- uint32_t seq = getNextSeq (name, session);
-
- ostringstream contentNameWithSeqno;
- contentNameWithSeqno << name << "/" << session << "/" << seq;
-
- m_ccnxHandle->publishData (contentNameWithSeqno.str (), dataBuffer, freshness);
-
- Seq s;
- s.session = session;
- s.seq = seq + 1;
- m_sequenceLog[name] = s;
-
- unordered_map<pair<string, uint32_t>, string>::iterator it = m_recentData.find(make_pair(name, session));
- if (it != m_recentData.end())
- m_recentData.erase(it);
- m_recentData.insert(make_pair(make_pair(name, session), dataBuffer));
-
- return seq;
-}
-
-} // Sync
diff --git a/ccnx/sync-app-data-publish.h b/ccnx/sync-app-data-publish.h
deleted file mode 100644
index ec95a60..0000000
--- a/ccnx/sync-app-data-publish.h
+++ /dev/null
@@ -1,97 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012 University of California, Los Angeles
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License version 2 as
- * published by the Free Software Foundation;
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
- *
- * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Chaoyi Bian <bcy@pku.edu.cn>
- * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
- */
-
-#ifndef SYNC_APP_DATA_PUBLISH_H
-#define SYNC_APP_DATA_PUBLISH_H
-
-#include <boost/unordered_map.hpp>
-#include "sync-seq-no.h"
-#include "sync-ccnx-wrapper.h"
-#include <utility>
-#include <map>
-
-namespace Sync {
-
-struct Seq
-{
- uint32_t session;
- uint32_t seq;
-};
-
-struct GetSeqException : virtual boost::exception, virtual std::exception { };
-
-/**
- * \ingroup sync
- * @brief publishes application data using incrementing sequence number (for
- * each sequence namber and keeps track of most recently published data for
- * each name prefix
- */
-class AppDataPublish
-{
-public:
- AppDataPublish (CcnxWrapperPtr ccnxHandle)
- : m_ccnxHandle (ccnxHandle)
- { }
-
- /**
- * @brief get the name (including sequence number) and the content
- * (unencoded, just XML stanza) of the most recent published data
- *
- * @param prefix the name prefix to look for
- * @param session session
- * @return the pair of name and content
- */
- std::string
- getRecentData (const std::string &prefix, uint32_t session);
-
- /**
- * @brief get the most recent sequence number for a name prefix
- * @param prefix the name prefix to look for
- * @param session session
- */
- uint32_t
- getNextSeq (const std::string &prefix, uint32_t session);
-
- /**
- * @brief publish data for a name prefix, updates the corresponding
- * sequence number and recent data
- *
- * @param name data prefix
- * @param session session to which data is published
- * @param dataBuffer the data itself
- * @param freshness the freshness for the data object
- * @return published sequence number, will throw an exception if something wrong happened
- */
- uint32_t publishData (const std::string &name, uint32_t session, const std::string &dataBuffer, int freshness);
-
-private:
- typedef boost::unordered_map<std::string, Seq> SequenceLog;
- typedef boost::unordered_map<std::pair<std::string, uint32_t>, std::string> RecentData;
-
- CcnxWrapperPtr m_ccnxHandle;
- SequenceLog m_sequenceLog;
- RecentData m_recentData;
-};
-
-} // Sync
-
-#endif // SYNC_APP_DATA_PUBLISH_H
diff --git a/ccnx/sync-app-socket.cc b/ccnx/sync-app-socket.cc
index 848a6d0..ce65674 100644
--- a/ccnx/sync-app-socket.cc
+++ b/ccnx/sync-app-socket.cc
@@ -28,13 +28,12 @@
namespace Sync
{
-SyncAppSocket::SyncAppSocket (const string &syncPrefix, CcnxWrapper::DataCallback dataCallback)
- : m_appHandle (new CcnxWrapper())
- , m_fetcher (m_appHandle, dataCallback)
- , m_publisher (m_appHandle)
+/**
+ * @brief Build the socket: keep the new-data callback, create the CCNx handle
+ * and start SyncLogic for syncPrefix.
+ *
+ * SyncLogic is handed passCallback bound to this object as its update callback
+ * and rmCallback as its remove callback.
+ *
+ * The initializers are listed in member declaration order (m_newDataCallback,
+ * m_ccnxHandle, m_syncLogic), which is the order in which they are actually run.
+ */
+SyncAppSocket::SyncAppSocket (const string &syncPrefix, NewDataCallback dataCallback, RemoveCallback rmCallback)
+  : m_newDataCallback (dataCallback)
+  , m_ccnxHandle (new CcnxWrapper ())
, m_syncLogic (syncPrefix,
- bind (&AppDataFetch::onUpdate, m_fetcher, _1, _2, _3),
- bind (&AppDataFetch::onRemove, m_fetcher, _1))
+                 bind (&SyncAppSocket::passCallback, this, _1),
+                 rmCallback)
{
}
@@ -42,10 +41,62 @@
{
}
-bool SyncAppSocket::publish (const string &prefix, uint32_t session, const string &dataBuffer, int freshness)
+/**
+ * @brief Publish a string payload under "<prefix>/<session>/<seq>" and
+ * register the new sequence number with SyncLogic.
+ *
+ * @param prefix     name prefix the data is published under
+ * @param session    session number placed in the content name
+ * @param dataBuffer payload handed to CcnxWrapper::publishStringData
+ * @param freshness  freshness value forwarded to the CCNx wrapper
+ * @return true once the data has been handed to the wrapper and SyncLogic
+ */
+bool
+SyncAppSocket::publishString (const string &prefix, uint32_t session, const string &dataBuffer, int freshness)
{
- uint32_t sequence = m_publisher.publishData (prefix, session, dataBuffer, freshness);
+  // Sequence number to use now: the value recorded for this prefix/session, or 0.
+  uint32_t sequence = getNextSeq(prefix, session);
+  ostringstream contentNameWithSeqno;
+  contentNameWithSeqno << prefix << "/" << session << "/" << sequence;
+  m_ccnxHandle->publishStringData (contentNameWithSeqno.str (), dataBuffer, freshness);
+
+  // Record the number the next publish for this prefix should use.
+  SeqNo s(session, sequence + 1);
+  m_sequenceLog[prefix] = s;
+
m_syncLogic.addLocalNames (prefix, session, sequence);
+  // Declared bool: flowing off the end without a value would be undefined behaviour.
+  return true;
}
+/**
+ * @brief Publish a raw byte buffer under "<prefix>/<session>/<seq>" and
+ * register the new sequence number with SyncLogic.
+ *
+ * @param prefix    name prefix the data is published under
+ * @param session   session number placed in the content name
+ * @param buf       start of the payload, forwarded to CcnxWrapper::publishRawData
+ * @param len       payload length, forwarded together with buf
+ * @param freshness freshness value forwarded to the CCNx wrapper
+ * @return true once the data has been handed to the wrapper and SyncLogic
+ */
+bool
+SyncAppSocket::publishRaw(const std::string &prefix, uint32_t session, const char *buf, size_t len, int freshness)
+{
+  // Sequence number to use now: the value recorded for this prefix/session, or 0.
+  uint32_t sequence = getNextSeq(prefix, session);
+  ostringstream contentNameWithSeqno;
+  contentNameWithSeqno << prefix << "/" << session << "/" << sequence;
+  m_ccnxHandle->publishRawData (contentNameWithSeqno.str (), buf, len, freshness);
+
+  // Record the number the next publish for this prefix should use.
+  SeqNo s(session, sequence + 1);
+  m_sequenceLog[prefix] = s;
+  m_syncLogic.addLocalNames (prefix, session, sequence);
+
+  // Declared bool: flowing off the end without a value would be undefined behaviour.
+  return true;
+}
+
+
+/**
+ * @brief Express an interest for "<prefix>/<session>/<seq>" and deliver the
+ * reply through a string callback.
+ *
+ * @param prefix   name prefix of the wanted data
+ * @param seq      supplies the session and sequence components of the name
+ * @param callback forwarded to CcnxWrapper::sendInterestForString
+ * @param retry    forwarded unchanged to the wrapper
+ */
+void
+SyncAppSocket::fetchString(const std::string &prefix, const SeqNo &seq, CcnxWrapper::StringDataCallback callback, int retry)
+{
+  ostringstream nameBuilder;
+  nameBuilder << prefix << "/";
+  nameBuilder << seq.getSession ();
+  nameBuilder << "/";
+  nameBuilder << seq.getSeq ();
+
+  const std::string fullName = nameBuilder.str ();
+  m_ccnxHandle->sendInterestForString (fullName, callback, retry);
+}
+
+/**
+ * @brief Express an interest for "<prefix>/<session>/<seq>" and deliver the
+ * reply through a raw-buffer callback.
+ *
+ * @param prefix   name prefix of the wanted data
+ * @param seq      supplies the session and sequence components of the name
+ * @param callback forwarded to CcnxWrapper::sendInterest
+ * @param retry    forwarded unchanged to the wrapper
+ */
+void
+SyncAppSocket::fetchRaw(const std::string &prefix, const SeqNo &seq, CcnxWrapper::RawDataCallback callback, int retry)
+{
+  ostringstream nameBuilder;
+  nameBuilder << prefix << "/";
+  nameBuilder << seq.getSession ();
+  nameBuilder << "/";
+  nameBuilder << seq.getSeq ();
+
+  const std::string fullName = nameBuilder.str ();
+  m_ccnxHandle->sendInterest (fullName, callback, retry);
+}
+
+/**
+ * @brief Look up the sequence number recorded for a prefix.
+ *
+ * @param prefix  key into m_sequenceLog
+ * @param session session the recorded entry must belong to
+ * @return the recorded sequence number when prefix has an entry for the same
+ *         session; 0 when there is no entry or the session differs
+ */
+uint32_t
+SyncAppSocket::getNextSeq (const string &prefix, uint32_t session)
+{
+  SequenceLog::iterator entry = m_sequenceLog.find (prefix);
+  if (entry == m_sequenceLog.end ())
+    return 0;
+
+  const SeqNo &recorded = entry->second;
+  if (recorded.getSession () != session)
+    return 0;
+
+  return recorded.getSeq ();
+}
+
}
diff --git a/ccnx/sync-app-socket.h b/ccnx/sync-app-socket.h
index f5e5e0f..07c0aa0 100644
--- a/ccnx/sync-app-socket.h
+++ b/ccnx/sync-app-socket.h
@@ -24,9 +24,14 @@
#define SYNC_APP_SOCKET_H
#include "sync-logic.h"
-#include "sync-app-data-fetch.h"
-#include "sync-app-data-publish.h"
#include <boost/function.hpp>
+#include <boost/unordered_map.hpp>
+#include "sync-seq-no.h"
+#include "sync-ccnx-wrapper.h"
+#include <utility>
+#include <map>
+#include <vector>
+#include <sstream>
namespace Sync {
@@ -37,6 +42,8 @@
class SyncAppSocket
{
public:
+ typedef boost::function< void (const std::vector<MissingDataInfo> &, SyncAppSocket * ) > NewDataCallback;
+ typedef boost::function< void ( const std::string &/*prefix*/ ) > RemoveCallback;
/**
* @brief the constructor for SyncAppSocket; the parameter syncPrefix
* should be passed to the constructor of m_syncAppWrapper; the other
@@ -48,7 +55,7 @@
* @param syncPrefix the name prefix for Sync Interest
* @param dataCallback the callback to process data
*/
- SyncAppSocket (const std::string &syncPrefix, CcnxWrapper::DataCallback dataCallback);
+ SyncAppSocket (const std::string &syncPrefix, NewDataCallback dataCallback, RemoveCallback rmCallback);
~SyncAppSocket ();
/**
@@ -60,7 +67,9 @@
* @param dataBuffer the data itself
* @param freshness the freshness time for the data (in seconds)
*/
- bool publish (const std::string &prefix, uint32_t session, const std::string &dataBuffer, int freshness);
+ bool publishString (const std::string &prefix, uint32_t session, const std::string &dataBuffer, int freshness);
+
+ bool publishRaw(const std::string &prefix, uint32_t session, const char *buf, size_t len, int freshness);
/**
* @brief delete a participant's subtree from the sync tree; SyncLogic will do the work
@@ -70,13 +79,20 @@
*/
void remove (const std::string &prefix) {m_syncLogic.remove(prefix);}
- int getSeq();
+ void fetchString(const std::string &prefix, const SeqNo &seq, CcnxWrapper::StringDataCallback callback, int retry = 0);
+ void fetchRaw(const std::string &prefix, const SeqNo &seq, CcnxWrapper::RawDataCallback callback, int retry = 0);
+
+ void passCallback(std::vector<MissingDataInfo> &v) {m_newDataCallback(v, this);}
private:
- CcnxWrapperPtr m_appHandle;
+ uint32_t
+ getNextSeq (const std::string &prefix, uint32_t session);
- AppDataFetch m_fetcher;
- AppDataPublish m_publisher;
+private:
+ typedef boost::unordered_map<std::string, SeqNo> SequenceLog;
+ NewDataCallback m_newDataCallback;
+ SequenceLog m_sequenceLog;
+ CcnxWrapperPtr m_ccnxHandle;
SyncLogic m_syncLogic;
};
diff --git a/ccnx/sync-ccnx-wrapper.cc b/ccnx/sync-ccnx-wrapper.cc
index 85ffbd6..559cfad 100644
--- a/ccnx/sync-ccnx-wrapper.cc
+++ b/ccnx/sync-ccnx-wrapper.cc
@@ -158,9 +158,13 @@
}
/// @endcond
+// Publishes the bytes of dataBuffer under name by delegating to publishRawData.
+// Returns whatever publishRawData returns; previously the result was dropped and
+// this int function flowed off its end without a value.
+int
+CcnxWrapper::publishStringData (const string &name, const string &dataBuffer, int freshness)
+{
+  return publishRawData (name, dataBuffer.c_str (), dataBuffer.length (), freshness);
+}
int
-CcnxWrapper::publishData (const string &name, const string &dataBuffer, int freshness)
+CcnxWrapper::publishRawData (const string &name, const char *buf, size_t len, int freshness)
{
recursive_mutex::scoped_lock lock(m_mutex);
if (!m_running)
@@ -181,7 +185,7 @@
NULL,
m_keyLoactor);
if(ccn_encode_ContentObject(content, pname, signed_info,
- dataBuffer.c_str(), dataBuffer.length (),
+ (const unsigned char *)buf, len,
NULL, getPrivateKey()) < 0)
BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("encode content failed"));
@@ -235,7 +239,6 @@
ccn_upcall_kind kind,
ccn_upcall_info *info)
{
- //CcnxWrapper::DataCallback *f = static_cast<CcnxWrapper::DataCallback*> (selfp->data);
ClosurePass *cp = static_cast<ClosurePass *> (selfp->data);
switch (kind)
@@ -282,13 +285,13 @@
return CCN_UPCALL_RESULT_OK;
}
-int CcnxWrapper::sendInterest (const string &strInterest, const DataCallback &dataCallback, int retry)
+// Wraps strDataCallback in a heap-allocated STRING_FORM DataClosurePass and
+// passes it to the sendInterest overload that takes a closure object.
+// NOTE(review): declared to return int, but no return statement is visible in
+// this body — confirm what the closure overload returns and propagate it.
+// NOTE(review): `pass` is presumably released by the closure overload or the
+// upcall path — TODO confirm ownership.
+int CcnxWrapper::sendInterestForString (const string &strInterest, const StringDataCallback &strDataCallback, int retry)
{
- DataClosurePass * pass = new DataClosurePass(STRING_FORM, retry, dataCallback);
+ DataClosurePass * pass = new DataClosurePass(STRING_FORM, retry, strDataCallback);
sendInterest(strInterest, pass);
}
-int CcnxWrapper::sendInterestForRawData (const string &strInterest, const RawDataCallback &rawDataCallback, int retry)
+int CcnxWrapper::sendInterest (const string &strInterest, const RawDataCallback &rawDataCallback, int retry)
{
RawDataClosurePass * pass = new RawDataClosurePass(RAW_DATA, retry, rawDataCallback);
sendInterest(strInterest, pass);
@@ -356,9 +359,9 @@
ccn_charbuf_destroy(&pname);
}
-DataClosurePass::DataClosurePass (CallbackType type, int retry, const CcnxWrapper::DataCallback &dataCallback): ClosurePass(type, retry), m_callback(NULL)
+// Forwards type and retry to ClosurePass and stores a heap-allocated copy of
+// strDataCallback in m_callback.
+DataClosurePass::DataClosurePass (CallbackType type, int retry, const CcnxWrapper::StringDataCallback &strDataCallback): ClosurePass(type, retry), m_callback(NULL)
{
- m_callback = new CcnxWrapper::DataCallback (dataCallback);
+ m_callback = new CcnxWrapper::StringDataCallback (strDataCallback);
}
DataClosurePass::~DataClosurePass ()
diff --git a/ccnx/sync-ccnx-wrapper.h b/ccnx/sync-ccnx-wrapper.h
index b8c3c93..b51c629 100644
--- a/ccnx/sync-ccnx-wrapper.h
+++ b/ccnx/sync-ccnx-wrapper.h
@@ -37,6 +37,7 @@
#include <boost/thread/thread.hpp>
#include <boost/function.hpp>
#include <string>
+#include <sstream>
/**
* \defgroup sync SYNC protocol
@@ -53,7 +54,7 @@
*/
class CcnxWrapper {
public:
- typedef boost::function<void (std::string, std::string)> DataCallback;
+ typedef boost::function<void (std::string, std::string)> StringDataCallback;
typedef boost::function<void (std::string, const char *buf, size_t len)> RawDataCallback;
typedef boost::function<void (std::string)> InterestCallback;
@@ -78,10 +79,10 @@
* @return the return code of ccn_express_interest
*/
int
- sendInterest (const std::string &strInterest, const DataCallback &dataCallback, int retry = 0);
+ sendInterestForString (const std::string &strInterest, const StringDataCallback &strDataCallback, int retry = 0);
int
- sendInterestForRawData (const std::string &strInterest, const RawDataCallback &rawDataCallback, int retry = 0);
+ sendInterest (const std::string &strInterest, const RawDataCallback &rawDataCallback, int retry = 0);
/**
* @brief set Interest filter (specify what interest you want to receive)
@@ -110,7 +111,10 @@
* @return code generated by ccnx library calls, >0 if success
*/
int
- publishData (const std::string &name, const std::string &dataBuffer, int freshness);
+ publishStringData (const std::string &name, const std::string &dataBuffer, int freshness);
+
+ int
+ publishRawData (const std::string &name, const char *buf, size_t len, int freshness);
private:
/// @cond include_hidden
@@ -165,11 +169,11 @@
class DataClosurePass: public ClosurePass {
public:
- DataClosurePass(CallbackType type, int retry, const CcnxWrapper::DataCallback &dataCallback);
+ DataClosurePass(CallbackType type, int retry, const CcnxWrapper::StringDataCallback &strDataCallback);
virtual ~DataClosurePass();
virtual void runCallback(std::string name, const char *, size_t len);
private:
- CcnxWrapper::DataCallback * m_callback;
+ CcnxWrapper::StringDataCallback * m_callback;
};
class RawDataClosurePass: public ClosurePass {