Many improvements for the event scheduler
Another big change: maintaining an outstanding interests and
re-expressing this interest every 4 seconds (or 4 seconds after interest
is satisfied and expressed again)
Initial testing for SyncLogic
diff --git a/model/sync-full-state.cc b/model/sync-full-state.cc
index 748e408..d01460e 100644
--- a/model/sync-full-state.cc
+++ b/model/sync-full-state.cc
@@ -64,18 +64,24 @@
{
if (m_digest == 0)
{
- // std::cout << "getDigest: ";
m_digest = make_shared<Digest> ();
- BOOST_FOREACH (LeafConstPtr leaf, m_leaves.get<ordered> ())
+ if (m_leaves.get<ordered> ().size () > 0)
{
- FullLeafConstPtr fullLeaf = dynamic_pointer_cast<const FullLeaf> (leaf);
- BOOST_ASSERT (fullLeaf != 0);
- *m_digest << fullLeaf->getDigest ();
- // std::cout << *leaf << "[" << fullLeaf->getDigest () << "] ";
+ BOOST_FOREACH (LeafConstPtr leaf, m_leaves.get<ordered> ())
+ {
+ FullLeafConstPtr fullLeaf = dynamic_pointer_cast<const FullLeaf> (leaf);
+ BOOST_ASSERT (fullLeaf != 0);
+ *m_digest << fullLeaf->getDigest ();
+ // std::cout << *leaf << "[" << fullLeaf->getDigest () << "] ";
+ }
+ m_digest->finalize ();
}
- // std::cout << "\n";
+ else
+ {
+ std::istringstream is ("00"); //zero state
+ is >> *m_digest;
+ }
}
- m_digest->finalize ();
return m_digest;
}
diff --git a/model/sync-logic-event-container.h b/model/sync-logic-event-container.h
index 3b99d76..ade2c63 100644
--- a/model/sync-logic-event-container.h
+++ b/model/sync-logic-event-container.h
@@ -44,15 +44,21 @@
struct LogicEvent
{
- LogicEvent (const boost::system_time &_time, Event _event)
+ LogicEvent (const boost::system_time &_time, Event _event, uint32_t _label)
: time (_time)
, event (_event)
+ , lbl (_label)
{ }
boost::system_time time;
Event event;
+ uint32_t lbl;
};
+/// @cond include_hidden
+struct byLabel { } ;
+/// @endcond
+
/**
* \ingroup sync
* @brief ???
@@ -63,6 +69,11 @@
mi::ordered_non_unique<
mi::member<LogicEvent, boost::system_time, &LogicEvent::time>
+ >,
+
+ mi::ordered_non_unique<
+ mi::tag<byLabel>,
+ mi::member<LogicEvent, uint32_t, &LogicEvent::lbl>
>
>
>
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index 7235220..4a243a4 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -48,6 +48,11 @@
{
m_ccnxHandle->setInterestFilter (syncPrefix,
bind (&SyncLogic::respondSyncInterest, this, _1));
+
+ sendSyncInterest ();
+ m_scheduler.schedule (posix_time::seconds (4),
+ bind (&SyncLogic::sendSyncInterest, this),
+ REEXPRESSING_INTEREST);
}
SyncLogic::~SyncLogic ()
@@ -98,8 +103,9 @@
if (!timedProcessing)
{
- m_delayedChecksScheduler.schedule (posix_time::milliseconds (m_rangeUniformRandom ()) /*from 20 to 100ms*/,
- bind (&SyncLogic::processSyncInterest, this, digest, interestName, true));
+ m_scheduler.schedule (posix_time::milliseconds (m_rangeUniformRandom ()) /*from 20 to 100ms*/,
+ bind (&SyncLogic::processSyncInterest, this, digest, interestName, true),
+ DELAYED_INTEREST_PROCESSING);
}
else
@@ -203,19 +209,27 @@
}
void
-SyncLogic::processPendingSyncInterests(DiffStatePtr &diff)
+SyncLogic::processPendingSyncInterests (DiffStatePtr &diffLog)
{
recursive_mutex::scoped_lock lock (m_stateMutex);
- diff->setDigest(m_state.getDigest());
- m_log.insert(diff);
+
+ diffLog->setDigest(m_state.getDigest());
+ if (m_log.size () > 0)
+ {
+ m_log.get<sequenced> ().front ()->setNext (diffLog);
+ }
+ m_log.insert (diffLog);
vector<string> pis = m_syncInterestTable.fetchAll ();
- stringstream ss;
- ss << *diff;
- for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
- {
- m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
- }
+ if (pis.size () > 0)
+ {
+ stringstream ss;
+ ss << *diffLog;
+ for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
+ {
+ m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
+ }
+ }
}
void
@@ -255,6 +269,11 @@
m_ccnxHandle->sendInterest (os.str (),
bind (&SyncLogic::processSyncData, this, _1, _2));
+
+ m_scheduler.cancel (REEXPRESSING_INTEREST);
+ m_scheduler.schedule (posix_time::seconds (4),
+ bind (&SyncLogic::sendSyncInterest, this),
+ REEXPRESSING_INTEREST);
}
}
diff --git a/model/sync-logic.h b/model/sync-logic.h
index 4270181..03c129a 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -90,7 +90,7 @@
#ifdef _DEBUG
Scheduler &
- getScheduler () { return m_delayedChecksScheduler; }
+ getScheduler () { return m_scheduler; }
#endif
private:
@@ -118,12 +118,18 @@
LogicRemoveCallback m_onRemove;
CcnxWrapperPtr m_ccnxHandle;
- Scheduler m_delayedChecksScheduler;
+ Scheduler m_scheduler;
boost::mt19937 m_randomGenerator;
boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_rangeUniformRandom;
static const int m_syncResponseFreshness = 2;
+
+ enum EventLabels
+ {
+ DELAYED_INTEREST_PROCESSING = 1,
+ REEXPRESSING_INTEREST = 2
+ };
};
diff --git a/model/sync-scheduler.cc b/model/sync-scheduler.cc
index 155e839..c5f1ca0 100644
--- a/model/sync-scheduler.cc
+++ b/model/sync-scheduler.cc
@@ -70,7 +70,10 @@
// sleeping
if (nextTime - get_system_time () > posix_time::time_duration (0,0,0,0))
- continue; // something changes, try again
+ {
+ // cout << "expected here" << endl;
+ continue; // something changes, try again
+ }
}
if (!m_threadRunning) continue;
@@ -80,7 +83,11 @@
{
lock_guard<mutex> lock (m_eventsMutex);
- BOOST_ASSERT (m_events.size () != 0);
+ if (m_events.size () == 0)
+ {
+ // cout << "Here" << endl;
+ continue;
+ }
event = m_events.begin ()->event;
m_events.erase (m_events.begin ());
@@ -88,7 +95,7 @@
event (); // calling the event outside the locked mutex
}
- catch (thread_interrupted e)
+ catch (thread_interrupted &e)
{
// cout << "interrupted: " << this_thread::get_id () << endl;
// do nothing
@@ -99,21 +106,35 @@
void
-Scheduler::schedule (const boost::system_time &abstime, Event event)
+Scheduler::schedule (const boost::system_time &abstime, Event event, uint32_t label)
{
{
lock_guard<mutex> lock (m_eventsMutex);
- m_events.insert (LogicEvent (abstime, event));
+ m_events.insert (LogicEvent (abstime, event, label));
}
m_eventsCondition.notify_one ();
m_thread.interrupt (); // interrupt sleep, if currently sleeping
}
void
-Scheduler::schedule (const boost::posix_time::time_duration &reltime, Event event)
+Scheduler::schedule (const boost::posix_time::time_duration &reltime, Event event, uint32_t label)
{
// cout << reltime << endl;
- schedule (boost::get_system_time () + reltime, event);
+ schedule (boost::get_system_time () + reltime, event, label);
}
+void
+Scheduler::cancel (uint32_t label)
+{
+ {
+ // cout << "Canceling label " << label << " size: " << m_events.size () << endl;
+ lock_guard<mutex> lock (m_eventsMutex);
+ m_events.get<byLabel> ().erase (label);
+ // cout << "Canceled label " << label << " size: " << m_events.size () << endl;
+ }
+ m_eventsCondition.notify_one ();
+ m_thread.interrupt (); // interrupt sleep, if currently sleeping
+}
+
+
} // Sync
diff --git a/model/sync-scheduler.h b/model/sync-scheduler.h
index 8c48f86..ac1eef9 100644
--- a/model/sync-scheduler.h
+++ b/model/sync-scheduler.h
@@ -52,18 +52,26 @@
* @brief Schedule an event at absolute time 'abstime'
* @param abstime Absolute time
* @param event function to be called at the time
+ * @param label Label for the event
*/
void
- schedule (const boost::system_time &abstime, Event event);
+ schedule (const boost::system_time &abstime, Event event, uint32_t label);
/**
* @brief Schedule an event at relative time 'reltime'
* @param reltime Relative time
* @param event function to be called at the time
+ * @param label Label for the event
*/
void
- schedule (const boost::posix_time::time_duration &reltime, Event event);
+ schedule (const boost::posix_time::time_duration &reltime, Event event, uint32_t label);
+ /**
+ * @brief Cancel all events for the label
+ * @param label Label of the event that needs to be cancelled
+ */
+ void
+ cancel (uint32_t label);
#ifdef _DEBUG
size_t
diff --git a/test/test_scheduler.cc b/test/test_scheduler.cc
index 9a72251..1dce279 100644
--- a/test/test_scheduler.cc
+++ b/test/test_scheduler.cc
@@ -45,6 +45,12 @@
// cout << "funcRemove\n";
// }
+enum SCHEDULE_LABELS
+ {
+ TEST_LABEL,
+ ANOTHER_LABEL
+ };
+
struct SchedulerFixture
{
SchedulerFixture ()
@@ -61,7 +67,9 @@
counter ++;
if (counter < 9)
- scheduler->schedule (boost::posix_time::milliseconds (100), boost::bind (&SchedulerFixture::everySecond, this));
+ scheduler->schedule (boost::posix_time::milliseconds (100),
+ boost::bind (&SchedulerFixture::everySecond, this),
+ TEST_LABEL);
}
void setCounterFive ()
@@ -84,28 +92,40 @@
{
BOOST_CHECK_NO_THROW (scheduler = new Scheduler ());
- scheduler->schedule (posix_time::milliseconds (100), bind (&SchedulerFixture::everySecond, this));
+ scheduler->schedule (posix_time::milliseconds (100),
+ bind (&SchedulerFixture::everySecond, this),
+ TEST_LABEL);
sleep (1);
// cout << counter << endl;
BOOST_CHECK_EQUAL (counter, 9); // generally, should be 9
scheduler->schedule (posix_time::seconds (2),
- bind (&SchedulerFixture::setCounterFive, this));
+ bind (&SchedulerFixture::setCounterFive, this),
+ TEST_LABEL);
this_thread::sleep (posix_time::milliseconds (400)); // just in case
scheduler->schedule (posix_time::milliseconds (600),
- bind (&SchedulerFixture::setCounterThree, this));
+ bind (&SchedulerFixture::setCounterThree, this),
+ TEST_LABEL);
this_thread::sleep (posix_time::milliseconds (500));
BOOST_CHECK_EQUAL (counter, 9); // still 9
-
+
this_thread::sleep (posix_time::milliseconds (200));
BOOST_CHECK_EQUAL (counter, 3);
this_thread::sleep (posix_time::milliseconds (1000));
BOOST_CHECK_EQUAL (counter, 5);
+
+ scheduler->schedule (posix_time::milliseconds (100),
+ bind (&SchedulerFixture::setCounterThree, this),
+ ANOTHER_LABEL);
+ this_thread::sleep (posix_time::milliseconds (50));
+ scheduler->cancel (ANOTHER_LABEL);
+ this_thread::sleep (posix_time::milliseconds (150));
+ BOOST_CHECK_EQUAL (counter, 5);
BOOST_CHECK_NO_THROW (delete scheduler);
}
@@ -121,7 +141,7 @@
{
}
-BOOST_AUTO_TEST_CASE (SyncLogicTest)
+BOOST_AUTO_TEST_CASE (SyncLogicSchedulerTest)
{
SyncLogic *logic = 0;
BOOST_CHECK_NO_THROW (logic = new SyncLogic ("/prefix", funcUpdate, funcRemove, make_shared<CcnxWrapper> ()));
diff --git a/test/test_sync_logic.cc b/test/test_sync_logic.cc
new file mode 100644
index 0000000..27cd9c6
--- /dev/null
+++ b/test/test_sync_logic.cc
@@ -0,0 +1,70 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ * 卞超轶 Chaoyi Bian <bcy@pku.edu.cn>
+ * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ */
+
+#include <boost/test/unit_test.hpp>
+#include <boost/test/output_test_stream.hpp>
+#include <map>
+using boost::test_tools::output_test_stream;
+
+#include <boost/make_shared.hpp>
+
+#include "../model/sync-ccnx-wrapper.h"
+#include "../model/sync-logic.h"
+#include "../model/sync-seq-no.h"
+
+using namespace std;
+using namespace boost;
+using namespace Sync;
+
+struct Handler
+{
+ string instance;
+
+ Handler (const string &_instance)
+ : instance (_instance)
+ {
+ }
+
+ void onUpdate (const string &p/*prefix*/, const SeqNo &seq/*newSeq*/, const SeqNo &/*oldSeq*/)
+ {
+ cout << instance << "\t" << p << ": " << seq << endl;
+ }
+
+ void onRemove (const string &p/*prefix*/)
+ {
+ cout << instance << "\t" << p << endl;
+ }
+};
+
+BOOST_AUTO_TEST_CASE (SyncLogicTest)
+{
+ Handler h1 ("1"), h2 ("2");
+
+ SyncLogic l1 ("/bcast", bind (&Handler::onUpdate, &h1, _1, _2, _3), bind (&Handler::onRemove, &h1, _1), make_shared<CcnxWrapper> ());
+ SyncLogic L2 ("/bcast", bind (&Handler::onUpdate, &h2, _1, _2, _3), bind (&Handler::onRemove, &h2, _1), make_shared<CcnxWrapper> ());
+
+ sleep (10);
+ // l1.
+}
+
+
+