Bug solving in SyncLogic
diff --git a/log4cxx.properties b/log4cxx.properties
index e89f3b0..70f885a 100644
--- a/log4cxx.properties
+++ b/log4cxx.properties
@@ -8,11 +8,13 @@
 log4j.appender.A1.layout=org.apache.log4j.PatternLayout
 log4j.appender.A1.target=System.err
 #log4j.appender.A1.layout.ConversionPattern=%d{dd-MMM HH:MM:SS,SSS} %p %c %m%n
-log4j.appender.A1.layout.ConversionPattern=%d{hh:mm:ss,SSS} %-14t %-10c{1}  %m%n
+#log4j.appender.A1.layout.ConversionPattern=%d{hh:mm:ss,SSS} %-14t %-14c  %m%n
+log4j.appender.A1.layout.ConversionPattern=%d{ss,SSS}  %-12c  %m%n
 
 log4j.logger.SyncLogic = TRACE
-log4j.logger.AppDataFetch = TRACE
-log4j.logger.Test = TRACE
+#log4j.logger.SyncInterestTable = TRACE
+# log4j.logger.AppDataFetch = TRACE
+# log4j.logger.Test = TRACE
 #log4j.logger.bgpparser=TRACE
 #log4j.logger.bgpparser.AttributeType=ERROR
 #log4j.logger.bgpparser.MRTCommonHeader=ERROR
diff --git a/model/sync-interest-table.cc b/model/sync-interest-table.cc
index b9f6484..3a08fbd 100644
--- a/model/sync-interest-table.cc
+++ b/model/sync-interest-table.cc
@@ -21,32 +21,29 @@
  */
 
 #include "sync-interest-table.h"
-
+#include "sync-log.h"
 using namespace std;
 using namespace boost;
 
+INIT_LOGGER ("SyncInterestTable");
+
 namespace Sync
 {
 
 SyncInterestTable::SyncInterestTable ()
-  : m_running (true)
 {
-  m_thread = thread (&SyncInterestTable::periodicCheck, this);
+  m_scheduler.schedule (posix_time::seconds (m_checkPeriod),
+                        bind (&SyncInterestTable::expireInterests, this),
+                        0);
 }
 
 SyncInterestTable::~SyncInterestTable ()
 {
-  // cout << "request interrupt: " << this_thread::get_id () << endl;
-  m_running = false;
-  m_thread.interrupt ();
-  m_thread.join ();
 }
 
 vector<string>
 SyncInterestTable::fetchAll ()
 {
-  expireInterests();
-
   recursive_mutex::scoped_lock lock (m_mutex);
   
   vector<string> entries;
@@ -62,7 +59,7 @@
 }
 
 bool
-SyncInterestTable::insert(string interest)
+SyncInterestTable::insert(const string &interest)
 {
   recursive_mutex::scoped_lock lock (m_mutex);
   TableContainer::iterator it = m_table.find (interest);
@@ -72,41 +69,52 @@
   m_table.insert (make_pair(interest, currentTime));
 }
 
-void SyncInterestTable::expireInterests()
+uint32_t
+SyncInterestTable::size () const
 {
   recursive_mutex::scoped_lock lock (m_mutex);
+  return m_table.size ();
+}
+
+bool
+SyncInterestTable::remove (const std::string &interest)
+{
+  recursive_mutex::scoped_lock lock (m_mutex);
+  TableContainer::iterator item = m_table.find (interest);
+  if (item != m_table.end ())
+    {
+      m_table.erase (item);
+      return true;
+    }
+  return false;
+}
+
+
+void SyncInterestTable::expireInterests ()
+{ 
+  recursive_mutex::scoped_lock lock (m_mutex);
+
+  uint32_t count = 0;
   time_t currentTime = time(0);
   TableContainer::iterator it = m_table.begin (); 
   while (it != m_table.end())
     {
     time_t timestamp = it->second;
-    if (currentTime - timestamp > m_checkPeriod) {
-      it = m_table.erase(it);
-    }
+    if (currentTime - timestamp > m_checkPeriod)
+      {
+        it = m_table.erase(it);
+        count ++;
+      }
     else
       ++it;
   }
+
+  _LOG_DEBUG ("expireInterests (): expired " << count);
+  
+  m_scheduler.schedule (posix_time::seconds (m_checkPeriod),
+                        bind (&SyncInterestTable::expireInterests, this),
+                        0);
 }
 
-void SyncInterestTable::periodicCheck ()
-{
-  while (m_running)
-    {
-      try
-        {
-          // cout << "enterSleep: " << this_thread::get_id () << endl;
-      
-          this_thread::sleep (posix_time::seconds(4));
-          expireInterests ();
-        }
-      catch (boost::thread_interrupted e)
-        {
-          // should I just assign m_running = false here?
-          
-          // cout << "interrupted: " << this_thread::get_id () << endl;
-          // do nothing
-        }
-    }
-}
 
 }
diff --git a/model/sync-interest-table.h b/model/sync-interest-table.h
index bc5f2c2..199d731 100644
--- a/model/sync-interest-table.h
+++ b/model/sync-interest-table.h
@@ -29,6 +29,7 @@
 #include <boost/thread/recursive_mutex.hpp>
 #include <boost/thread/thread.hpp>
 #include <ctime>
+#include "sync-scheduler.h"
 
 namespace Sync {
 
@@ -48,7 +49,14 @@
    * @brief Insert an interest, if interest already exists, update the
    * timestamp
    */
-  bool insert (std::string interest);
+  bool
+  insert (const std::string &interest);
+
+  /**
+   * @brief Remove interest (e.g., when it was satisfied)
+   */
+  bool
+  remove (const std::string &interest);
 
   /**
    * @brief fetch all Interests and clear the table
@@ -56,24 +64,24 @@
   std::vector<std::string>
   fetchAll ();
 
+  uint32_t
+  size () const;
+
 private:
   /**
    * @brief periodically called to expire Interest
    */
-  void expireInterests ();
-
-  void periodicCheck ();
+  void
+  expireInterests ();
 
 private:
   typedef boost::unordered_map<std::string, time_t> TableContainer;
   
   static const int m_checkPeriod = 4;
   TableContainer m_table; // pit entries
-  
-  boost::thread m_thread; // thread to check every 4 sec
-  volatile bool m_running;
-  boost::recursive_mutex m_mutex;
 
+  Scheduler m_scheduler;
+  mutable boost::recursive_mutex m_mutex;
 };
 
 } // Sync
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index 2134256..7f052f1 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -34,7 +34,7 @@
 using namespace std;
 using namespace boost;
 
-INIT_LOGGER ("SyncLogic");
+// INIT_LOGGER ("SyncLogic");
 
 namespace Sync
 {
@@ -47,9 +47,16 @@
   , m_onRemove (onRemove)
   , m_ccnxHandle(new CcnxWrapper())
   , m_randomGenerator (static_cast<unsigned int> (std::time (0)))
-  , m_rangeUniformRandom (m_randomGenerator, uniform_int<> (10,50))
+  , m_rangeUniformRandom (m_randomGenerator, uniform_int<> (20,80))
 {
-  _LOG_FUNCTION (syncPrefix);
+#ifdef _DEBUG
+#ifdef HAVE_LOG4CXX
+  // _LOG_FUNCTION (syncPrefix);
+  static int id = 0;
+  staticModuleLogger = log4cxx::Logger::getLogger ("SyncLogic." + lexical_cast<string> (id));
+  id ++;
+#endif
+#endif
   
   m_ccnxHandle->setInterestFilter (m_syncPrefix,
                                    bind (&SyncLogic::respondSyncInterest, this, _1));
@@ -61,7 +68,7 @@
 
 SyncLogic::~SyncLogic ()
 {
-  _LOG_FUNCTION (this);
+  // _LOG_FUNCTION (this);
   // cout << "SyncLogic::~SyncLogic ()" << endl;
 
   m_ccnxHandle.reset ();
@@ -100,9 +107,16 @@
     {
       _LOG_TRACE (">> D " << interestName << "/state" << " (zero)");
 
+      m_syncInterestTable.remove (interestName + "/state");
       m_ccnxHandle->publishData (interestName + "/state",
                                  lexical_cast<string> (m_state),
                                  m_syncResponseFreshness);
+      if (m_outstandingInterest == interestName)
+        {
+          m_scheduler.schedule (posix_time::seconds (0),
+                                bind (&SyncLogic::sendSyncInterest, this),
+                                REEXPRESSING_INTEREST);
+        }
       return;
     }
 
@@ -111,12 +125,12 @@
       // cout << interestName << "\n";
       if (digest->isZero ())
         {
-          _LOG_TRACE ("Digest is zero, adding /state to PIT");
+          _LOG_TRACE ("processSyncInterest (): Digest is zero, adding /state to PIT");
           m_syncInterestTable.insert (interestName + "/state");
         }
       else
         {
-          _LOG_TRACE ("Same state. Adding to PIT");
+          _LOG_TRACE ("processSyncInterest (): Same state. Adding to PIT");
           m_syncInterestTable.insert (interestName);
         }
       return;
@@ -127,16 +141,22 @@
   if (stateInDiffLog != m_log.end ())
   {
     _LOG_TRACE (">> D " << interestName);
-    
+
+    m_syncInterestTable.remove (interestName);
     m_ccnxHandle->publishData (interestName,
                                lexical_cast<string> (*(*stateInDiffLog)->diff ()),
                                m_syncResponseFreshness);
+    if (m_outstandingInterest == interestName)
+      {
+        m_scheduler.schedule (posix_time::seconds (0),
+                              bind (&SyncLogic::sendSyncInterest, this),
+                              REEXPRESSING_INTEREST);
+      }
     return;
   }
 
   if (!timedProcessing)
     {
-      _LOG_DEBUG ("hmm");
       m_scheduler.schedule (posix_time::milliseconds (m_rangeUniformRandom ()) /*from 20 to 100ms*/,
                             bind (&SyncLogic::processSyncInterest, this, digest, interestName, true),
                             DELAYED_INTEREST_PROCESSING);
@@ -145,10 +165,18 @@
   else
     {
       _LOG_TRACE (">> D " << interestName << "/state" << " (timed processing)");
-      
+
+      m_syncInterestTable.remove (interestName + "/state");
       m_ccnxHandle->publishData (interestName + "/state",
                                  lexical_cast<string> (m_state),
                                  m_syncResponseFreshness);
+
+      if (m_outstandingInterest == interestName)
+        {
+          m_scheduler.schedule (posix_time::seconds (0),
+                                bind (&SyncLogic::sendSyncInterest, this),
+                                REEXPRESSING_INTEREST);
+        }
     }
 }
 
@@ -244,14 +272,17 @@
   // if state has changed, then it is safe to express a new interest
   if (diffLog->getLeaves ().size () > 0)
     {
-      sendSyncInterest ();
+      m_scheduler.schedule (posix_time::seconds (0),
+                            bind (&SyncLogic::sendSyncInterest, this),
+                            REEXPRESSING_INTEREST);
     }
   else
     {
       // should not reexpress the same interest. Need at least wait for data lifetime
       // Otherwise we will get immediate reply from the local daemon and there will be 100% utilization
       m_scheduler.cancel (REEXPRESSING_INTEREST);
-      m_scheduler.schedule (posix_time::seconds (m_syncResponseFreshness),
+      // m_scheduler.schedule (posix_time::seconds (0),
+      m_scheduler.schedule (posix_time::seconds (m_syncResponseFreshness) + posix_time::milliseconds (1),
                             bind (&SyncLogic::sendSyncInterest, this),
                             REEXPRESSING_INTEREST);
     }
@@ -265,10 +296,27 @@
     {
       stringstream ss;
       ss << *diffLog;
+      bool satisfiedOwnInterest = false;
+      
       for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
         {
           _LOG_TRACE (">> D " << *ii);
           m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
+
+          {
+            recursive_mutex::scoped_lock lock (m_stateMutex);
+            // _LOG_DEBUG (*ii << " == " << m_outstandingInterest << " = " << (*ii == m_outstandingInterest));
+            satisfiedOwnInterest = satisfiedOwnInterest || (*ii == m_outstandingInterest) || (*ii == (m_outstandingInterest + "/state"));
+          }
+        }
+
+      if (satisfiedOwnInterest)
+        {
+          _LOG_DEBUG ("Have satisfied our own interest. Scheduling interest reexpression");
+          // we need to reexpress interest only if we satisfied our own interest
+          m_scheduler.schedule (posix_time::milliseconds (0),
+                                bind (&SyncLogic::sendSyncInterest, this),
+                                REEXPRESSING_INTEREST);
         }
     }
 }
@@ -285,7 +333,7 @@
   m_log.erase (m_state.getDigest()); // remove diff state with the same digest.  next pointers are still valid
   /// @todo Optimization
   m_log.get<sequenced> ().push_front (diffLog);
-  _LOG_DEBUG (*diffLog->getDigest () << " " << m_log.size ());
+  // _LOG_DEBUG (*diffLog->getDigest () << " " << m_log.size ());
 }
 
 void
@@ -296,16 +344,19 @@
     //cout << "Add local names" <<endl;
     recursive_mutex::scoped_lock lock (m_stateMutex);
     NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
-  
+
     SeqNo seqN (session, seq);
     m_state.update(info, seqN);
 
+    _LOG_DEBUG ("addLocalNames (): new state " << *m_state.getDigest ());
+    
     diff = make_shared<DiffState>();
     diff->update(info, seqN);
     processPendingSyncInterests (diff);
   }
 
-  satisfyPendingSyncInterests (diff);
+  // _LOG_DEBUG ("PIT size: " << m_syncInterestTable.size ());
+  satisfyPendingSyncInterests (diff);  
 }
 
 void
@@ -323,20 +374,25 @@
     processPendingSyncInterests (diff);
   }
 
-  satisfyPendingSyncInterests (diff);
+  satisfyPendingSyncInterests (diff);  
 }
 
 void
 SyncLogic::sendSyncInterest ()
 {
-  // cout << "Sending Sync Interest" << endl;
-  recursive_mutex::scoped_lock lock (m_stateMutex);
-
   ostringstream os;
-  os << m_syncPrefix << "/" << *m_state.getDigest();
 
-  _LOG_TRACE (">> I " << os.str ());
+  {
+    // cout << "Sending Sync Interest" << endl;
+    recursive_mutex::scoped_lock lock (m_stateMutex);
 
+    os << m_syncPrefix << "/" << *m_state.getDigest();
+
+    _LOG_TRACE (">> I " << os.str ());
+
+    m_outstandingInterest = os.str ();
+  }
+  
   m_ccnxHandle->sendInterest (os.str (),
                               bind (&SyncLogic::processSyncData, this, _1, _2));
 
diff --git a/model/sync-logic.h b/model/sync-logic.h
index 871ee33..e966dea 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -37,6 +37,12 @@
 
 #include "sync-diff-state-container.h"
 
+#ifdef _DEBUG
+#ifdef HAVE_LOG4CXX
+#include <log4cxx/logger.h>
+#endif
+#endif
+
 namespace Sync {
 
 /**
@@ -114,6 +120,7 @@
   DiffStateContainer m_log;
   boost::recursive_mutex m_stateMutex;
 
+  std::string m_outstandingInterest;
   SyncInterestTable m_syncInterestTable;
 
   std::string m_syncPrefix;
@@ -133,6 +140,12 @@
       DELAYED_INTEREST_PROCESSING = 1,
       REEXPRESSING_INTEREST = 2
     };
+
+#ifdef _DEBUG
+#ifdef HAVE_LOG4CXX
+  log4cxx::LoggerPtr staticModuleLogger;
+#endif
+#endif  
 };
 
 
diff --git a/model/sync-scheduler.cc b/model/sync-scheduler.cc
index f54f91f..73a904a 100644
--- a/model/sync-scheduler.cc
+++ b/model/sync-scheduler.cc
@@ -66,13 +66,13 @@
 	    nextTime = m_events.begin ()->time;
           }
 
-	  if (nextTime - get_system_time () > posix_time::time_duration (0,0,0,0))
+	  if (nextTime > get_system_time ())
 	    {
 	      this_thread::sleep (nextTime - get_system_time ());
 
 	      // sleeping
 
-	      if (nextTime - get_system_time () > posix_time::time_duration (0,0,0,0))
+	      if (nextTime > get_system_time ())
                 {
                   // cout << "expected here" << endl;
                   continue; // something changes, try again
diff --git a/test/test_app_socket.cc b/test/test_app_socket.cc
index bf7db83..d1b90c0 100644
--- a/test/test_app_socket.cc
+++ b/test/test_app_socket.cc
@@ -88,7 +88,7 @@
   string data0 = "Very funny Scotty, now beam down my clothes";
   _LOG_DEBUG ("s1 publish");
   s1.publish (p1, 0, data0, 10); 
-  this_thread::sleep (posix_time::milliseconds (50));
+  this_thread::sleep (posix_time::milliseconds (250));
 
   // from code logic, we won't be fetching our own data
   a1.set(p1 + "/0/0", data0);
@@ -103,9 +103,8 @@
   s1.publish (p1, 0, data1, 10);
   _LOG_DEBUG ("s1 publish");
   s1.publish (p1, 0, data2, 10);
-  this_thread::sleep (posix_time::milliseconds (100));
+  this_thread::sleep (posix_time::milliseconds (250));
   
-  _LOG_DEBUG ("testing");
   // // // from code logic, we won't be fetching our own data
   a1.set(p1 + "/0/1", data1);
   a1.set(p1 + "/0/2", data2);
@@ -113,36 +112,35 @@
   BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
 
   // // another single source
-  // // string data3 = "You surf the Internet, I surf the real world";
-  // string data4 = "I got a fortune cookie once that said 'You like Chinese food'";
-  // string data5 = "Real men wear pink. Why? Because their wives make them";
-  // // s3.publish(p3, 0, data3, 10); 
-  // // this_thread::sleep (posix_time::milliseconds (1000));
+  string data3 = "You surf the Internet, I surf the real world";
+  string data4 = "I got a fortune cookie once that said 'You like Chinese food'";
+  string data5 = "Real men wear pink. Why? Because their wives make them";
+  _LOG_DEBUG ("s3 publish");
+  s3.publish(p3, 0, data3, 10); 
+  this_thread::sleep (posix_time::milliseconds (200));
   
-  // // another single source, multiple data at once
-  // s2.publish(p2, 0, data4, 10); 
-  // s2.publish(p2, 0, data5, 10);
-  // this_thread::sleep (posix_time::milliseconds (1000));
+  // another single source, multiple data at once
+  s2.publish(p2, 0, data4, 10); 
+  s2.publish(p2, 0, data5, 10);
+  this_thread::sleep (posix_time::milliseconds (200));
 
-  // // from code logic, we won't be fetching our own data
-  // // a3.set(p3 + "/0/0", data3);
-  // a2.set(p2 + "/0/0", data4);
-  // a2.set(p2 + "/0/1", data5);
-  // BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
-  // // BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
+  // from code logic, we won't be fetching our own data
+  a3.set(p3 + "/0/0", data3);
+  a2.set(p2 + "/0/0", data4);
+  a2.set(p2 + "/0/1", data5);
+  BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
+  BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
 
-  // // not sure weither this is simultanous data generation from multiple sources
-  // string data6 = "Shakespeare says: 'Prose before hos.'";
-  // string data7 = "Pick good people, talent never wears out";
-  // s1.publish(p1, 0, data6, 10); 
-  // s2.publish(p2, 0, data7, 10); 
-  // usleep(10000);
+  // not sure weither this is simultanous data generation from multiple sources
+  string data6 = "Shakespeare says: 'Prose before hos.'";
+  string data7 = "Pick good people, talent never wears out";
+  s1.publish(p1, 0, data6, 10); 
+  s2.publish(p2, 0, data7, 10); 
+  this_thread::sleep (posix_time::milliseconds (10000));
 
-  // // from code logic, we won't be fetching our own data
-  // a1.set(p1 + "/0/3", data6);
-  // a2.set(p2 + "/0/2", data7);
-  // BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
-  // BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
+  // from code logic, we won't be fetching our own data
+  a1.set(p1 + "/0/3", data6);
+  a2.set(p2 + "/0/2", data7);
+  BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
+  BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
 }
-
-