Solving some deadlocks and crashes

There is still an unexplained problem involving two clients and delays
diff --git a/model/sync-app-data-publish.cc b/model/sync-app-data-publish.cc
index faaf482..adef5e3 100644
--- a/model/sync-app-data-publish.cc
+++ b/model/sync-app-data-publish.cc
@@ -44,29 +44,22 @@
 uint32_t
 AppDataPublish::getNextSeq (const string &prefix, uint32_t session)
 {
-  unordered_map<string, Seq>::iterator i = m_sequenceLog.find(prefix);
+  SequenceLog::iterator i = m_sequenceLog.find (prefix);
 
-  if (i != m_sequenceLog.end())
+  if (i != m_sequenceLog.end ())
     {
       Seq s = i->second;
       if (s.session == session)
         return s.seq;
     }
-	else
-    BOOST_THROW_EXCEPTION(GetSeqException() << errmsg_info_str("No corresponding seq"));
+  else
+    return 0;
 }
 
-bool
+uint32_t
 AppDataPublish::publishData (const string &name, uint32_t session, const string &dataBuffer, int freshness)
 {
-  uint32_t seq = 0;
-	try
-		{
-			seq =  getNextSeq(name, session);
-		}
-	catch (GetSeqException &e){
-    m_sequenceLog.erase(name);
-	}
+  uint32_t seq = getNextSeq (name, session);
 
   ostringstream contentNameWithSeqno;
   contentNameWithSeqno << name << "/" << session << "/" << seq;
@@ -78,12 +71,12 @@
   s.seq = seq + 1;
   m_sequenceLog[name] = s;
 
-  unordered_map<pair<string, uint32_t>, string>::iterator it = m_recentData.find(make_pair(name, session));
-  if (it != m_recentData.end()) 
-    m_recentData.erase(it);
-  m_recentData.insert(make_pair(make_pair(name, session), dataBuffer));
+  // unordered_map<pair<string, uint32_t>, string>::iterator it = m_recentData.find(make_pair(name, session));
+  // if (it != m_recentData.end()) 
+  //   m_recentData.erase(it);
+  // m_recentData.insert(make_pair(make_pair(name, session), dataBuffer));
 
-  return true;
+  return seq;
 }
 
-}
+} // Sync
diff --git a/model/sync-app-data-publish.h b/model/sync-app-data-publish.h
index 85bb9ae..aa05d6c 100644
--- a/model/sync-app-data-publish.h
+++ b/model/sync-app-data-publish.h
@@ -23,10 +23,11 @@
 #ifndef SYNC_APP_DATA_PUBLISH_H
 #define SYNC_APP_DATA_PUBLISH_H
 
-#include <boost/unordered_map.hpp>
+// #include <boost/unordered_map.hpp>
 #include "sync-seq-no.h"
 #include "sync-ccnx-wrapper.h"
 #include <utility>
+#include <map>
 
 namespace Sync {
 
@@ -78,14 +79,17 @@
    * @param session session to which data is published
    * @param dataBuffer the data itself
    * @param freshness the freshness for the data object
-   * @return whether the publish succeeded
+   * @return the published sequence number; will throw an exception if something goes wrong
    */
-  bool publishData (const std::string &name, uint32_t session, const std::string &dataBuffer, int freshness);
+  uint32_t publishData (const std::string &name, uint32_t session, const std::string &dataBuffer, int freshness);
 
 private:
-  boost::unordered_map<std::string, Seq> m_sequenceLog;
+  typedef std::map<std::string, Seq> SequenceLog;
+  typedef std::map<std::pair<std::string, uint32_t>, std::string> RecentData;
+  
   CcnxWrapperPtr m_ccnxHandle;
-  boost::unordered_map<std::pair<std::string, uint32_t>, std::string> m_recentData;
+  SequenceLog m_sequenceLog;
+  RecentData m_recentData;
 };
 
 } // Sync
diff --git a/model/sync-app-socket.cc b/model/sync-app-socket.cc
index 2f1d174..42d34e4 100644
--- a/model/sync-app-socket.cc
+++ b/model/sync-app-socket.cc
@@ -44,8 +44,8 @@
 
 bool SyncAppSocket::publish (const string &prefix, uint32_t session, const string &dataBuffer, int freshness)
 {
-  m_publisher.publishData (prefix, session, dataBuffer, freshness);
-  m_syncLogic.addLocalNames (prefix, session, m_publisher.getNextSeq (prefix, session));
+  uint32_t sequence = m_publisher.publishData (prefix, session, dataBuffer, freshness);
+  m_syncLogic.addLocalNames (prefix, session, sequence);
 }
 
 }
diff --git a/model/sync-app-socket.h b/model/sync-app-socket.h
index e6f1df1..4f7ec33 100644
--- a/model/sync-app-socket.h
+++ b/model/sync-app-socket.h
@@ -71,7 +71,7 @@
   void remove (const std::string &prefix) {m_syncLogic.remove(prefix);}
 
 private:
-	CcnxWrapperPtr m_appHandle;
+  CcnxWrapperPtr m_appHandle;
 
   AppDataFetch   m_fetcher;
   AppDataPublish m_publisher;
diff --git a/model/sync-ccnx-wrapper.cc b/model/sync-ccnx-wrapper.cc
index e911e2e..50da304 100644
--- a/model/sync-ccnx-wrapper.cc
+++ b/model/sync-ccnx-wrapper.cc
@@ -37,7 +37,7 @@
   , m_keyLoactor (0)
   , m_running (true)
 {
-  m_handle = ccn_create();
+  m_handle = ccn_create ();
   if (ccn_connect(m_handle, NULL) < 0)
     BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("connection to ccnd failed"));
   initKeyStore();
@@ -47,8 +47,13 @@
 
 CcnxWrapper::~CcnxWrapper()
 {
-  m_running = false;
-  m_thread.join();
+  // std::cout << "CcnxWrapper::~CcnxWrapper()" << std::endl;
+  {
+    recursive_mutex::scoped_lock lock(m_mutex);
+    m_running = false;
+  }
+  
+  m_thread.join ();
   ccn_disconnect (m_handle);
   ccn_destroy (&m_handle);
   ccn_charbuf_destroy (&m_keyLoactor);
@@ -112,11 +117,15 @@
       if (res >= 0)
         {
           int ret = poll(pfds, 1, 100);
-          if (ret >= 0)
+          if (ret < 0)
             {
-              recursive_mutex::scoped_lock lock(m_mutex);
-              res = ccn_run(m_handle, 0);
+              BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("ccnd socket failed (probably ccnd got stopped)"));
             }
+
+          recursive_mutex::scoped_lock lock(m_mutex);
+          if (!m_running) break;
+          
+          res = ccn_run(m_handle, 0);
         }
     }
 }
@@ -165,6 +174,7 @@
   switch (kind)
     {
     case CCN_UPCALL_FINAL: // effective in unit tests
+      cout << "FINAL??" << endl;
       delete f;
       delete selfp;
       return CCN_UPCALL_RESULT_OK;
@@ -233,6 +243,7 @@
 
 int CcnxWrapper::sendInterest (const string &strInterest, const DataCallback &dataCallback)
 {
+  // std::cout << "Send interests for " << strInterest << std::endl;
   ccn_charbuf *pname = ccn_charbuf_create();
   ccn_closure *dataClosure = new ccn_closure;
 
@@ -264,4 +275,20 @@
   ccn_charbuf_destroy(&pname);
 }
 
+void
+CcnxWrapper::clearInterestFilter (const std::string &prefix)
+{
+  std::cout << "clearInterestFilter" << std::endl;
+  ccn_charbuf *pname = ccn_charbuf_create();
+
+  ccn_name_from_uri (pname, prefix.c_str());
+  int ret = ccn_set_interest_filter (m_handle, pname, 0);
+  if (ret < 0)
+    {
+      BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("set interest filter failed") << errmsg_info_int (ret));
+    }
+
+  ccn_charbuf_destroy(&pname);
+}
+
 }
diff --git a/model/sync-ccnx-wrapper.h b/model/sync-ccnx-wrapper.h
index fc7afb1..ecfeee3 100644
--- a/model/sync-ccnx-wrapper.h
+++ b/model/sync-ccnx-wrapper.h
@@ -75,7 +75,7 @@
   sendInterest (const std::string &strInterest, const DataCallback &dataCallback);
 
   /**
-   * @brief set Interest filter (specify what interest you want to receive
+   * @brief set Interest filter (specify what interest you want to receive)
    *
    * @param prefix the prefix of Interest
    * @param interestCallback the callback function to deal with the returned data
@@ -85,6 +85,13 @@
   setInterestFilter (const std::string &prefix, const InterestCallback &interestCallback);
 
   /**
+   * @brief clear Interest filter
+   * @param prefix the prefix of Interest
+   */
+  void
+  clearInterestFilter (const std::string &prefix);
+
+  /**
    * @brief publish data and put it to local ccn content store; need to grab
    * lock m_mutex first
    *
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index a954db8..c81f2f1 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -45,24 +45,26 @@
   , m_randomGenerator (static_cast<unsigned int> (std::time (0)))
   , m_rangeUniformRandom (m_randomGenerator, uniform_int<> (20,100))
 {
-  m_ccnxHandle->setInterestFilter (syncPrefix,
+  m_ccnxHandle->setInterestFilter (m_syncPrefix,
                                    bind (&SyncLogic::respondSyncInterest, this, _1));
 
-  m_scheduler.schedule (posix_time::seconds (4),
-                        bind (&SyncLogic::sendSyncInterest, this),
-                        REEXPRESSING_INTEREST);
+  sendSyncInterest ();
 }
 
 SyncLogic::~SyncLogic ()
 {
-}
+  // cout << "SyncLogic::~SyncLogic ()" << endl;
 
+  m_ccnxHandle.reset ();
+}
 
 void
 SyncLogic::respondSyncInterest (const string &interest)
 {
-	//cout << "Respond Sync Interest" << endl;
+  //cout << "Respond Sync Interest" << endl;
   string hash = interest.substr(interest.find_last_of("/") + 1);
+  // cout << "Received Sync Interest: " << hash << endl;
+  
   DigestPtr digest = make_shared<Digest> ();
   try
     {
@@ -81,7 +83,7 @@
 void
 SyncLogic::processSyncInterest (DigestConstPtr digest, const std::string &interestName, bool timedProcessing/*=false*/)
 {
-  //cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
+  // cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
   recursive_mutex::scoped_lock lock (m_stateMutex);
 
   if (*m_state.getDigest() == *digest)
@@ -129,7 +131,7 @@
 void
 SyncLogic::processSyncData (const string &name, const string &dataBuffer)
 {
-  //cout << "Process Sync Data" <<endl;
+  // cout << "Process Sync Data" << endl;
   DiffStatePtr diffLog = make_shared<DiffState> ();
   
   try
@@ -232,20 +234,8 @@
 }
 
 void
-SyncLogic::processPendingSyncInterests (DiffStatePtr &diffLog) 
+SyncLogic::satisfyPendingSyncInterests (DiffStatePtr diffLog)
 {
-  //cout << "Process Pending Interests" <<endl;
-  recursive_mutex::scoped_lock lock (m_stateMutex);
-
-  diffLog->setDigest (m_state.getDigest());  
-  if (m_log.size () > 0)
-    {
-      m_log.get<sequenced> ().front ()->setNext (diffLog);
-    }
-  m_log.erase (m_state.getDigest()); // remove diff state with the same digest.  next pointers are still valid
-  /// @todo Optimization
-  m_log.insert (diffLog);
-
   vector<string> pis = m_syncInterestTable.fetchAll ();
   if (pis.size () > 0)
     {
@@ -259,31 +249,55 @@
 }
 
 void
+SyncLogic::processPendingSyncInterests (DiffStatePtr diffLog) 
+{
+  //cout << "Process Pending Interests" <<endl;
+  diffLog->setDigest (m_state.getDigest());  
+  if (m_log.size () > 0)
+    {
+      m_log.get<sequenced> ().front ()->setNext (diffLog);
+    }
+  m_log.erase (m_state.getDigest()); // remove diff state with the same digest.  next pointers are still valid
+  /// @todo Optimization
+  m_log.insert (diffLog);
+}
+
+void
 SyncLogic::addLocalNames (const string &prefix, uint32_t session, uint32_t seq)
 {
-  //cout << "Add local names" <<endl;
-  recursive_mutex::scoped_lock lock (m_stateMutex);
-  NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
+  DiffStatePtr diff;
+  {
+    //cout << "Add local names" <<endl;
+    recursive_mutex::scoped_lock lock (m_stateMutex);
+    NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
   
-  SeqNo seqN (session, seq);
-  m_state.update(info, seqN);
+    SeqNo seqN (session, seq);
+    m_state.update(info, seqN);
 
-  DiffStatePtr diff = make_shared<DiffState>();
-  diff->update(info, seqN);
-  processPendingSyncInterests (diff);
+    diff = make_shared<DiffState>();
+    diff->update(info, seqN);
+    processPendingSyncInterests (diff);
+  }
+
+  satisfyPendingSyncInterests (diff);
 }
 
 void
 SyncLogic::remove(const string &prefix) 
 {
-  recursive_mutex::scoped_lock lock (m_stateMutex);
-  NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
-  m_state.remove(info);	
+  DiffStatePtr diff;
+  {
+    recursive_mutex::scoped_lock lock (m_stateMutex);
+    NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
+    m_state.remove(info);	
 
-  DiffStatePtr diff = make_shared<DiffState>();
-  diff->remove(info);
+    diff = make_shared<DiffState>();
+    diff->remove(info);
 
-  processPendingSyncInterests (diff);
+    processPendingSyncInterests (diff);
+  }
+
+  satisfyPendingSyncInterests (diff);
 }
 
 void
diff --git a/model/sync-logic.h b/model/sync-logic.h
index b5a4174..871ee33 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -104,7 +104,10 @@
   sendSyncInterest ();
 
   void 
-  processPendingSyncInterests(DiffStatePtr &diff);
+  processPendingSyncInterests (DiffStatePtr diff);
+
+  void
+  satisfyPendingSyncInterests (DiffStatePtr diff);
 
 private:
   FullState m_state;