Moving some logic of dealing with sequence numbers to SyncAppDataFetch
Adding recursive mutexes to SyncLogic, so hopefully it is now thread-safe
Small modification of publishing/retrieval names. Now session ID is
appended by SyncAppDataFetch / SyncAppDataPublish
diff --git a/model/sync-app-data-fetch.cc b/model/sync-app-data-fetch.cc
index 43e089b..4cf22a3 100644
--- a/model/sync-app-data-fetch.cc
+++ b/model/sync-app-data-fetch.cc
@@ -29,16 +29,32 @@
{
void
-AppDataFetch::fetch (const string &prefix, uint32_t startSeq, uint32_t endSeq)
+AppDataFetch::onUpdate (const std::string &prefix, const SeqNo &newSeq, const SeqNo &oldSeq)
{
- for (uint32_t i = startSeq; i <= endSeq; i++)
- {
- if (i == 0)
- continue;
- ostringstream interestName;
- interestName << prefix << "/" << i;
- m_ccnxHandle->sendInterest (interestName.str (), m_dataCallback);
- }
+ // sequence number logic here
+ uint32_t start = 0;
+ if (oldSeq.isValid ())
+ {
+ start = oldSeq.getSeq () + 1;
+ }
+ uint32_t end = newSeq.getSeq ();
+
+ //
+ // add logic for wrap around
+ //
+
+ for (uint32_t i = start; i <= end; i++)
+ {
+ ostringstream interestName;
+ interestName << prefix << "/" << newSeq.getSession () << "/" << i;
+ m_ccnxHandle->sendInterest (interestName.str (), m_dataCallback);
+ }
+}
+
+void
+AppDataFetch::onRemove (const std::string &prefix)
+{
+ // I guess this should be somewhere in app
}
}
diff --git a/model/sync-app-data-fetch.h b/model/sync-app-data-fetch.h
index 3f368cb..1037807 100644
--- a/model/sync-app-data-fetch.h
+++ b/model/sync-app-data-fetch.h
@@ -24,6 +24,7 @@
#define SYNC_APP_DATA_FETCH_H
#include "sync-ccnx-wrapper.h"
+#include "sync-seq-no.h"
namespace Sync {
@@ -57,15 +58,23 @@
}
/**
- * @brief Fetch data for a certain name prefix
+ * @brief Fired from SyncLogic when new data is available
*
* @param prefix the prefix for the data
- * @param startSeq the start of sequence number range (inclusive)
- * @param endSeq the end of sequence number range (inclusive)
+ * @param newSeq old session ID/sequence number
+ * @param oldSeq new session ID/sequence number
*/
void
- fetch (const std::string &prefix, uint32_t startSeq, uint32_t endSeq);
+ onUpdate (const std::string &prefix, const SeqNo &newSeq, const SeqNo &oldSeq);
+ /**
+ * @brief Fired from SyncLogic when data is removed
+ *
+ * @param prefix the prefix for the data
+ */
+ void
+ onRemove (const std::string &prefix);
+
private:
CcnxWrapperPtr m_ccnxHandle;
CcnxWrapper::DataCallback m_dataCallback;
diff --git a/model/sync-app-data-publish.cc b/model/sync-app-data-publish.cc
index 66ca1ae..17d52ee 100644
--- a/model/sync-app-data-publish.cc
+++ b/model/sync-app-data-publish.cc
@@ -31,10 +31,10 @@
string
AppDataPublish::getRecentData (const string &prefix, uint32_t session)
{
- if (m_recentData.find(make_pair(prefix, session)) != m_recentData.end())
- return m_recentData[make_pair(prefix, session)];
- else
- return "";
+ if (m_recentData.find(make_pair(prefix, session)) != m_recentData.end())
+ return m_recentData[make_pair(prefix, session)];
+ else
+ return "";
}
uint32_t
@@ -68,14 +68,14 @@
m_sequenceLog[name] = s;
ostringstream contentNameWithSeqno;
- contentNameWithSeqno << name << "/" << seq;
+ contentNameWithSeqno << name << "/" << session << "/" << seq;
m_ccnxHandle->publishData (contentNameWithSeqno.str (), dataBuffer, freshness);
unordered_map<pair<string, uint32_t>, string>::iterator it = m_recentData.find(make_pair(name, session));
- if (it != m_recentData.end())
- m_recentData.erase(it);
- m_recentData.insert(make_pair(make_pair(name, session), dataBuffer));
+ if (it != m_recentData.end())
+ m_recentData.erase(it);
+ m_recentData.insert(make_pair(make_pair(name, session), dataBuffer));
return true;
}
diff --git a/model/sync-app-socket.cc b/model/sync-app-socket.cc
index d703e76..3dca250 100644
--- a/model/sync-app-socket.cc
+++ b/model/sync-app-socket.cc
@@ -33,7 +33,8 @@
, m_fetcher (m_ccnxHandle, dataCallback)
, m_publisher (m_ccnxHandle)
, m_syncLogic (syncPrefix,
- bind (&AppDataFetch::fetch, m_fetcher, _1, _2, _3),
+ bind (&AppDataFetch::onUpdate, m_fetcher, _1, _2, _3),
+ bind (&AppDataFetch::onRemove, m_fetcher, _1),
m_ccnxHandle)
{
}
diff --git a/model/sync-diff-leaf.h b/model/sync-diff-leaf.h
index 1e3143a..c4aab82 100644
--- a/model/sync-diff-leaf.h
+++ b/model/sync-diff-leaf.h
@@ -74,6 +74,7 @@
};
typedef boost::shared_ptr<DiffLeaf> DiffLeafPtr;
+typedef boost::shared_ptr<const DiffLeaf> DiffLeafConstPtr;
std::ostream &
operator << (std::ostream &os, Operation op);
diff --git a/model/sync-diff-state.cc b/model/sync-diff-state.cc
index 65323d3..a804a8e 100644
--- a/model/sync-diff-state.cc
+++ b/model/sync-diff-state.cc
@@ -61,7 +61,7 @@
}
// from State
-bool
+boost::tuple<bool/*inserted*/, bool/*updated*/, SeqNo/*oldSeqNo*/>
DiffState::update (NameInfoConstPtr info, const SeqNo &seq)
{
m_leaves.erase (info);
@@ -69,7 +69,7 @@
DiffLeafPtr leaf = make_shared<DiffLeaf> (info, cref (seq));
m_leaves.insert (leaf);
- return true;
+ return make_tuple (true, false, SeqNo ());
}
bool
diff --git a/model/sync-diff-state.h b/model/sync-diff-state.h
index 6a43aef..aed6b32 100644
--- a/model/sync-diff-state.h
+++ b/model/sync-diff-state.h
@@ -86,7 +86,7 @@
operator += (const DiffState &state);
// from State
- virtual bool
+ virtual boost::tuple<bool/*inserted*/, bool/*updated*/, SeqNo/*oldSeqNo*/>
update (NameInfoConstPtr info, const SeqNo &seq);
virtual bool
diff --git a/model/sync-full-state.cc b/model/sync-full-state.cc
index ed3a3ee..748e408 100644
--- a/model/sync-full-state.cc
+++ b/model/sync-full-state.cc
@@ -81,7 +81,7 @@
}
// from State
-bool
+boost::tuple<bool/*inserted*/, bool/*updated*/, SeqNo/*oldSeqNo*/>
FullState::update (NameInfoConstPtr info, const SeqNo &seq)
{
#ifndef STANDALONE
@@ -96,16 +96,20 @@
if (item == m_leaves.end ())
{
m_leaves.insert (make_shared<FullLeaf> (info, cref (seq)));
+ return make_tuple (true, false, SeqNo ());
}
else
{
if ((*item)->getSeq () == seq || seq < (*item)->getSeq ())
- return false;
-
+ {
+ return make_tuple (false, false, SeqNo ());
+ }
+
+ SeqNo old = (*item)->getSeq ();
m_leaves.modify (item,
ll::bind (&Leaf::setSeq, *ll::_1, seq));
+ return make_tuple (false, true, old);
}
- return true;
}
bool
@@ -122,7 +126,7 @@
LeafContainer::iterator item = m_leaves.find (info);
if (item != m_leaves.end ())
{
- m_leaves.erase (info);
+ m_leaves.erase (item);
return true;
}
else
diff --git a/model/sync-full-state.h b/model/sync-full-state.h
index 2aac7b9..5b80aff 100644
--- a/model/sync-full-state.h
+++ b/model/sync-full-state.h
@@ -67,7 +67,7 @@
getDigest ();
// from State
- virtual bool
+ virtual boost::tuple<bool/*inserted*/, bool/*updated*/, SeqNo/*oldSeqNo*/>
update (NameInfoConstPtr info, const SeqNo &seq);
virtual bool
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index 6b11c9a..3102c4e 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -38,11 +38,13 @@
const boost::posix_time::time_duration SyncLogic::m_delayedCheckTime = boost::posix_time::seconds (4.0);
-SyncLogic::SyncLogic (const string &syncPrefix,
- LogicCallback fetchCallback,
+SyncLogic::SyncLogic (const std::string &syncPrefix,
+ LogicUpdateCallback onUpdate,
+ LogicRemoveCallback onRemove,
CcnxWrapperPtr ccnxHandle)
: m_syncPrefix (syncPrefix)
- , m_fetchCallback (fetchCallback)
+ , m_onUpdate (onUpdate)
+ , m_onRemove (onRemove)
, m_ccnxHandle (ccnxHandle)
, m_delayedCheckThreadRunning (true)
{
@@ -128,6 +130,7 @@
SyncLogic::processSyncInterest (DigestConstPtr digest, const std::string &interestName, bool timedProcessing/*=false*/)
{
// cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
+ recursive_mutex::scoped_lock lock (m_stateMutex);
if (*m_state.getDigest() == *digest)
{
@@ -172,132 +175,106 @@
void
SyncLogic::processSyncData (const string &name, const string &dataBuffer)
{
- string last = name.substr(name.find_last_of("/") + 1);
- istringstream ss (dataBuffer);
-
- DiffStatePtr diffLog = make_shared<DiffState>();
-
- if (last == "state")
- {
- FullState full;
- ss >> full;
- BOOST_FOREACH (LeafConstPtr leaf, full.getLeaves().get<ordered>())
- {
- NameInfoConstPtr info = leaf->getInfo ();
- LeafContainer::iterator it = m_state.getLeaves().find (info);
-
- SeqNo seq = leaf->getSeq ();
-
- // if (it == m_state.getLeaves().end())
- // {
- // string prefix = info.toString();
- // prefix += "/";
- // prefix += seq.getSession();
- // m_fetchCallback (prefix, 1, seq.getSeq());
- // m_state.update(pInfo, seq);
- // diffLog->update(pInfo, seq);
- // }
- // else
- // {
- // SeqNo currSeq = (*it)->getSeq();
- // if (currSeq < seq)
- // {
- // string prefix = info.toString();
- // prefix += "/";
- // prefix += seq.getSession();
-
- // if (currSeq.getSession() == seq.getSession())
- // m_fetchCallback(prefix, currSeq.getSeq() + 1, seq.getSeq());
- // else
- // m_fetchCallback(prefix, 1, seq.getSeq());
-
- // m_state.update(pInfo, seq);
- // diffLog->update(pInfo, seq);
- // }
- // }
- }
- }
- else
- {
- DiffState diff;
- ss >> diff;
- BOOST_FOREACH (LeafConstPtr leaf, diff.getLeaves().get<ordered>())
- {
- shared_ptr<const DiffLeaf> diffLeaf = dynamic_pointer_cast<const DiffLeaf> (leaf);
- if (diffLeaf == 0)
- {
- return;
- /// \todo Log the error
- }
- NameInfoConstPtr info = diffLeaf->getInfo();
- LeafContainer::iterator it = m_state.getLeaves().find (info);
- SeqNo seq = diffLeaf->getSeq();
-
- // switch (diffLeaf->getOperation())
- // {
- // case UPDATE:
- // if (it == m_state.getLeaves().end())
- // {
- // string prefix = info.toString();
- // prefix += "/";
- // prefix += seq.getSession();
- // m_fetchCallback(prefix, 1, seq.getSeq());
-
- // NameInfoConstPtr pInfo = StdNameInfo::FindOrCreate(info.toString());
- // m_state.update(pInfo, seq);
- // diffLog->update(pInfo, seq);
- // }
- // else
- // {
- // SeqNo currSeq = (*it)->getSeq();
- // if (currSeq < seq)
- // {
- // string prefix = info.toString();
- // prefix += "/";
- // prefix += seq.getSession();
-
- // if (currSeq.getSession() == seq.getSession())
- // m_fetchCallback(prefix, currSeq.getSeq() + 1, seq.getSeq());
- // else
- // m_fetchCallback(prefix, 1, seq.getSeq());
-
- // NameInfoConstPtr pInfo = StdNameInfo::FindOrCreate(info.toString());
- // m_state.update(pInfo, seq);
- // diffLog->update(pInfo, seq);
- // }
- // }
- // break;
-
- // case REMOVE:
- // if (it != m_state.getLeaves().end())
- // {
- // NameInfoConstPtr pInfo = StdNameInfo::FindOrCreate(info.toString());
- // m_state.remove(pInfo);
- // diffLog->remove(pInfo);
- // }
- // break;
-
- // default:
- // break;
- // }
- }
- }
-
- diffLog->setDigest(m_state.getDigest());
- m_log.insert (diffLog);
-
- // notify upper layer
- BOOST_FOREACH (LeafConstPtr leaf, diffLog->getLeaves ())
- {
- }
+ DiffStatePtr diffLog = make_shared<DiffState> ();
- sendSyncInterest();
+ try
+ {
+ recursive_mutex::scoped_lock lock (m_stateMutex);
+
+ string last = name.substr(name.find_last_of("/") + 1);
+ istringstream ss (dataBuffer);
+
+ if (last == "state")
+ {
+ FullState full;
+ ss >> full;
+ BOOST_FOREACH (LeafConstPtr leaf, full.getLeaves()) // order doesn't matter
+ {
+ NameInfoConstPtr info = leaf->getInfo ();
+ SeqNo seq = leaf->getSeq ();
+
+ bool inserted = false;
+ bool updated = false;
+ SeqNo oldSeq;
+ tie (inserted, updated, oldSeq) = m_state.update (info, seq);
+
+ if (updated)
+ {
+ diffLog->update (info, seq);
+ m_onUpdate (info->toString (), seq.getSeq(), oldSeq);
+ }
+ }
+ }
+ else
+ {
+ DiffState diff;
+ ss >> diff;
+ BOOST_FOREACH (LeafConstPtr leaf, diff.getLeaves().get<ordered>())
+ {
+ DiffLeafConstPtr diffLeaf = dynamic_pointer_cast<const DiffLeaf> (leaf);
+ BOOST_ASSERT (diffLeaf != 0);
+
+ NameInfoConstPtr info = diffLeaf->getInfo ();
+ if (diffLeaf->getOperation() == UPDATE)
+ {
+ SeqNo seq = diffLeaf->getSeq ();
+
+ bool inserted = false;
+ bool updated = false;
+ SeqNo oldSeq;
+ tie (inserted, updated, oldSeq) = m_state.update (info, seq);
+
+ if (updated)
+ {
+ diffLog->update (info, seq);
+ m_onUpdate (info->toString (), seq.getSeq(), oldSeq);
+ }
+ }
+ else if (diffLeaf->getOperation() == REMOVE)
+ {
+ if (m_state.remove (info))
+ {
+ diffLog->remove (info);
+ m_onRemove (info->toString ());
+ }
+ }
+ else
+ {
+ BOOST_ASSERT (false); // just in case
+ }
+ }
+ }
+
+ diffLog->setDigest(m_state.getDigest());
+ m_log.insert (diffLog);
+ }
+ catch (Error::SyncXmlDecodingFailure &e)
+ {
+ // log error
+ return;
+ }
+
+ if (diffLog->getLeaves ().size () > 0)
+ {
+ // notify upper layer
+ BOOST_FOREACH (LeafConstPtr leaf, diffLog->getLeaves ())
+ {
+ DiffLeafConstPtr diffLeaf = dynamic_pointer_cast<const DiffLeaf> (leaf);
+ BOOST_ASSERT (diffLeaf != 0);
+
+ // m_fetchCallback (prefix, 1, seq.getSeq());
+ }
+
+ sendSyncInterest ();
+ }
}
void
SyncLogic::addLocalNames (const string &prefix, uint32_t session, uint32_t seq)
{
- NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
+ recursive_mutex::scoped_lock lock (m_stateMutex);
+
+ NameInfoConstPtr info = StdNameInfo::FindOrCreate (prefix);
SeqNo seqN(session, seq);
DiffStatePtr diff = make_shared<DiffState>();
diff->update(info, seqN);
@@ -317,6 +294,8 @@
void
SyncLogic::sendSyncInterest ()
{
+ recursive_mutex::scoped_lock lock (m_stateMutex);
+
ostringstream os;
os << m_syncPrefix << "/" << m_state.getDigest();
diff --git a/model/sync-logic.h b/model/sync-logic.h
index 42a2ae0..d2d9f0f 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -25,6 +25,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
#include "boost/date_time/posix_time/posix_time_types.hpp"
+#include <boost/thread/recursive_mutex.hpp>
#include "sync-ccnx-wrapper.h"
#include "sync-interest-table.h"
@@ -44,7 +45,8 @@
class SyncLogic
{
public:
- typedef boost::function<void (const std::string &, uint32_t, uint32_t)> LogicCallback;
+ typedef boost::function< void ( const std::string &/*prefix*/, const SeqNo &/*newSeq*/, const SeqNo &/*oldSeq*/ ) > LogicUpdateCallback;
+ typedef boost::function< void ( const std::string &/*prefix*/ ) > LogicRemoveCallback;
/**
* @brief Constructor
@@ -53,7 +55,10 @@
* @param ccnxHandle ccnx handle
* the app data when new remote names are learned
*/
- SyncLogic (const std::string &syncPrefix, LogicCallback fetch, CcnxWrapperPtr ccnxHandle);
+ SyncLogic (const std::string &syncPrefix,
+ LogicUpdateCallback onUpdate,
+ LogicRemoveCallback onRemove,
+ CcnxWrapperPtr ccnxHandle);
~SyncLogic ();
@@ -98,10 +103,13 @@
FullState m_state;
DiffStateContainer m_log;
+ boost::recursive_mutex m_stateMutex;
+
SyncInterestTable m_syncInterestTable;
std::string m_syncPrefix;
- LogicCallback m_fetchCallback;
+ LogicUpdateCallback m_onUpdate;
+ LogicRemoveCallback m_onRemove;
CcnxWrapperPtr m_ccnxHandle;
boost::thread m_delayedCheckThread;
diff --git a/model/sync-seq-no.h b/model/sync-seq-no.h
index 93013a5..b4329d2 100644
--- a/model/sync-seq-no.h
+++ b/model/sync-seq-no.h
@@ -36,6 +36,16 @@
{
public:
/**
+ * @brief Default constructor. Creates an zero sequence number with zero session ID (basically is an invalid object)
+ */
+ SeqNo ()
+ : m_valid (false)
+ , m_session (0)
+ , m_seq (0)
+ {
+ }
+
+ /**
* @brief Copy constructor
* @param seq sequence number object to copy from
*/
@@ -51,6 +61,7 @@
SeqNo &
operator = (const SeqNo &seq)
{
+ m_valid = seq.m_valid;
m_session = seq.m_session;
m_seq = seq.m_seq;
@@ -62,7 +73,8 @@
* @param seq Sequence number
*/
SeqNo (uint32_t seq)
- : m_session (0)
+ : m_valid (true)
+ , m_session (0)
, m_seq (seq)
{ }
@@ -72,7 +84,8 @@
* @param seq Sequence number
*/
SeqNo (uint32_t session, uint32_t seq)
- : m_session (session)
+ : m_valid (true)
+ , m_session (session)
, m_seq (seq)
{ }
@@ -108,6 +121,12 @@
return m_session == seq.m_session && m_seq == seq.m_seq;
}
+ bool
+ isValid () const
+ {
+ return m_valid;
+ }
+
/**
* @brief Get session id
*/
@@ -121,6 +140,8 @@
{ return m_seq; }
private:
+ bool m_valid;
+
/**
* @brief Session ID (e.g., after crash, application will choose new session ID.
*
diff --git a/model/sync-state.h b/model/sync-state.h
index 2806fdb..f01e744 100644
--- a/model/sync-state.h
+++ b/model/sync-state.h
@@ -25,6 +25,7 @@
#include "sync-state-leaf-container.h"
#include <boost/exception/all.hpp>
+#include "boost/tuple/tuple.hpp"
/**
* \defgroup sync SYNC protocol
@@ -48,7 +49,7 @@
* @param info name of the leaf
* @param seq sequence number of the leaf
*/
- virtual bool
+ virtual boost::tuple<bool/*inserted*/, bool/*updated*/, SeqNo/*oldSeqNo*/>
update (NameInfoConstPtr info, const SeqNo &seq) = 0;
/**
diff --git a/test/test_data_fetch_and_publish.cc b/test/test_data_fetch_and_publish.cc
index b227572..a442830 100644
--- a/test/test_data_fetch_and_publish.cc
+++ b/test/test_data_fetch_and_publish.cc
@@ -71,7 +71,7 @@
string str[5] = {"panda", "express", "tastes", "so", "good"};
for (int i = 0; i < 5; i++) {
- foo.set(interest + "/" + seq[i], str[i]);
+ foo.set(interest + "/" + "0/" /*session*/ + seq[i], str[i]);
}
boost::function<void (string, string)> setFunc =
@@ -89,7 +89,7 @@
BOOST_CHECK_EQUAL(publisher.getHighestSeq(interest, 0), 5);
BOOST_CHECK_EQUAL(publisher.getRecentData(interest, 0), str[4]);
- fetcher.fetch(interest, 1, 5);
+ fetcher.onUpdate (interest, SeqNo (0,5), SeqNo (0,0));
// give time for ccnd to react
sleep(1);
BOOST_CHECK_EQUAL(foo.toString(), bar.toString());
@@ -99,7 +99,7 @@
bind(&TestStructApp::erase, &bar, _1, _2);
fetcher.setDataCallback(eraseFunc);
- fetcher.fetch(interest, 1, 5);
+ fetcher.onUpdate (interest, SeqNo (0,5), SeqNo (0,0));
// give time for ccnd to react
sleep(1);
TestStructApp poo;
diff --git a/test/test_interest_table.cc b/test/test_interest_table.cc
index 7cca0eb..3c39046 100644
--- a/test/test_interest_table.cc
+++ b/test/test_interest_table.cc
@@ -42,15 +42,20 @@
BOOST_CHECK_NO_THROW (delete table);
}
-void func (const std::string &, uint32_t, uint32_t)
+void funcUpdate (const std::string &, const SeqNo &newSeq, const SeqNo &oldSeq)
{
- cout << "func\n";
+ cout << "funcUpdate\n";
+}
+
+void funcRemove (const std::string &)
+{
+ cout << "funcRemove\n";
}
BOOST_AUTO_TEST_CASE (SyncLogicTest)
{
SyncLogic *logic = 0;
- BOOST_CHECK_NO_THROW (logic = new SyncLogic ("/prefix", func, make_shared<CcnxWrapper> ()));
+ BOOST_CHECK_NO_THROW (logic = new SyncLogic ("/prefix", funcUpdate, funcRemove, make_shared<CcnxWrapper> ()));
BOOST_CHECK_EQUAL (logic->getListChecksSize (), 0);
// 0s