Many improvements for the event scheduler
Another big change: maintaining an outstanding interests and
re-expressing this interest every 4 seconds (or 4 seconds after interest
is satisfied and expressed again)
Initial testing for SyncLogic
diff --git a/model/sync-full-state.cc b/model/sync-full-state.cc
index 748e408..d01460e 100644
--- a/model/sync-full-state.cc
+++ b/model/sync-full-state.cc
@@ -64,18 +64,24 @@
{
if (m_digest == 0)
{
- // std::cout << "getDigest: ";
m_digest = make_shared<Digest> ();
- BOOST_FOREACH (LeafConstPtr leaf, m_leaves.get<ordered> ())
+ if (m_leaves.get<ordered> ().size () > 0)
{
- FullLeafConstPtr fullLeaf = dynamic_pointer_cast<const FullLeaf> (leaf);
- BOOST_ASSERT (fullLeaf != 0);
- *m_digest << fullLeaf->getDigest ();
- // std::cout << *leaf << "[" << fullLeaf->getDigest () << "] ";
+ BOOST_FOREACH (LeafConstPtr leaf, m_leaves.get<ordered> ())
+ {
+ FullLeafConstPtr fullLeaf = dynamic_pointer_cast<const FullLeaf> (leaf);
+ BOOST_ASSERT (fullLeaf != 0);
+ *m_digest << fullLeaf->getDigest ();
+ // std::cout << *leaf << "[" << fullLeaf->getDigest () << "] ";
+ }
+ m_digest->finalize ();
}
- // std::cout << "\n";
+ else
+ {
+ std::istringstream is ("00"); //zero state
+ is >> *m_digest;
+ }
}
- m_digest->finalize ();
return m_digest;
}
diff --git a/model/sync-logic-event-container.h b/model/sync-logic-event-container.h
index 3b99d76..ade2c63 100644
--- a/model/sync-logic-event-container.h
+++ b/model/sync-logic-event-container.h
@@ -44,15 +44,21 @@
struct LogicEvent
{
- LogicEvent (const boost::system_time &_time, Event _event)
+ LogicEvent (const boost::system_time &_time, Event _event, uint32_t _label)
: time (_time)
, event (_event)
+ , lbl (_label)
{ }
boost::system_time time;
Event event;
+ uint32_t lbl;
};
+/// @cond include_hidden
+struct byLabel { } ;
+/// @endcond
+
/**
* \ingroup sync
* @brief ???
@@ -63,6 +69,11 @@
mi::ordered_non_unique<
mi::member<LogicEvent, boost::system_time, &LogicEvent::time>
+ >,
+
+ mi::ordered_non_unique<
+ mi::tag<byLabel>,
+ mi::member<LogicEvent, uint32_t, &LogicEvent::lbl>
>
>
>
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index 7235220..4a243a4 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -48,6 +48,11 @@
{
m_ccnxHandle->setInterestFilter (syncPrefix,
bind (&SyncLogic::respondSyncInterest, this, _1));
+
+ sendSyncInterest ();
+ m_scheduler.schedule (posix_time::seconds (4),
+ bind (&SyncLogic::sendSyncInterest, this),
+ REEXPRESSING_INTEREST);
}
SyncLogic::~SyncLogic ()
@@ -98,8 +103,9 @@
if (!timedProcessing)
{
- m_delayedChecksScheduler.schedule (posix_time::milliseconds (m_rangeUniformRandom ()) /*from 20 to 100ms*/,
- bind (&SyncLogic::processSyncInterest, this, digest, interestName, true));
+ m_scheduler.schedule (posix_time::milliseconds (m_rangeUniformRandom ()) /*from 20 to 100ms*/,
+ bind (&SyncLogic::processSyncInterest, this, digest, interestName, true),
+ DELAYED_INTEREST_PROCESSING);
}
else
@@ -203,19 +209,27 @@
}
void
-SyncLogic::processPendingSyncInterests(DiffStatePtr &diff)
+SyncLogic::processPendingSyncInterests (DiffStatePtr &diffLog)
{
recursive_mutex::scoped_lock lock (m_stateMutex);
- diff->setDigest(m_state.getDigest());
- m_log.insert(diff);
+
+ diffLog->setDigest(m_state.getDigest());
+ if (m_log.size () > 0)
+ {
+ m_log.get<sequenced> ().front ()->setNext (diffLog);
+ }
+ m_log.insert (diffLog);
vector<string> pis = m_syncInterestTable.fetchAll ();
- stringstream ss;
- ss << *diff;
- for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
- {
- m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
- }
+ if (pis.size () > 0)
+ {
+ stringstream ss;
+ ss << *diffLog;
+ for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
+ {
+ m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
+ }
+ }
}
void
@@ -255,6 +269,11 @@
m_ccnxHandle->sendInterest (os.str (),
bind (&SyncLogic::processSyncData, this, _1, _2));
+
+ m_scheduler.cancel (REEXPRESSING_INTEREST);
+ m_scheduler.schedule (posix_time::seconds (4),
+ bind (&SyncLogic::sendSyncInterest, this),
+ REEXPRESSING_INTEREST);
}
}
diff --git a/model/sync-logic.h b/model/sync-logic.h
index 4270181..03c129a 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -90,7 +90,7 @@
#ifdef _DEBUG
Scheduler &
- getScheduler () { return m_delayedChecksScheduler; }
+ getScheduler () { return m_scheduler; }
#endif
private:
@@ -118,12 +118,18 @@
LogicRemoveCallback m_onRemove;
CcnxWrapperPtr m_ccnxHandle;
- Scheduler m_delayedChecksScheduler;
+ Scheduler m_scheduler;
boost::mt19937 m_randomGenerator;
boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_rangeUniformRandom;
static const int m_syncResponseFreshness = 2;
+
+ enum EventLabels
+ {
+ DELAYED_INTEREST_PROCESSING = 1,
+ REEXPRESSING_INTEREST = 2
+ };
};
diff --git a/model/sync-scheduler.cc b/model/sync-scheduler.cc
index 155e839..c5f1ca0 100644
--- a/model/sync-scheduler.cc
+++ b/model/sync-scheduler.cc
@@ -70,7 +70,10 @@
// sleeping
if (nextTime - get_system_time () > posix_time::time_duration (0,0,0,0))
- continue; // something changes, try again
+ {
+ // cout << "expected here" << endl;
+ continue; // something changes, try again
+ }
}
if (!m_threadRunning) continue;
@@ -80,7 +83,11 @@
{
lock_guard<mutex> lock (m_eventsMutex);
- BOOST_ASSERT (m_events.size () != 0);
+ if (m_events.size () == 0)
+ {
+ // cout << "Here" << endl;
+ continue;
+ }
event = m_events.begin ()->event;
m_events.erase (m_events.begin ());
@@ -88,7 +95,7 @@
event (); // calling the event outside the locked mutex
}
- catch (thread_interrupted e)
+ catch (thread_interrupted &e)
{
// cout << "interrupted: " << this_thread::get_id () << endl;
// do nothing
@@ -99,21 +106,35 @@
void
-Scheduler::schedule (const boost::system_time &abstime, Event event)
+Scheduler::schedule (const boost::system_time &abstime, Event event, uint32_t label)
{
{
lock_guard<mutex> lock (m_eventsMutex);
- m_events.insert (LogicEvent (abstime, event));
+ m_events.insert (LogicEvent (abstime, event, label));
}
m_eventsCondition.notify_one ();
m_thread.interrupt (); // interrupt sleep, if currently sleeping
}
void
-Scheduler::schedule (const boost::posix_time::time_duration &reltime, Event event)
+Scheduler::schedule (const boost::posix_time::time_duration &reltime, Event event, uint32_t label)
{
// cout << reltime << endl;
- schedule (boost::get_system_time () + reltime, event);
+ schedule (boost::get_system_time () + reltime, event, label);
}
+void
+Scheduler::cancel (uint32_t label)
+{
+ {
+ // cout << "Canceling label " << label << " size: " << m_events.size () << endl;
+ lock_guard<mutex> lock (m_eventsMutex);
+ m_events.get<byLabel> ().erase (label);
+ // cout << "Canceled label " << label << " size: " << m_events.size () << endl;
+ }
+ m_eventsCondition.notify_one ();
+ m_thread.interrupt (); // interrupt sleep, if currently sleeping
+}
+
+
} // Sync
diff --git a/model/sync-scheduler.h b/model/sync-scheduler.h
index 8c48f86..ac1eef9 100644
--- a/model/sync-scheduler.h
+++ b/model/sync-scheduler.h
@@ -52,18 +52,26 @@
* @brief Schedule an event at absolute time 'abstime'
* @param abstime Absolute time
* @param event function to be called at the time
+ * @param label Label for the event
*/
void
- schedule (const boost::system_time &abstime, Event event);
+ schedule (const boost::system_time &abstime, Event event, uint32_t label);
/**
* @brief Schedule an event at relative time 'reltime'
* @param reltime Relative time
* @param event function to be called at the time
+ * @param label Label for the event
*/
void
- schedule (const boost::posix_time::time_duration &reltime, Event event);
+ schedule (const boost::posix_time::time_duration &reltime, Event event, uint32_t label);
+ /**
+ * @brief Cancel all events for the label
+ * @param label Label of the event that needs to be cancelled
+ */
+ void
+ cancel (uint32_t label);
#ifdef _DEBUG
size_t