Merge remote-tracking branch 'git.irl/master'
All cout's are commented out
diff --git a/model/sync-full-state.cc b/model/sync-full-state.cc
index 748e408..d01460e 100644
--- a/model/sync-full-state.cc
+++ b/model/sync-full-state.cc
@@ -64,18 +64,24 @@
{
if (m_digest == 0)
{
- // std::cout << "getDigest: ";
m_digest = make_shared<Digest> ();
- BOOST_FOREACH (LeafConstPtr leaf, m_leaves.get<ordered> ())
+ if (m_leaves.get<ordered> ().size () > 0)
{
- FullLeafConstPtr fullLeaf = dynamic_pointer_cast<const FullLeaf> (leaf);
- BOOST_ASSERT (fullLeaf != 0);
- *m_digest << fullLeaf->getDigest ();
- // std::cout << *leaf << "[" << fullLeaf->getDigest () << "] ";
+ BOOST_FOREACH (LeafConstPtr leaf, m_leaves.get<ordered> ())
+ {
+ FullLeafConstPtr fullLeaf = dynamic_pointer_cast<const FullLeaf> (leaf);
+ BOOST_ASSERT (fullLeaf != 0);
+ *m_digest << fullLeaf->getDigest ();
+ // std::cout << *leaf << "[" << fullLeaf->getDigest () << "] ";
+ }
+ m_digest->finalize ();
}
- // std::cout << "\n";
+ else
+ {
+ std::istringstream is ("00"); //zero state
+ is >> *m_digest;
+ }
}
- m_digest->finalize ();
return m_digest;
}
diff --git a/model/sync-logic-event-container.h b/model/sync-logic-event-container.h
index 3b99d76..ade2c63 100644
--- a/model/sync-logic-event-container.h
+++ b/model/sync-logic-event-container.h
@@ -44,15 +44,21 @@
struct LogicEvent
{
- LogicEvent (const boost::system_time &_time, Event _event)
+ LogicEvent (const boost::system_time &_time, Event _event, uint32_t _label)
: time (_time)
, event (_event)
+ , lbl (_label)
{ }
boost::system_time time;
Event event;
+ uint32_t lbl;
};
+/// @cond include_hidden
+struct byLabel { } ;
+/// @endcond
+
/**
* \ingroup sync
* @brief ???
@@ -63,6 +69,11 @@
mi::ordered_non_unique<
mi::member<LogicEvent, boost::system_time, &LogicEvent::time>
+ >,
+
+ mi::ordered_non_unique<
+ mi::tag<byLabel>,
+ mi::member<LogicEvent, uint32_t, &LogicEvent::lbl>
>
>
>
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index c143498..c8452dc 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -40,21 +40,18 @@
LogicRemoveCallback onRemove)
: m_syncPrefix (syncPrefix)
, m_onUpdate (onUpdate)
- , m_ccnxHandle(new CcnxWrapper())
, m_onRemove (onRemove)
+ , m_ccnxHandle(new CcnxWrapper())
, m_randomGenerator (static_cast<unsigned int> (std::time (0)))
, m_rangeUniformRandom (m_randomGenerator, uniform_int<> (20,100))
{
m_ccnxHandle->setInterestFilter (syncPrefix,
bind (&SyncLogic::respondSyncInterest, this, _1));
-
-}
-
-void SyncLogic::start()
-{
- // We need to send out our Sync Interest when we're ready
- sendSyncInterest();
+ sendSyncInterest ();
+ m_scheduler.schedule (posix_time::seconds (4),
+ bind (&SyncLogic::sendSyncInterest, this),
+ REEXPRESSING_INTEREST);
}
SyncLogic::~SyncLogic ()
@@ -65,7 +62,7 @@
void
SyncLogic::respondSyncInterest (const string &interest)
{
- cout << "Respond Sync Interest" << endl;
+ // cout << "Respond Sync Interest" << endl;
string hash = interest.substr(interest.find_last_of("/") + 1);
DigestPtr digest = make_shared<Digest> ();
try
@@ -106,8 +103,9 @@
if (!timedProcessing)
{
- m_delayedChecksScheduler.schedule (posix_time::milliseconds (m_rangeUniformRandom ()) /*from 20 to 100ms*/,
- bind (&SyncLogic::processSyncInterest, this, digest, interestName, true));
+ m_scheduler.schedule (posix_time::milliseconds (m_rangeUniformRandom ()) /*from 20 to 100ms*/,
+ bind (&SyncLogic::processSyncInterest, this, digest, interestName, true),
+ DELAYED_INTEREST_PROCESSING);
}
else
@@ -121,7 +119,7 @@
void
SyncLogic::processSyncData (const string &name, const string &dataBuffer)
{
- cout << "Process Sync Data" <<endl;
+ // cout << "Process Sync Data" <<endl;
DiffStatePtr diffLog = make_shared<DiffState> ();
try
@@ -205,34 +203,41 @@
return;
}
- // Zhenkai: shouldn't we all ways send a new Sync Interest?
- //if (diffLog->getLeaves ().size () > 0)
+ if (diffLog->getLeaves ().size () > 0)
{
sendSyncInterest();
}
}
void
-SyncLogic::processPendingSyncInterests(DiffStatePtr &diff)
+SyncLogic::processPendingSyncInterests (DiffStatePtr &diffLog)
{
- //cout << "Process Pending Interests" <<endl;
+ //cout << "Process Pending Interests" <<endl;
recursive_mutex::scoped_lock lock (m_stateMutex);
- diff->setDigest(m_state.getDigest());
- m_log.insert(diff);
+
+ diffLog->setDigest(m_state.getDigest());
+ if (m_log.size () > 0)
+ {
+ m_log.get<sequenced> ().front ()->setNext (diffLog);
+ }
+ m_log.insert (diffLog);
vector<string> pis = m_syncInterestTable.fetchAll ();
+ if (pis.size () > 0)
+ {
stringstream ss;
- ss << *diff;
+ ss << *diffLog;
for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
{
m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
}
}
+}
void
SyncLogic::addLocalNames (const string &prefix, uint32_t session, uint32_t seq)
{
- //cout << "Add local names" <<endl;
+ //cout << "Add local names" <<endl;
recursive_mutex::scoped_lock lock (m_stateMutex);
NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
SeqNo seqN(session, seq);
@@ -260,14 +265,19 @@
void
SyncLogic::sendSyncInterest ()
{
- cout << "Sending Sync Interest" << endl;
+ // cout << "Sending Sync Interest" << endl;
recursive_mutex::scoped_lock lock (m_stateMutex);
ostringstream os;
- os << m_syncPrefix << "/" << m_state.getDigest();
+ os << m_syncPrefix << "/" << *m_state.getDigest();
m_ccnxHandle->sendInterest (os.str (),
bind (&SyncLogic::processSyncData, this, _1, _2));
+
+ m_scheduler.cancel (REEXPRESSING_INTEREST);
+ m_scheduler.schedule (posix_time::seconds (4),
+ bind (&SyncLogic::sendSyncInterest, this),
+ REEXPRESSING_INTEREST);
}
}
diff --git a/model/sync-logic.h b/model/sync-logic.h
index 780c515..da0a7fa 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -95,7 +95,7 @@
#ifdef _DEBUG
Scheduler &
- getScheduler () { return m_delayedChecksScheduler; }
+ getScheduler () { return m_scheduler; }
#endif
private:
@@ -123,12 +123,18 @@
LogicRemoveCallback m_onRemove;
std::auto_ptr<CcnxWrapper> m_ccnxHandle;
- Scheduler m_delayedChecksScheduler;
+ Scheduler m_scheduler;
boost::mt19937 m_randomGenerator;
boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_rangeUniformRandom;
static const int m_syncResponseFreshness = 2;
+
+ enum EventLabels
+ {
+ DELAYED_INTEREST_PROCESSING = 1,
+ REEXPRESSING_INTEREST = 2
+ };
};
diff --git a/model/sync-scheduler.cc b/model/sync-scheduler.cc
index 155e839..c5f1ca0 100644
--- a/model/sync-scheduler.cc
+++ b/model/sync-scheduler.cc
@@ -70,7 +70,10 @@
// sleeping
if (nextTime - get_system_time () > posix_time::time_duration (0,0,0,0))
- continue; // something changes, try again
+ {
+ // cout << "expected here" << endl;
+ continue; // something changes, try again
+ }
}
if (!m_threadRunning) continue;
@@ -80,7 +83,11 @@
{
lock_guard<mutex> lock (m_eventsMutex);
- BOOST_ASSERT (m_events.size () != 0);
+ if (m_events.size () == 0)
+ {
+ // cout << "Here" << endl;
+ continue;
+ }
event = m_events.begin ()->event;
m_events.erase (m_events.begin ());
@@ -88,7 +95,7 @@
event (); // calling the event outside the locked mutex
}
- catch (thread_interrupted e)
+ catch (thread_interrupted &e)
{
// cout << "interrupted: " << this_thread::get_id () << endl;
// do nothing
@@ -99,21 +106,35 @@
void
-Scheduler::schedule (const boost::system_time &abstime, Event event)
+Scheduler::schedule (const boost::system_time &abstime, Event event, uint32_t label)
{
{
lock_guard<mutex> lock (m_eventsMutex);
- m_events.insert (LogicEvent (abstime, event));
+ m_events.insert (LogicEvent (abstime, event, label));
}
m_eventsCondition.notify_one ();
m_thread.interrupt (); // interrupt sleep, if currently sleeping
}
void
-Scheduler::schedule (const boost::posix_time::time_duration &reltime, Event event)
+Scheduler::schedule (const boost::posix_time::time_duration &reltime, Event event, uint32_t label)
{
// cout << reltime << endl;
- schedule (boost::get_system_time () + reltime, event);
+ schedule (boost::get_system_time () + reltime, event, label);
}
+void
+Scheduler::cancel (uint32_t label)
+{
+ {
+ // cout << "Canceling label " << label << " size: " << m_events.size () << endl;
+ lock_guard<mutex> lock (m_eventsMutex);
+ m_events.get<byLabel> ().erase (label);
+ // cout << "Canceled label " << label << " size: " << m_events.size () << endl;
+ }
+ m_eventsCondition.notify_one ();
+ m_thread.interrupt (); // interrupt sleep, if currently sleeping
+}
+
+
} // Sync
diff --git a/model/sync-scheduler.h b/model/sync-scheduler.h
index 8c48f86..ac1eef9 100644
--- a/model/sync-scheduler.h
+++ b/model/sync-scheduler.h
@@ -52,18 +52,26 @@
* @brief Schedule an event at absolute time 'abstime'
* @param abstime Absolute time
* @param event function to be called at the time
+ * @param label Label for the event
*/
void
- schedule (const boost::system_time &abstime, Event event);
+ schedule (const boost::system_time &abstime, Event event, uint32_t label);
/**
* @brief Schedule an event at relative time 'reltime'
* @param reltime Relative time
* @param event function to be called at the time
+ * @param label Label for the event
*/
void
- schedule (const boost::posix_time::time_duration &reltime, Event event);
+ schedule (const boost::posix_time::time_duration &reltime, Event event, uint32_t label);
+ /**
+ * @brief Cancel all events for the label
+ * @param label Label of the event that needs to be cancelled
+ */
+ void
+ cancel (uint32_t label);
#ifdef _DEBUG
size_t
diff --git a/test/test_scheduler.cc b/test/test_scheduler.cc
index 2871d95..821155c 100644
--- a/test/test_scheduler.cc
+++ b/test/test_scheduler.cc
@@ -45,6 +45,12 @@
// cout << "funcRemove\n";
// }
+enum SCHEDULE_LABELS
+ {
+ TEST_LABEL,
+ ANOTHER_LABEL
+ };
+
struct SchedulerFixture
{
SchedulerFixture ()
@@ -61,7 +67,9 @@
counter ++;
if (counter < 9)
- scheduler->schedule (boost::posix_time::milliseconds (100), boost::bind (&SchedulerFixture::everySecond, this));
+ scheduler->schedule (boost::posix_time::milliseconds (100),
+ boost::bind (&SchedulerFixture::everySecond, this),
+ TEST_LABEL);
}
void setCounterFive ()
@@ -84,28 +92,40 @@
{
BOOST_CHECK_NO_THROW (scheduler = new Scheduler ());
- scheduler->schedule (posix_time::milliseconds (100), bind (&SchedulerFixture::everySecond, this));
+ scheduler->schedule (posix_time::milliseconds (100),
+ bind (&SchedulerFixture::everySecond, this),
+ TEST_LABEL);
sleep (1);
// cout << counter << endl;
BOOST_CHECK_EQUAL (counter, 9); // generally, should be 9
scheduler->schedule (posix_time::seconds (2),
- bind (&SchedulerFixture::setCounterFive, this));
+ bind (&SchedulerFixture::setCounterFive, this),
+ TEST_LABEL);
this_thread::sleep (posix_time::milliseconds (400)); // just in case
scheduler->schedule (posix_time::milliseconds (600),
- bind (&SchedulerFixture::setCounterThree, this));
+ bind (&SchedulerFixture::setCounterThree, this),
+ TEST_LABEL);
this_thread::sleep (posix_time::milliseconds (500));
BOOST_CHECK_EQUAL (counter, 9); // still 9
-
+
this_thread::sleep (posix_time::milliseconds (200));
BOOST_CHECK_EQUAL (counter, 3);
this_thread::sleep (posix_time::milliseconds (1000));
BOOST_CHECK_EQUAL (counter, 5);
+
+ scheduler->schedule (posix_time::milliseconds (100),
+ bind (&SchedulerFixture::setCounterThree, this),
+ ANOTHER_LABEL);
+ this_thread::sleep (posix_time::milliseconds (50));
+ scheduler->cancel (ANOTHER_LABEL);
+ this_thread::sleep (posix_time::milliseconds (150));
+ BOOST_CHECK_EQUAL (counter, 5);
BOOST_CHECK_NO_THROW (delete scheduler);
}
@@ -121,7 +141,7 @@
{
}
-BOOST_AUTO_TEST_CASE (SyncLogicTest)
+BOOST_AUTO_TEST_CASE (SyncLogicSchedulerTest)
{
SyncLogic *logic = 0;
BOOST_CHECK_NO_THROW (logic = new SyncLogic ("/prefix", funcUpdate, funcRemove));
diff --git a/test/test_sync_logic.cc b/test/test_sync_logic.cc
new file mode 100644
index 0000000..27cd9c6
--- /dev/null
+++ b/test/test_sync_logic.cc
@@ -0,0 +1,70 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ * 卞超轶 Chaoyi Bian <bcy@pku.edu.cn>
+ * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ */
+
+#include <boost/test/unit_test.hpp>
+#include <boost/test/output_test_stream.hpp>
+#include <map>
+using boost::test_tools::output_test_stream;
+
+#include <boost/make_shared.hpp>
+
+#include "../model/sync-ccnx-wrapper.h"
+#include "../model/sync-logic.h"
+#include "../model/sync-seq-no.h"
+
+using namespace std;
+using namespace boost;
+using namespace Sync;
+
+struct Handler
+{
+ string instance;
+
+ Handler (const string &_instance)
+ : instance (_instance)
+ {
+ }
+
+ void onUpdate (const string &p/*prefix*/, const SeqNo &seq/*newSeq*/, const SeqNo &/*oldSeq*/)
+ {
+ cout << instance << "\t" << p << ": " << seq << endl;
+ }
+
+ void onRemove (const string &p/*prefix*/)
+ {
+ cout << instance << "\t" << p << endl;
+ }
+};
+
+BOOST_AUTO_TEST_CASE (SyncLogicTest)
+{
+ Handler h1 ("1"), h2 ("2");
+
+ SyncLogic l1 ("/bcast", bind (&Handler::onUpdate, &h1, _1, _2, _3), bind (&Handler::onRemove, &h1, _1), make_shared<CcnxWrapper> ());
+ SyncLogic L2 ("/bcast", bind (&Handler::onUpdate, &h2, _1, _2, _3), bind (&Handler::onRemove, &h2, _1), make_shared<CcnxWrapper> ());
+
+ sleep (10);
+ // l1.
+}
+
+
+