Bug solving in SyncLogic
diff --git a/log4cxx.properties b/log4cxx.properties
index e89f3b0..70f885a 100644
--- a/log4cxx.properties
+++ b/log4cxx.properties
@@ -8,11 +8,13 @@
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.target=System.err
#log4j.appender.A1.layout.ConversionPattern=%d{dd-MMM HH:MM:SS,SSS} %p %c %m%n
-log4j.appender.A1.layout.ConversionPattern=%d{hh:mm:ss,SSS} %-14t %-10c{1} %m%n
+#log4j.appender.A1.layout.ConversionPattern=%d{hh:mm:ss,SSS} %-14t %-14c %m%n
+log4j.appender.A1.layout.ConversionPattern=%d{ss,SSS} %-12c %m%n
log4j.logger.SyncLogic = TRACE
-log4j.logger.AppDataFetch = TRACE
-log4j.logger.Test = TRACE
+#log4j.logger.SyncInterestTable = TRACE
+# log4j.logger.AppDataFetch = TRACE
+# log4j.logger.Test = TRACE
#log4j.logger.bgpparser=TRACE
#log4j.logger.bgpparser.AttributeType=ERROR
#log4j.logger.bgpparser.MRTCommonHeader=ERROR
diff --git a/model/sync-interest-table.cc b/model/sync-interest-table.cc
index b9f6484..3a08fbd 100644
--- a/model/sync-interest-table.cc
+++ b/model/sync-interest-table.cc
@@ -21,32 +21,29 @@
*/
#include "sync-interest-table.h"
-
+#include "sync-log.h"
using namespace std;
using namespace boost;
+INIT_LOGGER ("SyncInterestTable");
+
namespace Sync
{
SyncInterestTable::SyncInterestTable ()
- : m_running (true)
{
- m_thread = thread (&SyncInterestTable::periodicCheck, this);
+ m_scheduler.schedule (posix_time::seconds (m_checkPeriod),
+ bind (&SyncInterestTable::expireInterests, this),
+ 0);
}
SyncInterestTable::~SyncInterestTable ()
{
- // cout << "request interrupt: " << this_thread::get_id () << endl;
- m_running = false;
- m_thread.interrupt ();
- m_thread.join ();
}
vector<string>
SyncInterestTable::fetchAll ()
{
- expireInterests();
-
recursive_mutex::scoped_lock lock (m_mutex);
vector<string> entries;
@@ -62,7 +59,7 @@
}
bool
-SyncInterestTable::insert(string interest)
+SyncInterestTable::insert(const string &interest)
{
recursive_mutex::scoped_lock lock (m_mutex);
TableContainer::iterator it = m_table.find (interest);
@@ -72,41 +69,52 @@
m_table.insert (make_pair(interest, currentTime));
}
-void SyncInterestTable::expireInterests()
+uint32_t
+SyncInterestTable::size () const
{
recursive_mutex::scoped_lock lock (m_mutex);
+ return m_table.size ();
+}
+
+bool
+SyncInterestTable::remove (const std::string &interest)
+{
+ recursive_mutex::scoped_lock lock (m_mutex);
+ TableContainer::iterator item = m_table.find (interest);
+ if (item != m_table.end ())
+ {
+ m_table.erase (item);
+ return true;
+ }
+ return false;
+}
+
+
+void SyncInterestTable::expireInterests ()
+{
+ recursive_mutex::scoped_lock lock (m_mutex);
+
+ uint32_t count = 0;
time_t currentTime = time(0);
TableContainer::iterator it = m_table.begin ();
while (it != m_table.end())
{
time_t timestamp = it->second;
- if (currentTime - timestamp > m_checkPeriod) {
- it = m_table.erase(it);
- }
+ if (currentTime - timestamp > m_checkPeriod)
+ {
+ it = m_table.erase(it);
+ count ++;
+ }
else
++it;
}
+
+ _LOG_DEBUG ("expireInterests (): expired " << count);
+
+ m_scheduler.schedule (posix_time::seconds (m_checkPeriod),
+ bind (&SyncInterestTable::expireInterests, this),
+ 0);
}
-void SyncInterestTable::periodicCheck ()
-{
- while (m_running)
- {
- try
- {
- // cout << "enterSleep: " << this_thread::get_id () << endl;
-
- this_thread::sleep (posix_time::seconds(4));
- expireInterests ();
- }
- catch (boost::thread_interrupted e)
- {
- // should I just assign m_running = false here?
-
- // cout << "interrupted: " << this_thread::get_id () << endl;
- // do nothing
- }
- }
-}
}
diff --git a/model/sync-interest-table.h b/model/sync-interest-table.h
index bc5f2c2..199d731 100644
--- a/model/sync-interest-table.h
+++ b/model/sync-interest-table.h
@@ -29,6 +29,7 @@
#include <boost/thread/recursive_mutex.hpp>
#include <boost/thread/thread.hpp>
#include <ctime>
+#include "sync-scheduler.h"
namespace Sync {
@@ -48,7 +49,14 @@
* @brief Insert an interest, if interest already exists, update the
* timestamp
*/
- bool insert (std::string interest);
+ bool
+ insert (const std::string &interest);
+
+ /**
+ * @brief Remove interest (e.g., when it was satisfied)
+ */
+ bool
+ remove (const std::string &interest);
/**
* @brief fetch all Interests and clear the table
@@ -56,24 +64,24 @@
std::vector<std::string>
fetchAll ();
+ uint32_t
+ size () const;
+
private:
/**
* @brief periodically called to expire Interest
*/
- void expireInterests ();
-
- void periodicCheck ();
+ void
+ expireInterests ();
private:
typedef boost::unordered_map<std::string, time_t> TableContainer;
static const int m_checkPeriod = 4;
TableContainer m_table; // pit entries
-
- boost::thread m_thread; // thread to check every 4 sec
- volatile bool m_running;
- boost::recursive_mutex m_mutex;
+ Scheduler m_scheduler;
+ mutable boost::recursive_mutex m_mutex;
};
} // Sync
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index 2134256..7f052f1 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -34,7 +34,7 @@
using namespace std;
using namespace boost;
-INIT_LOGGER ("SyncLogic");
+// INIT_LOGGER ("SyncLogic");
namespace Sync
{
@@ -47,9 +47,16 @@
, m_onRemove (onRemove)
, m_ccnxHandle(new CcnxWrapper())
, m_randomGenerator (static_cast<unsigned int> (std::time (0)))
- , m_rangeUniformRandom (m_randomGenerator, uniform_int<> (10,50))
+ , m_rangeUniformRandom (m_randomGenerator, uniform_int<> (20,80))
{
- _LOG_FUNCTION (syncPrefix);
+#ifdef _DEBUG
+#ifdef HAVE_LOG4CXX
+ // _LOG_FUNCTION (syncPrefix);
+ static int id = 0;
+ staticModuleLogger = log4cxx::Logger::getLogger ("SyncLogic." + lexical_cast<string> (id));
+ id ++;
+#endif
+#endif
m_ccnxHandle->setInterestFilter (m_syncPrefix,
bind (&SyncLogic::respondSyncInterest, this, _1));
@@ -61,7 +68,7 @@
SyncLogic::~SyncLogic ()
{
- _LOG_FUNCTION (this);
+ // _LOG_FUNCTION (this);
// cout << "SyncLogic::~SyncLogic ()" << endl;
m_ccnxHandle.reset ();
@@ -100,9 +107,16 @@
{
_LOG_TRACE (">> D " << interestName << "/state" << " (zero)");
+ m_syncInterestTable.remove (interestName + "/state");
m_ccnxHandle->publishData (interestName + "/state",
lexical_cast<string> (m_state),
m_syncResponseFreshness);
+ if (m_outstandingInterest == interestName)
+ {
+ m_scheduler.schedule (posix_time::seconds (0),
+ bind (&SyncLogic::sendSyncInterest, this),
+ REEXPRESSING_INTEREST);
+ }
return;
}
@@ -111,12 +125,12 @@
// cout << interestName << "\n";
if (digest->isZero ())
{
- _LOG_TRACE ("Digest is zero, adding /state to PIT");
+ _LOG_TRACE ("processSyncInterest (): Digest is zero, adding /state to PIT");
m_syncInterestTable.insert (interestName + "/state");
}
else
{
- _LOG_TRACE ("Same state. Adding to PIT");
+ _LOG_TRACE ("processSyncInterest (): Same state. Adding to PIT");
m_syncInterestTable.insert (interestName);
}
return;
@@ -127,16 +141,22 @@
if (stateInDiffLog != m_log.end ())
{
_LOG_TRACE (">> D " << interestName);
-
+
+ m_syncInterestTable.remove (interestName);
m_ccnxHandle->publishData (interestName,
lexical_cast<string> (*(*stateInDiffLog)->diff ()),
m_syncResponseFreshness);
+ if (m_outstandingInterest == interestName)
+ {
+ m_scheduler.schedule (posix_time::seconds (0),
+ bind (&SyncLogic::sendSyncInterest, this),
+ REEXPRESSING_INTEREST);
+ }
return;
}
if (!timedProcessing)
{
- _LOG_DEBUG ("hmm");
m_scheduler.schedule (posix_time::milliseconds (m_rangeUniformRandom ()) /*from 20 to 100ms*/,
bind (&SyncLogic::processSyncInterest, this, digest, interestName, true),
DELAYED_INTEREST_PROCESSING);
@@ -145,10 +165,18 @@
else
{
_LOG_TRACE (">> D " << interestName << "/state" << " (timed processing)");
-
+
+ m_syncInterestTable.remove (interestName + "/state");
m_ccnxHandle->publishData (interestName + "/state",
lexical_cast<string> (m_state),
m_syncResponseFreshness);
+
+ if (m_outstandingInterest == interestName)
+ {
+ m_scheduler.schedule (posix_time::seconds (0),
+ bind (&SyncLogic::sendSyncInterest, this),
+ REEXPRESSING_INTEREST);
+ }
}
}
@@ -244,14 +272,17 @@
// if state has changed, then it is safe to express a new interest
if (diffLog->getLeaves ().size () > 0)
{
- sendSyncInterest ();
+ m_scheduler.schedule (posix_time::seconds (0),
+ bind (&SyncLogic::sendSyncInterest, this),
+ REEXPRESSING_INTEREST);
}
else
{
// should not reexpress the same interest. Need at least wait for data lifetime
// Otherwise we will get immediate reply from the local daemon and there will be 100% utilization
m_scheduler.cancel (REEXPRESSING_INTEREST);
- m_scheduler.schedule (posix_time::seconds (m_syncResponseFreshness),
+ // m_scheduler.schedule (posix_time::seconds (0),
+ m_scheduler.schedule (posix_time::seconds (m_syncResponseFreshness) + posix_time::milliseconds (1),
bind (&SyncLogic::sendSyncInterest, this),
REEXPRESSING_INTEREST);
}
@@ -265,10 +296,27 @@
{
stringstream ss;
ss << *diffLog;
+ bool satisfiedOwnInterest = false;
+
for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
{
_LOG_TRACE (">> D " << *ii);
m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
+
+ {
+ recursive_mutex::scoped_lock lock (m_stateMutex);
+ // _LOG_DEBUG (*ii << " == " << m_outstandingInterest << " = " << (*ii == m_outstandingInterest));
+ satisfiedOwnInterest = satisfiedOwnInterest || (*ii == m_outstandingInterest) || (*ii == (m_outstandingInterest + "/state"));
+ }
+ }
+
+ if (satisfiedOwnInterest)
+ {
+ _LOG_DEBUG ("Have satisfied our own interest. Scheduling interest reexpression");
+ // we need to reexpress interest only if we satisfied our own interest
+ m_scheduler.schedule (posix_time::milliseconds (0),
+ bind (&SyncLogic::sendSyncInterest, this),
+ REEXPRESSING_INTEREST);
}
}
}
@@ -285,7 +333,7 @@
m_log.erase (m_state.getDigest()); // remove diff state with the same digest. next pointers are still valid
/// @todo Optimization
m_log.get<sequenced> ().push_front (diffLog);
- _LOG_DEBUG (*diffLog->getDigest () << " " << m_log.size ());
+ // _LOG_DEBUG (*diffLog->getDigest () << " " << m_log.size ());
}
void
@@ -296,16 +344,19 @@
//cout << "Add local names" <<endl;
recursive_mutex::scoped_lock lock (m_stateMutex);
NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
-
+
SeqNo seqN (session, seq);
m_state.update(info, seqN);
+ _LOG_DEBUG ("addLocalNames (): new state " << *m_state.getDigest ());
+
diff = make_shared<DiffState>();
diff->update(info, seqN);
processPendingSyncInterests (diff);
}
- satisfyPendingSyncInterests (diff);
+ // _LOG_DEBUG ("PIT size: " << m_syncInterestTable.size ());
+ satisfyPendingSyncInterests (diff);
}
void
@@ -323,20 +374,25 @@
processPendingSyncInterests (diff);
}
- satisfyPendingSyncInterests (diff);
+ satisfyPendingSyncInterests (diff);
}
void
SyncLogic::sendSyncInterest ()
{
- // cout << "Sending Sync Interest" << endl;
- recursive_mutex::scoped_lock lock (m_stateMutex);
-
ostringstream os;
- os << m_syncPrefix << "/" << *m_state.getDigest();
- _LOG_TRACE (">> I " << os.str ());
+ {
+ // cout << "Sending Sync Interest" << endl;
+ recursive_mutex::scoped_lock lock (m_stateMutex);
+ os << m_syncPrefix << "/" << *m_state.getDigest();
+
+ _LOG_TRACE (">> I " << os.str ());
+
+ m_outstandingInterest = os.str ();
+ }
+
m_ccnxHandle->sendInterest (os.str (),
bind (&SyncLogic::processSyncData, this, _1, _2));
diff --git a/model/sync-logic.h b/model/sync-logic.h
index 871ee33..e966dea 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -37,6 +37,12 @@
#include "sync-diff-state-container.h"
+#ifdef _DEBUG
+#ifdef HAVE_LOG4CXX
+#include <log4cxx/logger.h>
+#endif
+#endif
+
namespace Sync {
/**
@@ -114,6 +120,7 @@
DiffStateContainer m_log;
boost::recursive_mutex m_stateMutex;
+ std::string m_outstandingInterest;
SyncInterestTable m_syncInterestTable;
std::string m_syncPrefix;
@@ -133,6 +140,12 @@
DELAYED_INTEREST_PROCESSING = 1,
REEXPRESSING_INTEREST = 2
};
+
+#ifdef _DEBUG
+#ifdef HAVE_LOG4CXX
+ log4cxx::LoggerPtr staticModuleLogger;
+#endif
+#endif
};
diff --git a/model/sync-scheduler.cc b/model/sync-scheduler.cc
index f54f91f..73a904a 100644
--- a/model/sync-scheduler.cc
+++ b/model/sync-scheduler.cc
@@ -66,13 +66,13 @@
nextTime = m_events.begin ()->time;
}
- if (nextTime - get_system_time () > posix_time::time_duration (0,0,0,0))
+ if (nextTime > get_system_time ())
{
this_thread::sleep (nextTime - get_system_time ());
// sleeping
- if (nextTime - get_system_time () > posix_time::time_duration (0,0,0,0))
+ if (nextTime > get_system_time ())
{
// cout << "expected here" << endl;
continue; // something changes, try again
diff --git a/test/test_app_socket.cc b/test/test_app_socket.cc
index bf7db83..d1b90c0 100644
--- a/test/test_app_socket.cc
+++ b/test/test_app_socket.cc
@@ -88,7 +88,7 @@
string data0 = "Very funny Scotty, now beam down my clothes";
_LOG_DEBUG ("s1 publish");
s1.publish (p1, 0, data0, 10);
- this_thread::sleep (posix_time::milliseconds (50));
+ this_thread::sleep (posix_time::milliseconds (250));
// from code logic, we won't be fetching our own data
a1.set(p1 + "/0/0", data0);
@@ -103,9 +103,8 @@
s1.publish (p1, 0, data1, 10);
_LOG_DEBUG ("s1 publish");
s1.publish (p1, 0, data2, 10);
- this_thread::sleep (posix_time::milliseconds (100));
+ this_thread::sleep (posix_time::milliseconds (250));
- _LOG_DEBUG ("testing");
// // // from code logic, we won't be fetching our own data
a1.set(p1 + "/0/1", data1);
a1.set(p1 + "/0/2", data2);
@@ -113,36 +112,35 @@
BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
// // another single source
- // // string data3 = "You surf the Internet, I surf the real world";
- // string data4 = "I got a fortune cookie once that said 'You like Chinese food'";
- // string data5 = "Real men wear pink. Why? Because their wives make them";
- // // s3.publish(p3, 0, data3, 10);
- // // this_thread::sleep (posix_time::milliseconds (1000));
+ string data3 = "You surf the Internet, I surf the real world";
+ string data4 = "I got a fortune cookie once that said 'You like Chinese food'";
+ string data5 = "Real men wear pink. Why? Because their wives make them";
+ _LOG_DEBUG ("s3 publish");
+ s3.publish(p3, 0, data3, 10);
+ this_thread::sleep (posix_time::milliseconds (200));
- // // another single source, multiple data at once
- // s2.publish(p2, 0, data4, 10);
- // s2.publish(p2, 0, data5, 10);
- // this_thread::sleep (posix_time::milliseconds (1000));
+ // another single source, multiple data at once
+ s2.publish(p2, 0, data4, 10);
+ s2.publish(p2, 0, data5, 10);
+ this_thread::sleep (posix_time::milliseconds (200));
- // // from code logic, we won't be fetching our own data
- // // a3.set(p3 + "/0/0", data3);
- // a2.set(p2 + "/0/0", data4);
- // a2.set(p2 + "/0/1", data5);
- // BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
- // // BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
+ // from code logic, we won't be fetching our own data
+ a3.set(p3 + "/0/0", data3);
+ a2.set(p2 + "/0/0", data4);
+ a2.set(p2 + "/0/1", data5);
+ BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
+ BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
- // // not sure weither this is simultanous data generation from multiple sources
- // string data6 = "Shakespeare says: 'Prose before hos.'";
- // string data7 = "Pick good people, talent never wears out";
- // s1.publish(p1, 0, data6, 10);
- // s2.publish(p2, 0, data7, 10);
- // usleep(10000);
+ // not sure weither this is simultanous data generation from multiple sources
+ string data6 = "Shakespeare says: 'Prose before hos.'";
+ string data7 = "Pick good people, talent never wears out";
+ s1.publish(p1, 0, data6, 10);
+ s2.publish(p2, 0, data7, 10);
+ this_thread::sleep (posix_time::milliseconds (10000));
- // // from code logic, we won't be fetching our own data
- // a1.set(p1 + "/0/3", data6);
- // a2.set(p2 + "/0/2", data7);
- // BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
- // BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
+ // from code logic, we won't be fetching our own data
+ a1.set(p1 + "/0/3", data6);
+ a2.set(p2 + "/0/2", data7);
+ BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
+ BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
}
-
-