Correcting SyncLogic with respect of delayed response to unrecognized digests
diff --git a/model/sync-ccnx-wrapper.cc b/model/sync-ccnx-wrapper.cc
index 6b25dfa..1b0e5b5 100644
--- a/model/sync-ccnx-wrapper.cc
+++ b/model/sync-ccnx-wrapper.cc
@@ -24,6 +24,7 @@
 #include <poll.h>
 #include <boost/throw_exception.hpp>
 typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
+typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
 
 using namespace std;
 using namespace boost;
@@ -253,8 +254,11 @@
   ccn_name_from_uri (pname, prefix.c_str());
   interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
   interestClosure->p = &incomingInterest;
-  if(ccn_set_interest_filter (m_handle, pname, interestClosure) < 0)
-    BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("set interest filter failed"));
+  int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
+  if (ret < 0)
+    {
+      BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("set interest filter failed") << errmsg_info_int (ret));
+    }
 
   ccn_charbuf_destroy(&pname);
 }
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index ea341e1..a3fe621 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -25,6 +25,8 @@
 #include "sync-full-leaf.h"
 #include <boost/make_shared.hpp>
 #include <boost/foreach.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
 #include <vector>
 
 using namespace std;
@@ -33,20 +35,138 @@
 namespace Sync
 {
 
+const boost::posix_time::time_duration SyncLogic::m_delayedCheckTime = boost::posix_time::seconds (4.0);
+
+
 SyncLogic::SyncLogic (const string &syncPrefix,
                       LogicCallback fetch,
                       CcnxWrapperPtr ccnxHandle)
   : m_syncPrefix (syncPrefix)
   , m_fetch (fetch)
   , m_ccnxHandle (ccnxHandle)
+  , m_delayedCheckThreadRunning (true)
 {
   srandom(time(NULL));
-  m_ccnxHandle->setInterestFilter(syncPrefix, bind(&SyncLogic::respondSyncInterest, this, _1));
+  m_ccnxHandle->setInterestFilter (syncPrefix,
+                                   bind (&SyncLogic::respondSyncInterest, this, _1));
+
+  m_delayedCheckThread = thread (&SyncLogic::delayedChecksLoop, this);
 }
 
 SyncLogic::~SyncLogic ()
 {
+  m_delayedCheckThreadRunning = false;
+  // cout << "Requested stop" << this_thread::get_id () << endl;
+  m_delayedCheckThread.interrupt ();
+  m_delayedCheckThread.join ();
+}
 
+void
+SyncLogic::delayedChecksLoop ()
+{
+  while (m_delayedCheckThreadRunning)
+    {
+      try
+        {
+          DelayedChecksList::value_type tuple;
+          
+          {
+            unique_lock<mutex> lock (m_listChecksMutex);
+            while (m_delayedCheckThreadRunning && m_listChecks.size () == 0)
+              {
+                m_listChecksCondition.wait (lock);
+                // cout << "Got something" << endl;
+              }
+
+            if (m_listChecks.size () == 0) continue;
+
+            tuple = m_listChecks.front ();
+            m_listChecks.pop_front ();
+            // cout << "pop" << endl;
+            // release the mutex
+          }
+
+          // waiting and calling
+          
+          // cout << "Duration: " << tuple.get<0> () - get_system_time () << endl;
+          this_thread::sleep (tuple.get<0> ()); 
+          
+          if (!m_delayedCheckThreadRunning) continue;
+          tuple.get<1> () (); // call the scheduled function
+        }
+      catch (thread_interrupted e)
+        {
+          // cout << "interrupted: " << this_thread::get_id () << endl;
+          // do nothing
+        }
+    }
+  // cout << "Exited...\n";
+}
+
+
+
+void
+SyncLogic::respondSyncInterest (const string &interest)
+{
+  string hash = interest.substr(interest.find_last_of("/") + 1);
+  DigestPtr digest = make_shared<Digest> ();
+  try
+    {
+      istringstream is (hash);
+      is >> *digest;
+    }
+  catch (Error::DigestCalculationError &e)
+    {
+      // log error. ignoring it for now, later we should log it
+      return;
+    }
+
+  processSyncInterest (digest, interest);
+}
+
+void
+SyncLogic::processSyncInterest (DigestConstPtr digest, const std::string &interestName, bool timedProcessing/*=false*/)
+{
+  // cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
+    
+  if (*m_state.getDigest() == *digest)
+  {
+    m_syncInterestTable.insert (interestName);
+    return;
+  }
+
+  DiffStateContainer::iterator stateInDiffLog = m_log.find (digest);
+
+  if (stateInDiffLog != m_log.end ())
+  {
+    m_ccnxHandle->publishData (interestName,
+                               lexical_cast<string> (*(*stateInDiffLog)->diff ()),
+                               m_syncResponseFreshness);
+    return;
+  }
+
+  if (!timedProcessing)
+    {
+      {
+        // Alex: Should we ignore interests if interest with the same digest is already in the wait queue?
+        
+        lock_guard<mutex> lock (m_listChecksMutex);
+        system_time delay = get_system_time () + m_delayedCheckTime;
+        // do we need randomization??
+        // delay += boost::ptime::milliseconds (rand() % 80 + 20);
+      
+        m_listChecks.push_back (make_tuple (delay,
+                                            bind (&SyncLogic::processSyncInterest, this, digest, interestName, true))
+                                );
+      }
+      m_listChecksCondition.notify_one ();
+    }
+  else
+    {
+      m_ccnxHandle->publishData (interestName + "/state",
+                                 lexical_cast<string> (m_state),
+                                 m_syncResponseFreshness);
+    }
 }
 
 void
@@ -176,7 +296,7 @@
   diff->setDigest(m_state.getDigest());
   m_log.insert(diff);
 
-  vector<string> pis = m_syncInterestTable.fetchAll();
+  vector<string> pis = m_syncInterestTable.fetchAll ();
   stringstream ss;
   ss << *diff;
   for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
@@ -186,62 +306,6 @@
 }
 
 void
-SyncLogic::checkAgain (const string &interest, DigestPtr digest)
-{
-  int wait = rand() % 80 + 20;
-  sleep(wait/1000.0);
-
-  if (*m_state.getDigest() == *digest)
-  {
-    m_syncInterestTable.insert(interest);
-    return;
-  }
-
-  DiffStateContainer::iterator ii = m_log.find (digest);
-  if (ii != m_log.end ())
-  {
-    stringstream ss;
-    ss << *(*ii)->diff();
-    m_ccnxHandle->publishData(interest, ss.str(), m_syncResponseFreshness);
-  }
-  else
-  {
-    stringstream ss;
-    ss << m_state;
-    m_ccnxHandle->publishData(interest + "/state", ss.str(), m_syncResponseFreshness);
-  }
-}
-
-void
-SyncLogic::respondSyncInterest (const string &interest)
-{
-  string hash = interest.substr(interest.find_last_of("/") + 1);
-  DigestPtr digest = make_shared<Digest> ();
-  *digest << hash;
-  digest->finalize ();
-
-  if (*m_state.getDigest() == *digest)
-  {
-    m_syncInterestTable.insert (interest);
-    return;
-  }
-
-  DiffStateContainer::iterator ii = m_log.find (digest);
-
-  if (ii != m_log.end())
-  {
-    stringstream ss;
-    ss << *(*ii)->diff();
-    m_ccnxHandle->publishData(interest, ss.str(), m_syncResponseFreshness);
-  }
-  else
-  {
-    m_thread.join();
-    m_thread = thread(&SyncLogic::checkAgain, this, interest, digest);
-  }
-}
-
-void
 SyncLogic::sendSyncInterest ()
 {
   ostringstream os;
diff --git a/model/sync-logic.h b/model/sync-logic.h
index 03d80b9..b624ecb 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -24,6 +24,8 @@
 #define SYNC_LOGIC_H
 #include <boost/shared_ptr.hpp>
 #include <boost/function.hpp>
+#include "boost/date_time/posix_time/posix_time_types.hpp"
+
 #include "sync-ccnx-wrapper.h"
 #include "sync-interest-table.h"
 #include "sync-diff-state.h"
@@ -73,11 +75,27 @@
    */
   void processSyncData (const std::string &name, const std::string &dataBuffer);
 
+#ifdef _DEBUG
+  size_t
+  getListChecksSize ()
+  {
+    boost::lock_guard<boost::mutex> lock (m_listChecksMutex);
+    return m_listChecks.size ();
+  }
+#endif
+  
 private:
+  void delayedChecksLoop ();
+
+  void
+  processSyncInterest (DigestConstPtr digest, const std::string &interestname, bool timedProcessing=false);
+  
   void sendSyncInterest ();
-  void checkAgain (const std::string &interest, DigestPtr digest);
+  // void checkAgain (const std::string &interest, DigestPtr digest);
 
 private:
+  typedef std::list< boost::tuple< boost::system_time, boost::function< void ( ) > > > DelayedChecksList;
+
   FullState m_state;
   DiffStateContainer m_log;
   SyncInterestTable m_syncInterestTable;
@@ -86,8 +104,13 @@
   LogicCallback m_fetch;
   CcnxWrapperPtr m_ccnxHandle;
 
-  boost::thread m_thread;
+  boost::thread m_delayedCheckThread;
+  bool          m_delayedCheckThreadRunning;
+  DelayedChecksList m_listChecks;
+  boost::condition_variable m_listChecksCondition;
+  boost::mutex  m_listChecksMutex;
 
+  static const boost::posix_time::time_duration m_delayedCheckTime;
   static const int m_syncResponseFreshness = 2;
 };