Moving some of the logic for dealing with sequence numbers to SyncAppDataFetch

Adding recursive mutexes to SyncLogic, so hopefully it is now thread-safe

Small modification of publishing/retrieval names: the session ID is now
appended by SyncAppDataFetch / SyncAppDataPublish
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index 6b11c9a..3102c4e 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -38,11 +38,13 @@
 const boost::posix_time::time_duration SyncLogic::m_delayedCheckTime = boost::posix_time::seconds (4.0);
 
 
-SyncLogic::SyncLogic (const string &syncPrefix,
-                      LogicCallback fetchCallback,
+SyncLogic::SyncLogic (const std::string &syncPrefix,
+                      LogicUpdateCallback onUpdate,
+                      LogicRemoveCallback onRemove,
                       CcnxWrapperPtr ccnxHandle)
   : m_syncPrefix (syncPrefix)
-  , m_fetchCallback (fetchCallback)
+  , m_onUpdate (onUpdate)
+  , m_onRemove (onRemove)
   , m_ccnxHandle (ccnxHandle)
   , m_delayedCheckThreadRunning (true)
 {
@@ -128,6 +130,7 @@
 SyncLogic::processSyncInterest (DigestConstPtr digest, const std::string &interestName, bool timedProcessing/*=false*/)
 {
   // cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
+  recursive_mutex::scoped_lock lock (m_stateMutex);
     
   if (*m_state.getDigest() == *digest)
   {
@@ -172,132 +175,106 @@
 void
 SyncLogic::processSyncData (const string &name, const string &dataBuffer)
 {
-  string last = name.substr(name.find_last_of("/") + 1);
-  istringstream ss (dataBuffer);
-
-  DiffStatePtr diffLog = make_shared<DiffState>();
-
-  if (last == "state")
-  {
-    FullState full;
-    ss >> full;
-    BOOST_FOREACH (LeafConstPtr leaf, full.getLeaves().get<ordered>())
-    {
-      NameInfoConstPtr info = leaf->getInfo ();
-      LeafContainer::iterator it = m_state.getLeaves().find (info);
-
-      SeqNo seq = leaf->getSeq ();
-
-      // if (it == m_state.getLeaves().end())
-      // {
-      //   string prefix = info.toString();
-      //   prefix += "/";
-      //   prefix += seq.getSession();
-      //   m_fetchCallback (prefix, 1, seq.getSeq());
-      //   m_state.update(pInfo, seq);
-      //   diffLog->update(pInfo, seq);
-      // }
-      // else
-      // {
-      //   SeqNo currSeq = (*it)->getSeq();
-      //   if (currSeq < seq)
-      //   {
-      //     string prefix = info.toString();
-      //     prefix += "/";
-      //     prefix += seq.getSession();
-
-      //     if (currSeq.getSession() == seq.getSession())
-      //       m_fetchCallback(prefix, currSeq.getSeq() + 1, seq.getSeq());
-      //     else
-      //       m_fetchCallback(prefix, 1, seq.getSeq());
-
-      //     m_state.update(pInfo, seq);
-      //     diffLog->update(pInfo, seq);
-      //   }
-      // }
-    }
-  }
-  else
-  {
-    DiffState diff;
-    ss >> diff;
-    BOOST_FOREACH (LeafConstPtr leaf, diff.getLeaves().get<ordered>())
-    {
-      shared_ptr<const DiffLeaf> diffLeaf = dynamic_pointer_cast<const DiffLeaf> (leaf);
-      if (diffLeaf == 0)
-        {
-          return;
-          /// \todo Log the error
-        }
-      NameInfoConstPtr info = diffLeaf->getInfo();
-      LeafContainer::iterator it = m_state.getLeaves().find (info);
-      SeqNo seq = diffLeaf->getSeq();
-
-      // switch (diffLeaf->getOperation())
-      // {
-      //   case UPDATE:
-      //     if (it == m_state.getLeaves().end())
-      //     {
-      //       string prefix = info.toString();
-      //       prefix += "/";
-      //       prefix += seq.getSession();
-      //       m_fetchCallback(prefix, 1, seq.getSeq());
-
-      //       NameInfoConstPtr pInfo = StdNameInfo::FindOrCreate(info.toString());
-      //       m_state.update(pInfo, seq);
-      //       diffLog->update(pInfo, seq);
-      //     }
-      //     else
-      //     {
-      //       SeqNo currSeq = (*it)->getSeq();
-      //       if (currSeq < seq)
-      //       {
-      //         string prefix = info.toString();
-      //         prefix += "/";
-      //         prefix += seq.getSession();
-
-      //         if (currSeq.getSession() == seq.getSession())
-      //           m_fetchCallback(prefix, currSeq.getSeq() + 1, seq.getSeq());
-      //         else
-      //           m_fetchCallback(prefix, 1, seq.getSeq());
-
-      //         NameInfoConstPtr pInfo = StdNameInfo::FindOrCreate(info.toString());
-      //         m_state.update(pInfo, seq);
-      //         diffLog->update(pInfo, seq);
-      //       }
-      //     }
-      //     break;
-
-      //   case REMOVE:
-      //     if (it != m_state.getLeaves().end())
-      //     {
-      //       NameInfoConstPtr pInfo = StdNameInfo::FindOrCreate(info.toString());
-      //       m_state.remove(pInfo);
-      //       diffLog->remove(pInfo);
-      //     }
-      //     break;
-
-      //   default:
-      //     break;
-      // }
-    }
-  }
-
-  diffLog->setDigest(m_state.getDigest());
-  m_log.insert (diffLog);
-
-  // notify upper layer
-  BOOST_FOREACH (LeafConstPtr leaf, diffLog->getLeaves ())
-    {
-    }
+  DiffStatePtr diffLog = make_shared<DiffState> ();
   
-  sendSyncInterest();
+  try
+    {
+      recursive_mutex::scoped_lock lock (m_stateMutex);
+      
+      string last = name.substr(name.find_last_of("/") + 1);
+      istringstream ss (dataBuffer);
+
+      if (last == "state")
+        {
+          FullState full;
+          ss >> full;
+          BOOST_FOREACH (LeafConstPtr leaf, full.getLeaves()) // order doesn't matter
+            {
+              NameInfoConstPtr info = leaf->getInfo ();
+              SeqNo seq = leaf->getSeq ();
+
+              bool inserted = false;
+              bool updated = false;
+              SeqNo oldSeq;
+              tie (inserted, updated, oldSeq) = m_state.update (info, seq);
+                  
+              if (updated)
+                {
+                  diffLog->update (info, seq);
+                  m_onUpdate (info->toString (), seq.getSeq(), oldSeq);
+                }
+            }
+        }
+      else
+        {
+          DiffState diff;
+          ss >> diff;
+          BOOST_FOREACH (LeafConstPtr leaf, diff.getLeaves().get<ordered>())
+            {
+              DiffLeafConstPtr diffLeaf = dynamic_pointer_cast<const DiffLeaf> (leaf);
+              BOOST_ASSERT (diffLeaf != 0);
+
+              NameInfoConstPtr info = diffLeaf->getInfo ();
+              if (diffLeaf->getOperation() == UPDATE)
+                {
+                  SeqNo seq = diffLeaf->getSeq ();
+                  
+                  bool inserted = false;
+                  bool updated = false;
+                  SeqNo oldSeq;
+                  tie (inserted, updated, oldSeq) = m_state.update (info, seq);
+                  
+                  if (updated)
+                    {
+                      diffLog->update (info, seq);
+                      m_onUpdate (info->toString (), seq.getSeq(), oldSeq);
+                    }
+                }
+              else if (diffLeaf->getOperation() == REMOVE)
+                {
+                  if (m_state.remove (info))
+                    {
+                      diffLog->remove (info);
+                      m_onRemove (info->toString ());
+                    }
+                }
+              else
+                {
+                  BOOST_ASSERT (false); // just in case
+                }
+            }
+        }
+
+      diffLog->setDigest(m_state.getDigest());
+      m_log.insert (diffLog);
+    }
+  catch (Error::SyncXmlDecodingFailure &e)
+    {
+      // log error
+      return;
+    }
+
+  if (diffLog->getLeaves ().size () > 0)
+    {
+      // notify upper layer
+      BOOST_FOREACH (LeafConstPtr leaf, diffLog->getLeaves ())
+        {
+          DiffLeafConstPtr diffLeaf = dynamic_pointer_cast<const DiffLeaf> (leaf);
+          BOOST_ASSERT (diffLeaf != 0);
+          
+            // m_fetchCallback (prefix, 1, seq.getSeq());
+        }
+
+      sendSyncInterest ();
+    }
 }
 
 void
 SyncLogic::addLocalNames (const string &prefix, uint32_t session, uint32_t seq)
 {
-  NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
+  recursive_mutex::scoped_lock lock (m_stateMutex);
+
+  NameInfoConstPtr info = StdNameInfo::FindOrCreate (prefix);
   SeqNo seqN(session, seq);
   DiffStatePtr diff = make_shared<DiffState>();
   diff->update(info, seqN);
@@ -317,6 +294,8 @@
 void
 SyncLogic::sendSyncInterest ()
 {
+  recursive_mutex::scoped_lock lock (m_stateMutex);
+
   ostringstream os;
   os << m_syncPrefix << "/" << m_state.getDigest();