Add tons of new loggings
diff --git a/log4cxx.properties b/log4cxx.properties
index cd28cd6..a3663c4 100644
--- a/log4cxx.properties
+++ b/log4cxx.properties
@@ -11,6 +11,11 @@
#log4j.appender.A1.layout.ConversionPattern=%d{hh:mm:ss,SSS} %-14t %-14c %m%n
log4j.appender.A1.layout.ConversionPattern=%d{ss,SSS} %-5p %-12c %m%n
+log4j.logger.Executor = ERROR
+log4j.logger.Sync.Log = ERROR
+log4j.logger.Sync.Core = DEBUG
+log4j.logger.Scheduler = ERROR
+
#log4j.logger.Sync = DEBUG
#log4j.logger.Sync.Log = ERROR
#log4j.logger.SyncInterestTable = TRACE
diff --git a/scheduler/random-interval-generator.h b/scheduler/random-interval-generator.h
index bf30156..bd46214 100644
--- a/scheduler/random-interval-generator.h
+++ b/scheduler/random-interval-generator.h
@@ -27,6 +27,7 @@
#include <boost/random/mersenne_twister.hpp>
#include <boost/random/uniform_real.hpp>
#include <boost/random/variate_generator.hpp>
+#include <boost/date_time/posix_time/posix_time_types.hpp>
// generates intervals with uniform distribution
class RandomIntervalGenerator : public IntervalGenerator
@@ -45,7 +46,8 @@
// direction shifts the random range; e.g. in the above example, UP would produce a range of
// 10 ~ 12, DOWN of 8 ~ 10, and EVEN of 9 ~ 11
RandomIntervalGenerator(double interval, double percent, Direction direction = EVEN)
- : m_rng(time(NULL))
+ // : m_rng(time(NULL))
+ : m_rng (static_cast<int> (boost::posix_time::microsec_clock::local_time().time_of_day ().total_nanoseconds ()))
, m_dist(0.0, fractional(percent))
, m_random(m_rng, m_dist)
, m_direction(direction)
@@ -54,7 +56,7 @@
{ }
virtual ~RandomIntervalGenerator(){}
-
+
virtual double
nextInterval() _OVERRIDE
{
@@ -70,7 +72,7 @@
return interval;
}
-
+
private:
inline double fractional(double x) { double dummy; return abs(modf(x, &dummy)); }
diff --git a/scheduler/scheduler.cc b/scheduler/scheduler.cc
index ee6a124..236d5e8 100644
--- a/scheduler/scheduler.cc
+++ b/scheduler/scheduler.cc
@@ -22,10 +22,13 @@
#include "scheduler.h"
#include "one-time-task.h"
#include "periodic-task.h"
+#include "logging.h"
#include <utility>
#include <boost/make_shared.hpp>
+INIT_LOGGER ("Scheduler");
+
using namespace std;
using namespace boost;
@@ -36,7 +39,7 @@
void errorCallback(int err)
{
- cout << "Fatal error: " << err << endl;
+ _LOG_ERROR ("Fatal error: " << err);
}
Scheduler::Scheduler()
@@ -49,6 +52,7 @@
Scheduler::~Scheduler()
{
+ shutdown ();
event_base_free(m_base);
}
@@ -59,14 +63,14 @@
{
if (event_base_loop(m_base, EVLOOP_NO_EXIT_ON_EMPTY) < 0)
{
- cout << "scheduler loop break error" << endl;
+ _LOG_DEBUG ("scheduler loop break error");
}
-
+
{
- ReadLock lock(m_mutex);
+ ScopedLock lock(m_mutex);
if (!m_running)
{
- cout << "scheduler loop break normal" << endl;
+ _LOG_DEBUG ("scheduler loop break normal");
break;
}
}
@@ -76,7 +80,7 @@
void
Scheduler::start()
{
- WriteLock lock(m_mutex);
+ ScopedLock lock(m_mutex);
if (!m_running)
{
m_thread = boost::thread(&Scheduler::eventLoop, this);
@@ -87,13 +91,21 @@
void
Scheduler::shutdown()
{
+ bool breakAndWait = false;
{
- WriteLock lock(m_mutex);
- m_running = false;
+ ScopedLock lock (m_mutex);
+ if (m_running)
+ {
+ m_running = false;
+ breakAndWait = true;
+ }
}
-
- event_base_loopbreak(m_base);
- m_thread.join();
+
+ if (breakAndWait)
+ {
+ event_base_loopbreak(m_base);
+ m_thread.join();
+ }
}
TaskPtr
@@ -128,13 +140,13 @@
int res = evtimer_add(newTask->ev(), newTask->tv());
if (res < 0)
{
- cout << "evtimer_add failed for " << newTask->tag() << endl;
+ _LOG_ERROR ("evtimer_add failed for " << newTask->tag());
}
return true;
}
else
{
- cout << "fail to add task: " << newTask->tag() << endl;
+ _LOG_ERROR ("fail to add task: " << newTask->tag());
}
return false;
@@ -149,7 +161,7 @@
void
Scheduler::rescheduleTask(const TaskPtr &task)
{
- ReadLock lock(m_mutex);
+ ScopedLock lock(m_mutex);
TaskMapIt it = m_taskMap.find(task->tag());
if (it != m_taskMap.end())
{
@@ -158,7 +170,7 @@
int res = evtimer_add(task->ev(), task->tv());
if (res < 0)
{
- cout << "evtimer_add failed for " << task->tag() << endl;
+ _LOG_ERROR ("evtimer_add failed for " << task->tag());
}
}
else
@@ -170,7 +182,7 @@
void
Scheduler::rescheduleTask(const Task::Tag &tag)
{
- ReadLock lock(m_mutex);
+ ScopedLock lock(m_mutex);
TaskMapIt it = m_taskMap.find(tag);
if (it != m_taskMap.end())
{
@@ -187,7 +199,7 @@
bool
Scheduler::addToMap(const TaskPtr &task)
{
- WriteLock lock(m_mutex);
+ ScopedLock lock(m_mutex);
if (m_taskMap.find(task->tag()) == m_taskMap.end())
{
m_taskMap.insert(make_pair(task->tag(), task));
@@ -199,7 +211,7 @@
void
Scheduler::deleteTask(const Task::Tag &tag)
{
- WriteLock lock(m_mutex);
+ ScopedLock lock(m_mutex);
TaskMapIt it = m_taskMap.find(tag);
if (it != m_taskMap.end())
{
@@ -212,7 +224,7 @@
void
Scheduler::deleteTask(const Task::TaskMatcher &matcher)
{
- WriteLock lock(m_mutex);
+ ScopedLock lock(m_mutex);
TaskMapIt it = m_taskMap.begin();
while(it != m_taskMap.end())
{
@@ -235,6 +247,6 @@
int
Scheduler::size()
{
- ReadLock lock(m_mutex);
+ ScopedLock lock(m_mutex);
return m_taskMap.size();
}
diff --git a/scheduler/scheduler.h b/scheduler/scheduler.h
index e783cf0..6ba826a 100644
--- a/scheduler/scheduler.h
+++ b/scheduler/scheduler.h
@@ -68,7 +68,7 @@
static TaskPtr
schedulePeriodicTask (SchedulerPtr scheduler, IntervalGeneratorPtr delayGenerator,
const Task::Callback &callback, const Task::Tag &tag);
-
+
// if task with the same tag exists, the task is not added and return false
virtual bool
addTask(TaskPtr task);
@@ -76,7 +76,7 @@
// delete task by task->tag, regardless of whether it's invoked or not
virtual void
deleteTask(TaskPtr task);
-
+
// delete task by tag, regardless of whether it's invoked or not
// if no task is found, no effect
virtual void
@@ -121,11 +121,11 @@
typedef std::map<Task::Tag, TaskPtr> TaskMap;
typedef std::map<Task::Tag, TaskPtr>::iterator TaskMapIt;
typedef boost::shared_mutex Mutex;
- typedef boost::unique_lock<Mutex> WriteLock;
- typedef boost::shared_lock<Mutex> ReadLock;
+ typedef boost::unique_lock<Mutex> ScopedLock;
+
TaskMap m_taskMap;
Mutex m_mutex;
- bool m_running;
+ volatile bool m_running;
event_base *m_base;
boost::thread m_thread;
};
diff --git a/src/action-log.cc b/src/action-log.cc
index 9b2ee9c..b589464 100644
--- a/src/action-log.cc
+++ b/src/action-log.cc
@@ -256,7 +256,7 @@
Bytes actionData = m_ccnx->createContentObject (actionName, item_msg.c_str (), item_msg.size ());
CcnxCharbufPtr namePtr = actionName.toCcnxCharbuf ();
- _LOG_DEBUG (" >>>>>>> " << namePtr->buf () << " " << namePtr->length ());
+ // _LOG_DEBUG (" >>>>>>> " << Name (namePtr->buf () << " " << namePtr->length ());
sqlite3_bind_blob (stmt, 15, namePtr->buf (), namePtr->length (), SQLITE_TRANSIENT);
sqlite3_bind_blob (stmt, 16, head (actionData), actionData.size (), SQLITE_TRANSIENT);
diff --git a/src/dispatcher.cc b/src/dispatcher.cc
index 8c93d96..ad88aed 100644
--- a/src/dispatcher.cc
+++ b/src/dispatcher.cc
@@ -72,10 +72,17 @@
{
Ccnx::CcnxDiscovery::registerCallback (TaggedFunction (bind (&Dispatcher::Did_LocalPrefix_Updated, this, _1), "dispatcher"));
}
+
+ m_executor.start ();
}
Dispatcher::~Dispatcher()
{
+ // _LOG_DEBUG ("Enter destructor of dispatcher");
+ m_executor.shutdown ();
+
+ // _LOG_DEBUG (">>");
+
if (m_enablePrefixDiscovery)
{
Ccnx::CcnxDiscovery::deregisterCallback (TaggedFunction (bind (&Dispatcher::Did_LocalPrefix_Updated, this, _1), "dispatcher"));
@@ -198,14 +205,20 @@
*/
void
-Dispatcher::Did_SyncLog_StateChange (const SyncStateMsgPtr &stateMsg)
+Dispatcher::Did_SyncLog_StateChange (SyncStateMsgPtr stateMsg)
+{
+ m_executor.execute (bind (&Dispatcher::Did_SyncLog_StateChange_Execute, this, stateMsg));
+}
+
+void
+Dispatcher::Did_SyncLog_StateChange_Execute (SyncStateMsgPtr stateMsg)
{
int size = stateMsg->state_size();
int index = 0;
// iterate and fetch the actions
while (index < size)
{
- SyncState state = stateMsg->state(index);
+ SyncState state = stateMsg->state (index);
if (state.has_old_seq() && state.has_seq())
{
uint64_t oldSeq = state.old_seq();
@@ -222,6 +235,7 @@
}
}
+
void
Dispatcher::Did_FetchManager_ActionFetch (const Ccnx::Name &deviceName, const Ccnx::Name &actionName, uint32_t seqno, Ccnx::PcoPtr actionPco)
{
diff --git a/src/dispatcher.h b/src/dispatcher.h
index 1bea184..540bca1 100644
--- a/src/dispatcher.h
+++ b/src/dispatcher.h
@@ -96,7 +96,10 @@
// callback to process remote sync state change
void
- Did_SyncLog_StateChange (const SyncStateMsgPtr &stateMsg);
+ Did_SyncLog_StateChange (SyncStateMsgPtr stateMsg);
+
+ void
+ Did_SyncLog_StateChange_Execute (SyncStateMsgPtr stateMsg);
void
Did_FetchManager_ActionFetch (const Ccnx::Name &deviceName, const Ccnx::Name &actionName, uint32_t seqno, Ccnx::PcoPtr actionPco);
diff --git a/src/executor.cc b/src/executor.cc
index 67dd74a..15fbb5b 100644
--- a/src/executor.cc
+++ b/src/executor.cc
@@ -28,24 +28,46 @@
using namespace boost;
Executor::Executor (int poolSize)
- : m_needStop (false)
+ : m_needStop (true)
+ , m_poolSize (poolSize)
{
- for (int i = 0; i < poolSize; i++)
- {
- m_group.create_thread (bind(&Executor::run, this));
- }
}
Executor::~Executor()
{
_LOG_DEBUG ("Enter destructor");
- m_needStop = true;
- m_group.interrupt_all ();
- m_group.join_all ();
+ shutdown ();
_LOG_DEBUG ("Exit destructor");
}
void
+Executor::start ()
+{
+ if (m_needStop)
+ {
+ m_needStop = false;
+ for (int i = 0; i < m_poolSize; i++)
+ {
+ m_group.create_thread (bind(&Executor::run, this));
+ }
+ }
+}
+
+void
+Executor::shutdown ()
+{
+ if (!m_needStop)
+ {
+ m_needStop = true;
+ _LOG_DEBUG ("Iterrupting all");
+ m_group.interrupt_all ();
+ _LOG_DEBUG ("Join all");
+ m_group.join_all ();
+ }
+}
+
+
+void
Executor::execute(const Job &job)
{
_LOG_DEBUG ("Add to job queue");
@@ -83,10 +105,12 @@
{
Job job = waitForJob();
+ _LOG_DEBUG (">>> enter job");
job (); // even if job is "null", nothing bad will happen
+ _LOG_DEBUG ("<<< exit job");
}
- _LOG_DEBUG ("Thread finished");
+ _LOG_DEBUG ("Executor thread finished");
}
Executor::Job
@@ -97,7 +121,9 @@
// wait until job queue is not empty
while (m_queue.empty())
{
+ _LOG_DEBUG ("Unlocking mutex for wait");
m_cond.wait(lock);
+ _LOG_DEBUG ("Re-locking mutex after wait");
}
_LOG_DEBUG ("Got signal on condition");
diff --git a/src/executor.h b/src/executor.h
index e15c174..8d3c021 100644
--- a/src/executor.h
+++ b/src/executor.h
@@ -55,6 +55,12 @@
int
jobQueueSize();
+ void
+ start ();
+
+ void
+ shutdown ();
+
private:
void
run();
@@ -75,5 +81,6 @@
ThreadGroup m_group;
volatile bool m_needStop;
+ int m_poolSize;
};
#endif // EXECUTOR_H
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index 1cffa49..bbdd6a8 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -23,6 +23,9 @@
#include <boost/make_shared.hpp>
#include <boost/ref.hpp>
#include <boost/throw_exception.hpp>
+#include "logging.h"
+
+INIT_LOGGER ("Fetch.Manager");
using namespace boost;
using namespace std;
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 5f731bb..f577b14 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -94,9 +94,7 @@
if (m_outOfOrderRecvSeqNo.find (m_minSendSeqNo+1) != m_outOfOrderRecvSeqNo.end ())
continue;
- Name x = Name(m_forwardingHint)(m_name)(m_minSendSeqNo+1);
- int xsize = x.size();
- //_LOG_DEBUG (" >>> i " << x.getPartialName(0, xsize - 2) << ", seq = " << (m_minSendSeqNo + 1 ));
+ _LOG_DEBUG (" >>> i " << Name (m_forwardingHint)(m_name) << ", seq = " << (m_minSendSeqNo + 1 ));
// cout << ">>> " << m_minSendSeqNo+1 << endl;
m_ccnx->sendInterest (Name (m_forwardingHint)(m_name)(m_minSendSeqNo+1),
@@ -111,6 +109,8 @@
void
Fetcher::OnData (uint64_t seqno, const Ccnx::Name &name, PcoPtr data)
{
+ _LOG_DEBUG (" <<< d " << name.getPartialName (0, name.size () - 1) << ", seq = " << seqno);
+
if (m_forwardingHint == Name ())
{
// invoke callback
diff --git a/src/sync-core.cc b/src/sync-core.cc
index 80daac8..95a5d62 100644
--- a/src/sync-core.cc
+++ b/src/sync-core.cc
@@ -56,7 +56,7 @@
SyncCore::~SyncCore()
{
- m_scheduler->shutdown ();
+ // m_scheduler->shutdown ();
// need to "deregister" closures
}
@@ -86,9 +86,9 @@
BytesPtr syncData = serializeMsg (*msg);
m_ccnx->publishData(syncName, *syncData, FRESHNESS);
- _LOG_DEBUG (m_log->GetLocalName () << " localStateChanged ");
- _LOG_TRACE (m_log->GetLocalName () << " publishes: " << *oldHash);
- _LOG_TRACE (msg);
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] localStateChanged ");
+ _LOG_TRACE ("[" << m_log->GetLocalName () << "] publishes: " << *oldHash);
+ // _LOG_TRACE (msg);
// no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
// this is trying to avoid the situation that the order of SyncData and new Sync Interest gets reversed at receivers
@@ -119,6 +119,8 @@
void
SyncCore::handleRecoverInterest(const Name &name)
{
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] <<<<< RECOVER Interest with name " << name);
+
Bytes hashBytes = name.getComp(name.size() - 1);
// this is the hash unkonwn to the sender of the interest
Hash hash(head(hashBytes), hashBytes.size());
@@ -129,18 +131,20 @@
BytesPtr syncData = serializeMsg (*msg);
m_ccnx->publishData(name, *syncData, FRESHNESS);
- _LOG_TRACE (m_log->GetLocalName () << " publishes " << hash);
- _LOG_TRACE (msg);
- }
- else
- {
- // we don't recognize this hash, can not help
+ _LOG_TRACE ("[" << m_log->GetLocalName () << "] publishes " << hash);
+ // _LOG_TRACE (msg);
}
+ else
+ {
+ // we don't recognize this hash, can not help
+ }
}
void
SyncCore::handleSyncInterest(const Name &name)
{
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] <<<<< SYNC Interest with name " << name);
+
Bytes hashBytes = name.getComp(name.size() - 1);
HashPtr hash(new Hash(head(hashBytes), hashBytes.size()));
if (*hash == *m_rootHash)
@@ -194,6 +198,7 @@
void
SyncCore::handleRecoverData(const Name &name, PcoPtr content)
{
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] <<<<< RECOVER DATA with name: " << name);
//cout << "handle recover data" << end;
handleStateData(*content->contentPtr ());
sendSyncInterest();
@@ -202,6 +207,8 @@
void
SyncCore::handleSyncData(const Name &name, PcoPtr content)
{
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] <<<<< SYNC DATA with name: " << name);
+
// suppress recover in interest - data out of order case
handleStateData(*content->contentPtr ());
@@ -262,7 +269,7 @@
if (diff->state_size() > 0)
{
- m_stateMsgCallback(diff);
+ m_stateMsgCallback (diff);
}
}
@@ -271,11 +278,11 @@
{
Name syncInterest = Name (m_syncPrefix)(m_rootHash->GetHash(), m_rootHash->GetHashBytes());
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] >>> SYNC Interest for " << *m_rootHash);
+
m_ccnx->sendInterest(syncInterest,
Closure (boost::bind(&SyncCore::handleSyncData, this, _1, _2),
boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1)));
-
- _LOG_TRACE (m_log->GetLocalName () << " send SYNC interest: " << *m_rootHash);
}
void
@@ -291,11 +298,12 @@
// append the unknown hash
Name recoverInterest = Name (m_syncPrefix)(RECOVER)(bytes);
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] >>> RECOVER Interests for " << *hash);
+
m_ccnx->sendInterest(recoverInterest,
Closure (boost::bind(&SyncCore::handleRecoverData, this, _1, _2),
boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1)));
- _LOG_TRACE (m_log->GetLocalName () << " send RECOVER Interest: " << *hash);
}
else
{
diff --git a/src/sync-core.h b/src/sync-core.h
index 48332da..db20bbb 100644
--- a/src/sync-core.h
+++ b/src/sync-core.h
@@ -31,7 +31,7 @@
class SyncCore
{
public:
- typedef boost::function<void (const SyncStateMsgPtr & stateMsg) > StateMsgCallback;
+ typedef boost::function<void (SyncStateMsgPtr stateMsg) > StateMsgCallback;
static const int FRESHNESS = 2; // seconds
static const string RECOVER;
diff --git a/test/test-dispatcher.cc b/test/test-dispatcher.cc
index 6378cb2..e3918f8 100644
--- a/test/test-dispatcher.cc
+++ b/test/test-dispatcher.cc
@@ -20,6 +20,7 @@
*/
#include "ccnx-wrapper.h"
+#include "logging.h"
#include "dispatcher.h"
#include <boost/test/unit_test.hpp>
#include <boost/make_shared.hpp>
@@ -32,6 +33,8 @@
using namespace boost;
namespace fs = boost::filesystem;
+INIT_LOGGER ("Test.Dispatcher");
+
BOOST_AUTO_TEST_SUITE(DispatcherTest)
@@ -50,35 +53,35 @@
BOOST_AUTO_TEST_CASE(TestDispatcher)
{
- fs::path dir1("test-white-house");
- fs::path dir2("test-black-house");
+ INIT_LOGGERS ();
- string user1 = "/obama";
+ fs::path dir1("./TestDispatcher/test-white-house");
+ fs::path dir2("./TestDispatcher/test-black-house");
+
+ string user1 = "/obamaa";
string user2 = "/romney";
string folder = "who-is-president";
CcnxWrapperPtr ccnx1 = make_shared<CcnxWrapper>();
- usleep(1000);
+ usleep(100);
CcnxWrapperPtr ccnx2 = make_shared<CcnxWrapper>();
- usleep(1000);
+ usleep(100);
cleanDir(dir1);
cleanDir(dir2);
- fs::create_directory(dir1);
- fs::create_directory(dir2);
-
Dispatcher d1(user1, folder, dir1, ccnx1, 2, false);
- usleep(1000);
+ usleep(100);
Dispatcher d2(user2, folder, dir2, ccnx2, 2, false);
- sleep(1);
+ usleep(1000000);
+ _LOG_DEBUG ("checking obama vs romney");
checkRoots(d1.SyncRoot(), d2.SyncRoot());
fs::path filename("a_letter_to_romney.txt");
- string words = "I'm the new socialist President. You are not.";
+ string words = "I'm the new socialist President. You are not!";
fs::path abf = dir1 / filename;
@@ -98,8 +101,8 @@
HashPtr fileHash2 = Hash::FromFileContent(ef);
BOOST_CHECK_EQUAL(*fileHash1, *fileHash2);
- cleanDir(dir1);
- cleanDir(dir2);
+ // cleanDir(dir1);
+ // cleanDir(dir2);
}
BOOST_AUTO_TEST_SUITE_END()
diff --git a/test/test-sync-core.cc b/test/test-sync-core.cc
index d18d4d4..f38dab7 100644
--- a/test/test-sync-core.cc
+++ b/test/test-sync-core.cc
@@ -10,7 +10,7 @@
using namespace boost;
using namespace boost::filesystem;
-INIT_LOGGER("Test.Sync.Core");
+INIT_LOGGER("Test.SyncCore");
BOOST_AUTO_TEST_SUITE(SyncCoreTests)
@@ -64,7 +64,8 @@
SyncCore *core1 = new SyncCore(log1, user1, loc1, syncPrefix, bind(callback, _1), c1);
usleep(10000);
SyncCore *core2 = new SyncCore(log2, user2, loc2, syncPrefix, bind(callback, _1), c2);
- usleep(1000000);
+
+ sleep(1);
checkRoots(core1->root(), core2->root());
// _LOG_TRACE ("\n\n\n\n\n\n----------\n");
diff --git a/wscript b/wscript
index b7d5d1a..17e1e07 100644
--- a/wscript
+++ b/wscript
@@ -73,7 +73,7 @@
features = ["cxx"],
source = bld.path.ant_glob(['scheduler/**/*.cc']),
use = 'BOOST BOOST_THREAD LIBEVENT LIBEVENT_PTHREADS LOG4CXX',
- includes = "scheduler",
+ includes = "scheduler src",
)
libccnx = bld (