Many improvements for the event scheduler

Another big change: maintaining an outstanding interests and
re-expressing this interest every 4 seconds (or 4 seconds after interest
is satisfied and expressed again)

Initial testing for SyncLogic
diff --git a/model/sync-full-state.cc b/model/sync-full-state.cc
index 748e408..d01460e 100644
--- a/model/sync-full-state.cc
+++ b/model/sync-full-state.cc
@@ -64,18 +64,24 @@
 {
   if (m_digest == 0)
     {
-      // std::cout << "getDigest: ";
       m_digest = make_shared<Digest> ();
-      BOOST_FOREACH (LeafConstPtr leaf, m_leaves.get<ordered> ())
+      if (m_leaves.get<ordered> ().size () > 0)
         {
-          FullLeafConstPtr fullLeaf = dynamic_pointer_cast<const FullLeaf> (leaf);
-          BOOST_ASSERT (fullLeaf != 0);
-          *m_digest << fullLeaf->getDigest ();
-          // std::cout << *leaf << "[" << fullLeaf->getDigest () << "] ";
+          BOOST_FOREACH (LeafConstPtr leaf, m_leaves.get<ordered> ())
+            {
+              FullLeafConstPtr fullLeaf = dynamic_pointer_cast<const FullLeaf> (leaf);
+              BOOST_ASSERT (fullLeaf != 0);
+              *m_digest << fullLeaf->getDigest ();
+              // std::cout << *leaf << "[" << fullLeaf->getDigest () << "] ";
+            }
+          m_digest->finalize ();
         }
-      // std::cout << "\n";
+      else
+        {
+          std::istringstream is ("00"); //zero state
+          is >> *m_digest;
+        }
     }
-  m_digest->finalize ();
 
   return m_digest;
 }
diff --git a/model/sync-logic-event-container.h b/model/sync-logic-event-container.h
index 3b99d76..ade2c63 100644
--- a/model/sync-logic-event-container.h
+++ b/model/sync-logic-event-container.h
@@ -44,15 +44,21 @@
 
 struct LogicEvent
 {
-  LogicEvent (const boost::system_time &_time, Event _event)
+  LogicEvent (const boost::system_time &_time, Event _event, uint32_t _label)
     : time (_time)
     , event (_event)
+    , lbl (_label)
   { }
   
   boost::system_time time;
   Event event;
+  uint32_t lbl;
 };
 
+/// @cond include_hidden
+struct byLabel { } ;
+/// @endcond
+
 /**
  * \ingroup sync
  * @brief ???
@@ -63,6 +69,11 @@
 
     mi::ordered_non_unique<
       mi::member<LogicEvent, boost::system_time, &LogicEvent::time>
+      >,
+
+    mi::ordered_non_unique<
+      mi::tag<byLabel>,
+      mi::member<LogicEvent, uint32_t, &LogicEvent::lbl>
       >
     >
   >
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index 7235220..4a243a4 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -48,6 +48,11 @@
 {
   m_ccnxHandle->setInterestFilter (syncPrefix,
                                    bind (&SyncLogic::respondSyncInterest, this, _1));
+
+  sendSyncInterest ();
+  m_scheduler.schedule (posix_time::seconds (4),
+                        bind (&SyncLogic::sendSyncInterest, this),
+                        REEXPRESSING_INTEREST);
 }
 
 SyncLogic::~SyncLogic ()
@@ -98,8 +103,9 @@
 
   if (!timedProcessing)
     {
-      m_delayedChecksScheduler.schedule (posix_time::milliseconds (m_rangeUniformRandom ()) /*from 20 to 100ms*/,
-                                         bind (&SyncLogic::processSyncInterest, this, digest, interestName, true));
+      m_scheduler.schedule (posix_time::milliseconds (m_rangeUniformRandom ()) /*from 20 to 100ms*/,
+                            bind (&SyncLogic::processSyncInterest, this, digest, interestName, true),
+                            DELAYED_INTEREST_PROCESSING);
       
     }
   else
@@ -203,19 +209,27 @@
 }
 
 void
-SyncLogic::processPendingSyncInterests(DiffStatePtr &diff) 
+SyncLogic::processPendingSyncInterests (DiffStatePtr &diffLog) 
 {
   recursive_mutex::scoped_lock lock (m_stateMutex);
-  diff->setDigest(m_state.getDigest());
-  m_log.insert(diff);
+
+  diffLog->setDigest(m_state.getDigest());
+  if (m_log.size () > 0)
+    {
+      m_log.get<sequenced> ().front ()->setNext (diffLog);
+    }
+  m_log.insert (diffLog);
 
   vector<string> pis = m_syncInterestTable.fetchAll ();
-  stringstream ss;
-  ss << *diff;
-  for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
-  {
-    m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
-  }
+  if (pis.size () > 0)
+    {
+      stringstream ss;
+      ss << *diffLog;
+      for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
+        {
+          m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
+        }
+    }
 }
 
 void
@@ -255,6 +269,11 @@
 
   m_ccnxHandle->sendInterest (os.str (),
                               bind (&SyncLogic::processSyncData, this, _1, _2));
+
+  m_scheduler.cancel (REEXPRESSING_INTEREST);
+  m_scheduler.schedule (posix_time::seconds (4),
+                        bind (&SyncLogic::sendSyncInterest, this),
+                        REEXPRESSING_INTEREST);
 }
 
 }
diff --git a/model/sync-logic.h b/model/sync-logic.h
index 4270181..03c129a 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -90,7 +90,7 @@
 
 #ifdef _DEBUG
   Scheduler &
-  getScheduler () { return m_delayedChecksScheduler; }
+  getScheduler () { return m_scheduler; }
 #endif
   
 private:
@@ -118,12 +118,18 @@
   LogicRemoveCallback m_onRemove;
   CcnxWrapperPtr m_ccnxHandle;
 
-  Scheduler m_delayedChecksScheduler;
+  Scheduler m_scheduler;
 
   boost::mt19937 m_randomGenerator;
   boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_rangeUniformRandom;
   
   static const int m_syncResponseFreshness = 2;
+
+  enum EventLabels
+    {
+      DELAYED_INTEREST_PROCESSING = 1,
+      REEXPRESSING_INTEREST = 2
+    };
 };
 
 
diff --git a/model/sync-scheduler.cc b/model/sync-scheduler.cc
index 155e839..c5f1ca0 100644
--- a/model/sync-scheduler.cc
+++ b/model/sync-scheduler.cc
@@ -70,7 +70,10 @@
 	      // sleeping
 
 	      if (nextTime - get_system_time () > posix_time::time_duration (0,0,0,0))
-		continue; // something changes, try again
+                {
+                  // cout << "expected here" << endl;
+                  continue; // something changes, try again
+                }
 	    }
 
 	  if (!m_threadRunning) continue;
@@ -80,7 +83,11 @@
 	  {
 	    lock_guard<mutex> lock (m_eventsMutex);
 
-	    BOOST_ASSERT (m_events.size () != 0);
+	    if (m_events.size () == 0)
+              {
+                // cout << "Here" << endl;
+                continue;
+              }
 	    
 	    event = m_events.begin ()->event;
 	    m_events.erase (m_events.begin ());
@@ -88,7 +95,7 @@
 
 	  event (); // calling the event outside the locked mutex
         }
-      catch (thread_interrupted e)
+      catch (thread_interrupted &e)
         {
           // cout << "interrupted: " << this_thread::get_id () << endl;
           // do nothing
@@ -99,21 +106,35 @@
 
 
 void
-Scheduler::schedule (const boost::system_time &abstime, Event event)
+Scheduler::schedule (const boost::system_time &abstime, Event event, uint32_t label)
 {
   {
     lock_guard<mutex> lock (m_eventsMutex);
-    m_events.insert (LogicEvent (abstime, event));
+    m_events.insert (LogicEvent (abstime, event, label));
   }
   m_eventsCondition.notify_one ();
   m_thread.interrupt (); // interrupt sleep, if currently sleeping
 }
 
 void
-Scheduler::schedule (const boost::posix_time::time_duration &reltime, Event event)
+Scheduler::schedule (const boost::posix_time::time_duration &reltime, Event event, uint32_t label)
 {
   // cout << reltime << endl;
-  schedule (boost::get_system_time () + reltime, event);
+  schedule (boost::get_system_time () + reltime, event, label);
 }
 
+void
+Scheduler::cancel (uint32_t label)
+{
+  {
+    // cout << "Canceling label " << label << " size: " << m_events.size () << endl;
+    lock_guard<mutex> lock (m_eventsMutex);
+    m_events.get<byLabel> ().erase (label);
+    // cout << "Canceled label " << label << " size: " << m_events.size () << endl;
+  }
+  m_eventsCondition.notify_one ();
+  m_thread.interrupt (); // interrupt sleep, if currently sleeping
+}
+
+
 } // Sync
diff --git a/model/sync-scheduler.h b/model/sync-scheduler.h
index 8c48f86..ac1eef9 100644
--- a/model/sync-scheduler.h
+++ b/model/sync-scheduler.h
@@ -52,18 +52,26 @@
    * @brief Schedule an event at absolute time 'abstime'
    * @param abstime Absolute time
    * @param event function to be called at the time
+   * @param label Label for the event
    */
   void
-  schedule (const boost::system_time &abstime, Event event);
+  schedule (const boost::system_time &abstime, Event event, uint32_t label);
 
   /**
    * @brief Schedule an event at relative time 'reltime'
    * @param reltime Relative time
    * @param event function to be called at the time
+   * @param label Label for the event
    */
   void
-  schedule (const boost::posix_time::time_duration &reltime, Event event); 
+  schedule (const boost::posix_time::time_duration &reltime, Event event, uint32_t label); 
 
+  /**
+   * @brief Cancel all events for the label
+   * @param label Label of the event that needs to be cancelled
+   */
+  void
+  cancel (uint32_t label);
 
 #ifdef _DEBUG
   size_t
diff --git a/test/test_scheduler.cc b/test/test_scheduler.cc
index 9a72251..1dce279 100644
--- a/test/test_scheduler.cc
+++ b/test/test_scheduler.cc
@@ -45,6 +45,12 @@
 //   cout << "funcRemove\n";
 // }
 
+enum SCHEDULE_LABELS
+  {
+    TEST_LABEL,
+    ANOTHER_LABEL
+  };
+
 struct SchedulerFixture
 {
   SchedulerFixture ()
@@ -61,7 +67,9 @@
     counter ++;
 
     if (counter < 9)
-    scheduler->schedule (boost::posix_time::milliseconds (100), boost::bind (&SchedulerFixture::everySecond, this));
+    scheduler->schedule (boost::posix_time::milliseconds (100),
+                         boost::bind (&SchedulerFixture::everySecond, this),
+                         TEST_LABEL);
   }
 
   void setCounterFive ()
@@ -84,28 +92,40 @@
 {
   BOOST_CHECK_NO_THROW (scheduler = new Scheduler ());
 
-  scheduler->schedule (posix_time::milliseconds (100), bind (&SchedulerFixture::everySecond, this));
+  scheduler->schedule (posix_time::milliseconds (100),
+                       bind (&SchedulerFixture::everySecond, this),
+                       TEST_LABEL);
 
   sleep (1);
   // cout << counter << endl;
   BOOST_CHECK_EQUAL (counter, 9); // generally, should be 9
 
   scheduler->schedule (posix_time::seconds (2),
-		       bind (&SchedulerFixture::setCounterFive, this));
+		       bind (&SchedulerFixture::setCounterFive, this),
+                       TEST_LABEL);
 
   this_thread::sleep (posix_time::milliseconds (400)); // just in case
 
   scheduler->schedule (posix_time::milliseconds (600),
-		       bind (&SchedulerFixture::setCounterThree, this));
+		       bind (&SchedulerFixture::setCounterThree, this),
+                       TEST_LABEL);
 
   this_thread::sleep (posix_time::milliseconds (500));
   BOOST_CHECK_EQUAL (counter, 9); // still 9
-
+  
   this_thread::sleep (posix_time::milliseconds (200));
   BOOST_CHECK_EQUAL (counter, 3);
 
   this_thread::sleep (posix_time::milliseconds (1000));
   BOOST_CHECK_EQUAL (counter, 5);
+
+  scheduler->schedule (posix_time::milliseconds (100),
+		       bind (&SchedulerFixture::setCounterThree, this),
+                       ANOTHER_LABEL);
+  this_thread::sleep (posix_time::milliseconds (50));
+  scheduler->cancel (ANOTHER_LABEL);
+  this_thread::sleep (posix_time::milliseconds (150));
+  BOOST_CHECK_EQUAL (counter, 5);
   
   BOOST_CHECK_NO_THROW (delete scheduler);
 }
@@ -121,7 +141,7 @@
 {
 }
 
-BOOST_AUTO_TEST_CASE (SyncLogicTest)
+BOOST_AUTO_TEST_CASE (SyncLogicSchedulerTest)
 {  
   SyncLogic *logic = 0;
   BOOST_CHECK_NO_THROW (logic = new SyncLogic ("/prefix", funcUpdate, funcRemove, make_shared<CcnxWrapper> ()));
diff --git a/test/test_sync_logic.cc b/test/test_sync_logic.cc
new file mode 100644
index 0000000..27cd9c6
--- /dev/null
+++ b/test/test_sync_logic.cc
@@ -0,0 +1,70 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ *         卞超轶 Chaoyi Bian <bcy@pku.edu.cn>
+ *	   Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ */
+
+#include <boost/test/unit_test.hpp>
+#include <boost/test/output_test_stream.hpp> 
+#include <map>
+using boost::test_tools::output_test_stream;
+
+#include <boost/make_shared.hpp>
+
+#include "../model/sync-ccnx-wrapper.h"
+#include "../model/sync-logic.h"
+#include "../model/sync-seq-no.h"
+
+using namespace std;
+using namespace boost;
+using namespace Sync;
+
+struct Handler
+{
+  string instance;
+  
+  Handler (const string &_instance)
+  : instance (_instance)
+  {
+  }
+  
+  void onUpdate (const string &p/*prefix*/, const SeqNo &seq/*newSeq*/, const SeqNo &/*oldSeq*/)
+  {
+    cout << instance << "\t" << p << ": " << seq << endl;
+  }
+
+  void onRemove (const string &p/*prefix*/)
+  {
+    cout << instance << "\t" << p << endl;
+  }
+};
+
+BOOST_AUTO_TEST_CASE (SyncLogicTest)
+{
+  Handler h1 ("1"), h2 ("2");
+
+  SyncLogic l1 ("/bcast", bind (&Handler::onUpdate, &h1, _1, _2, _3), bind (&Handler::onRemove, &h1, _1), make_shared<CcnxWrapper> ());
+  SyncLogic L2 ("/bcast", bind (&Handler::onUpdate, &h2, _1, _2, _3), bind (&Handler::onRemove, &h2, _1), make_shared<CcnxWrapper> ());
+
+  sleep (10);
+  // l1.  
+}
+
+
+