Moving some logic of dealing with sequence numbers to SyncAppDataFetch
Adding recursive mutexes to SyncLogic, so hopefully it is now thread-safe
Small modification of publishing/retrieval names. Now session ID is
appended by SyncAppDataFetch / SyncAppDataPublish
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index 6b11c9a..3102c4e 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -38,11 +38,13 @@
const boost::posix_time::time_duration SyncLogic::m_delayedCheckTime = boost::posix_time::seconds (4.0);
-SyncLogic::SyncLogic (const string &syncPrefix,
- LogicCallback fetchCallback,
+SyncLogic::SyncLogic (const std::string &syncPrefix,
+ LogicUpdateCallback onUpdate,
+ LogicRemoveCallback onRemove,
CcnxWrapperPtr ccnxHandle)
: m_syncPrefix (syncPrefix)
- , m_fetchCallback (fetchCallback)
+ , m_onUpdate (onUpdate)
+ , m_onRemove (onRemove)
, m_ccnxHandle (ccnxHandle)
, m_delayedCheckThreadRunning (true)
{
@@ -128,6 +130,7 @@
SyncLogic::processSyncInterest (DigestConstPtr digest, const std::string &interestName, bool timedProcessing/*=false*/)
{
// cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
+ recursive_mutex::scoped_lock lock (m_stateMutex);
if (*m_state.getDigest() == *digest)
{
@@ -172,132 +175,106 @@
void
SyncLogic::processSyncData (const string &name, const string &dataBuffer)
{
- string last = name.substr(name.find_last_of("/") + 1);
- istringstream ss (dataBuffer);
-
- DiffStatePtr diffLog = make_shared<DiffState>();
-
- if (last == "state")
- {
- FullState full;
- ss >> full;
- BOOST_FOREACH (LeafConstPtr leaf, full.getLeaves().get<ordered>())
- {
- NameInfoConstPtr info = leaf->getInfo ();
- LeafContainer::iterator it = m_state.getLeaves().find (info);
-
- SeqNo seq = leaf->getSeq ();
-
- // if (it == m_state.getLeaves().end())
- // {
- // string prefix = info.toString();
- // prefix += "/";
- // prefix += seq.getSession();
- // m_fetchCallback (prefix, 1, seq.getSeq());
- // m_state.update(pInfo, seq);
- // diffLog->update(pInfo, seq);
- // }
- // else
- // {
- // SeqNo currSeq = (*it)->getSeq();
- // if (currSeq < seq)
- // {
- // string prefix = info.toString();
- // prefix += "/";
- // prefix += seq.getSession();
-
- // if (currSeq.getSession() == seq.getSession())
- // m_fetchCallback(prefix, currSeq.getSeq() + 1, seq.getSeq());
- // else
- // m_fetchCallback(prefix, 1, seq.getSeq());
-
- // m_state.update(pInfo, seq);
- // diffLog->update(pInfo, seq);
- // }
- // }
- }
- }
- else
- {
- DiffState diff;
- ss >> diff;
- BOOST_FOREACH (LeafConstPtr leaf, diff.getLeaves().get<ordered>())
- {
- shared_ptr<const DiffLeaf> diffLeaf = dynamic_pointer_cast<const DiffLeaf> (leaf);
- if (diffLeaf == 0)
- {
- return;
- /// \todo Log the error
- }
- NameInfoConstPtr info = diffLeaf->getInfo();
- LeafContainer::iterator it = m_state.getLeaves().find (info);
- SeqNo seq = diffLeaf->getSeq();
-
- // switch (diffLeaf->getOperation())
- // {
- // case UPDATE:
- // if (it == m_state.getLeaves().end())
- // {
- // string prefix = info.toString();
- // prefix += "/";
- // prefix += seq.getSession();
- // m_fetchCallback(prefix, 1, seq.getSeq());
-
- // NameInfoConstPtr pInfo = StdNameInfo::FindOrCreate(info.toString());
- // m_state.update(pInfo, seq);
- // diffLog->update(pInfo, seq);
- // }
- // else
- // {
- // SeqNo currSeq = (*it)->getSeq();
- // if (currSeq < seq)
- // {
- // string prefix = info.toString();
- // prefix += "/";
- // prefix += seq.getSession();
-
- // if (currSeq.getSession() == seq.getSession())
- // m_fetchCallback(prefix, currSeq.getSeq() + 1, seq.getSeq());
- // else
- // m_fetchCallback(prefix, 1, seq.getSeq());
-
- // NameInfoConstPtr pInfo = StdNameInfo::FindOrCreate(info.toString());
- // m_state.update(pInfo, seq);
- // diffLog->update(pInfo, seq);
- // }
- // }
- // break;
-
- // case REMOVE:
- // if (it != m_state.getLeaves().end())
- // {
- // NameInfoConstPtr pInfo = StdNameInfo::FindOrCreate(info.toString());
- // m_state.remove(pInfo);
- // diffLog->remove(pInfo);
- // }
- // break;
-
- // default:
- // break;
- // }
- }
- }
-
- diffLog->setDigest(m_state.getDigest());
- m_log.insert (diffLog);
-
- // notify upper layer
- BOOST_FOREACH (LeafConstPtr leaf, diffLog->getLeaves ())
- {
- }
+ DiffStatePtr diffLog = make_shared<DiffState> ();
- sendSyncInterest();
+ try
+ {
+ recursive_mutex::scoped_lock lock (m_stateMutex);
+
+ string last = name.substr(name.find_last_of("/") + 1);
+ istringstream ss (dataBuffer);
+
+ if (last == "state")
+ {
+ FullState full;
+ ss >> full;
+ BOOST_FOREACH (LeafConstPtr leaf, full.getLeaves()) // order doesn't matter
+ {
+ NameInfoConstPtr info = leaf->getInfo ();
+ SeqNo seq = leaf->getSeq ();
+
+ bool inserted = false;
+ bool updated = false;
+ SeqNo oldSeq;
+ tie (inserted, updated, oldSeq) = m_state.update (info, seq);
+
+ if (updated)
+ {
+ diffLog->update (info, seq);
+ m_onUpdate (info->toString (), seq.getSeq(), oldSeq);
+ }
+ }
+ }
+ else
+ {
+ DiffState diff;
+ ss >> diff;
+ BOOST_FOREACH (LeafConstPtr leaf, diff.getLeaves().get<ordered>())
+ {
+ DiffLeafConstPtr diffLeaf = dynamic_pointer_cast<const DiffLeaf> (leaf);
+ BOOST_ASSERT (diffLeaf != 0);
+
+ NameInfoConstPtr info = diffLeaf->getInfo ();
+ if (diffLeaf->getOperation() == UPDATE)
+ {
+ SeqNo seq = diffLeaf->getSeq ();
+
+ bool inserted = false;
+ bool updated = false;
+ SeqNo oldSeq;
+ tie (inserted, updated, oldSeq) = m_state.update (info, seq);
+
+ if (updated)
+ {
+ diffLog->update (info, seq);
+ m_onUpdate (info->toString (), seq.getSeq(), oldSeq);
+ }
+ }
+ else if (diffLeaf->getOperation() == REMOVE)
+ {
+ if (m_state.remove (info))
+ {
+ diffLog->remove (info);
+ m_onRemove (info->toString ());
+ }
+ }
+ else
+ {
+ BOOST_ASSERT (false); // just in case
+ }
+ }
+ }
+
+ diffLog->setDigest(m_state.getDigest());
+ m_log.insert (diffLog);
+ }
+ catch (Error::SyncXmlDecodingFailure &e)
+ {
+ // log error
+ return;
+ }
+
+ if (diffLog->getLeaves ().size () > 0)
+ {
+ // notify upper layer
+ BOOST_FOREACH (LeafConstPtr leaf, diffLog->getLeaves ())
+ {
+ DiffLeafConstPtr diffLeaf = dynamic_pointer_cast<const DiffLeaf> (leaf);
+ BOOST_ASSERT (diffLeaf != 0);
+
+ // m_fetchCallback (prefix, 1, seq.getSeq());
+ }
+
+ sendSyncInterest ();
+ }
}
void
SyncLogic::addLocalNames (const string &prefix, uint32_t session, uint32_t seq)
{
- NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
+ recursive_mutex::scoped_lock lock (m_stateMutex);
+
+ NameInfoConstPtr info = StdNameInfo::FindOrCreate (prefix);
SeqNo seqN(session, seq);
DiffStatePtr diff = make_shared<DiffState>();
diff->update(info, seqN);
@@ -317,6 +294,8 @@
void
SyncLogic::sendSyncInterest ()
{
+ recursive_mutex::scoped_lock lock (m_stateMutex);
+
ostringstream os;
os << m_syncPrefix << "/" << m_state.getDigest();