Add tons of new logging
diff --git a/src/action-log.cc b/src/action-log.cc
index 9b2ee9c..b589464 100644
--- a/src/action-log.cc
+++ b/src/action-log.cc
@@ -256,7 +256,7 @@
Bytes actionData = m_ccnx->createContentObject (actionName, item_msg.c_str (), item_msg.size ());
CcnxCharbufPtr namePtr = actionName.toCcnxCharbuf ();
- _LOG_DEBUG (" >>>>>>> " << namePtr->buf () << " " << namePtr->length ());
+ // _LOG_DEBUG (" >>>>>>> " << Name (namePtr->buf () << " " << namePtr->length ());
sqlite3_bind_blob (stmt, 15, namePtr->buf (), namePtr->length (), SQLITE_TRANSIENT);
sqlite3_bind_blob (stmt, 16, head (actionData), actionData.size (), SQLITE_TRANSIENT);
diff --git a/src/dispatcher.cc b/src/dispatcher.cc
index 8c93d96..ad88aed 100644
--- a/src/dispatcher.cc
+++ b/src/dispatcher.cc
@@ -72,10 +72,17 @@
{
Ccnx::CcnxDiscovery::registerCallback (TaggedFunction (bind (&Dispatcher::Did_LocalPrefix_Updated, this, _1), "dispatcher"));
}
+
+ m_executor.start ();
}
Dispatcher::~Dispatcher()
{
+ // _LOG_DEBUG ("Enter destructor of dispatcher");
+ m_executor.shutdown ();
+
+ // _LOG_DEBUG (">>");
+
if (m_enablePrefixDiscovery)
{
Ccnx::CcnxDiscovery::deregisterCallback (TaggedFunction (bind (&Dispatcher::Did_LocalPrefix_Updated, this, _1), "dispatcher"));
@@ -198,14 +205,20 @@
*/
void
-Dispatcher::Did_SyncLog_StateChange (const SyncStateMsgPtr &stateMsg)
+Dispatcher::Did_SyncLog_StateChange (SyncStateMsgPtr stateMsg) // only enqueues a job; the states are walked in Did_SyncLog_StateChange_Execute
+{
+ m_executor.execute (bind (&Dispatcher::Did_SyncLog_StateChange_Execute, this, stateMsg)); // stateMsg is passed by value, so the bound job holds its own copy
+}
+
+void
+Dispatcher::Did_SyncLog_StateChange_Execute (SyncStateMsgPtr stateMsg)
{
int size = stateMsg->state_size();
int index = 0;
// iterate and fetch the actions
while (index < size)
{
- SyncState state = stateMsg->state(index);
+ SyncState state = stateMsg->state (index);
if (state.has_old_seq() && state.has_seq())
{
uint64_t oldSeq = state.old_seq();
@@ -222,6 +235,7 @@
}
}
+
void
Dispatcher::Did_FetchManager_ActionFetch (const Ccnx::Name &deviceName, const Ccnx::Name &actionName, uint32_t seqno, Ccnx::PcoPtr actionPco)
{
diff --git a/src/dispatcher.h b/src/dispatcher.h
index 1bea184..540bca1 100644
--- a/src/dispatcher.h
+++ b/src/dispatcher.h
@@ -96,7 +96,10 @@
// callback to process remote sync state change
void
- Did_SyncLog_StateChange (const SyncStateMsgPtr &stateMsg);
+ Did_SyncLog_StateChange (SyncStateMsgPtr stateMsg); // queues Did_SyncLog_StateChange_Execute on m_executor
+
+ void
+ Did_SyncLog_StateChange_Execute (SyncStateMsgPtr stateMsg); // executor job: iterates over the states contained in stateMsg
void
Did_FetchManager_ActionFetch (const Ccnx::Name &deviceName, const Ccnx::Name &actionName, uint32_t seqno, Ccnx::PcoPtr actionPco);
diff --git a/src/executor.cc b/src/executor.cc
index 67dd74a..15fbb5b 100644
--- a/src/executor.cc
+++ b/src/executor.cc
@@ -28,24 +28,46 @@
using namespace boost;
Executor::Executor (int poolSize)
- : m_needStop (false)
+ : m_needStop (true) // constructed in the stopped state; no thread exists until start () is called
+ , m_poolSize (poolSize) // remembered so that start () knows how many threads to create
{
- for (int i = 0; i < poolSize; i++)
- {
- m_group.create_thread (bind(&Executor::run, this));
- }
}
Executor::~Executor()
{
_LOG_DEBUG ("Enter destructor");
- m_needStop = true;
- m_group.interrupt_all ();
- m_group.join_all ();
+ shutdown (); // does nothing when the executor is already stopped (shutdown () checks m_needStop)
_LOG_DEBUG ("Exit destructor");
}
void
+Executor::start ()
+{
+  // Bring the pool up: spawn m_poolSize threads, each running Executor::run.
+  // A call made while the executor is already running changes nothing.
+  if (!m_needStop)
+    return;
+
+  m_needStop = false;
+  for (int remaining = m_poolSize; remaining > 0; --remaining)
+    m_group.create_thread (bind (&Executor::run, this));
+}
+
+// Stops the executor: marks it as stopped, interrupts every thread in
+// m_group and blocks until all of them have been joined.  Calling it on an
+// executor that is already stopped (or was never started) does nothing.
+// NOTE(review): m_needStop is a plain `volatile bool`, so concurrent calls to
+// start ()/shutdown () from different threads are not synchronized.
+void
+Executor::shutdown ()
+{
+  if (!m_needStop)
+    {
+      // Raise the flag before interrupting the threads.
+      m_needStop = true;
+      _LOG_DEBUG ("Interrupting all");
+      m_group.interrupt_all ();
+      _LOG_DEBUG ("Join all");
+      m_group.join_all ();
+    }
+}
+
+
+void
Executor::execute(const Job &job)
{
_LOG_DEBUG ("Add to job queue");
@@ -83,10 +105,12 @@
{
Job job = waitForJob();
+ _LOG_DEBUG (">>> enter job");
job (); // even if job is "null", nothing bad will happen
+ _LOG_DEBUG ("<<< exit job");
}
- _LOG_DEBUG ("Thread finished");
+ _LOG_DEBUG ("Executor thread finished");
}
Executor::Job
@@ -97,7 +121,9 @@
// wait until job queue is not empty
while (m_queue.empty())
{
+ _LOG_DEBUG ("Unlocking mutex for wait");
m_cond.wait(lock);
+ _LOG_DEBUG ("Re-locking mutex after wait");
}
_LOG_DEBUG ("Got signal on condition");
diff --git a/src/executor.h b/src/executor.h
index e15c174..8d3c021 100644
--- a/src/executor.h
+++ b/src/executor.h
@@ -55,6 +55,12 @@
int
jobQueueSize();
+ void
+ start (); // creates the worker threads; has no effect while the executor is already running
+
+ void
+ shutdown (); // interrupts and joins the worker threads; has no effect while the executor is already stopped
+
private:
void
run();
@@ -75,5 +81,6 @@
ThreadGroup m_group;
volatile bool m_needStop;
+ int m_poolSize;
};
#endif // EXECUTOR_H
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index 1cffa49..bbdd6a8 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -23,6 +23,9 @@
#include <boost/make_shared.hpp>
#include <boost/ref.hpp>
#include <boost/throw_exception.hpp>
+#include "logging.h"
+
+INIT_LOGGER ("Fetch.Manager");
using namespace boost;
using namespace std;
diff --git a/src/fetcher.cc b/src/fetcher.cc
index 5f731bb..f577b14 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -94,9 +94,7 @@
if (m_outOfOrderRecvSeqNo.find (m_minSendSeqNo+1) != m_outOfOrderRecvSeqNo.end ())
continue;
- Name x = Name(m_forwardingHint)(m_name)(m_minSendSeqNo+1);
- int xsize = x.size();
- //_LOG_DEBUG (" >>> i " << x.getPartialName(0, xsize - 2) << ", seq = " << (m_minSendSeqNo + 1 ));
+ _LOG_DEBUG (" >>> i " << Name (m_forwardingHint)(m_name) << ", seq = " << (m_minSendSeqNo + 1 ));
// cout << ">>> " << m_minSendSeqNo+1 << endl;
m_ccnx->sendInterest (Name (m_forwardingHint)(m_name)(m_minSendSeqNo+1),
@@ -111,6 +109,8 @@
void
Fetcher::OnData (uint64_t seqno, const Ccnx::Name &name, PcoPtr data)
{
+ _LOG_DEBUG (" <<< d " << name.getPartialName (0, name.size () - 1) << ", seq = " << seqno);
+
if (m_forwardingHint == Name ())
{
// invoke callback
diff --git a/src/sync-core.cc b/src/sync-core.cc
index 80daac8..95a5d62 100644
--- a/src/sync-core.cc
+++ b/src/sync-core.cc
@@ -56,7 +56,7 @@
SyncCore::~SyncCore()
{
- m_scheduler->shutdown ();
+ // m_scheduler->shutdown (); // NOTE(review): the scheduler is no longer shut down here -- confirm that whoever owns m_scheduler stops it
// need to "deregister" closures
}
@@ -86,9 +86,9 @@
BytesPtr syncData = serializeMsg (*msg);
m_ccnx->publishData(syncName, *syncData, FRESHNESS);
- _LOG_DEBUG (m_log->GetLocalName () << " localStateChanged ");
- _LOG_TRACE (m_log->GetLocalName () << " publishes: " << *oldHash);
- _LOG_TRACE (msg);
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] localStateChanged ");
+ _LOG_TRACE ("[" << m_log->GetLocalName () << "] publishes: " << *oldHash);
+ // _LOG_TRACE (msg);
// no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
// this is trying to avoid the situation that the order of SyncData and new Sync Interest gets reversed at receivers
@@ -119,6 +119,8 @@
void
SyncCore::handleRecoverInterest(const Name &name)
{
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] <<<<< RECOVER Interest with name " << name);
+
Bytes hashBytes = name.getComp(name.size() - 1);
// this is the hash unkonwn to the sender of the interest
Hash hash(head(hashBytes), hashBytes.size());
@@ -129,18 +131,20 @@
BytesPtr syncData = serializeMsg (*msg);
m_ccnx->publishData(name, *syncData, FRESHNESS);
- _LOG_TRACE (m_log->GetLocalName () << " publishes " << hash);
- _LOG_TRACE (msg);
- }
- else
- {
- // we don't recognize this hash, can not help
+ _LOG_TRACE ("[" << m_log->GetLocalName () << "] publishes " << hash);
+ // _LOG_TRACE (msg);
}
+ else
+ {
+ // we don't recognize this hash, can not help
+ }
}
void
SyncCore::handleSyncInterest(const Name &name)
{
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] <<<<< SYNC Interest with name " << name);
+
Bytes hashBytes = name.getComp(name.size() - 1);
HashPtr hash(new Hash(head(hashBytes), hashBytes.size()));
if (*hash == *m_rootHash)
@@ -194,6 +198,7 @@
void
SyncCore::handleRecoverData(const Name &name, PcoPtr content)
{
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] <<<<< RECOVER DATA with name: " << name);
//cout << "handle recover data" << end;
handleStateData(*content->contentPtr ());
sendSyncInterest();
@@ -202,6 +207,8 @@
void
SyncCore::handleSyncData(const Name &name, PcoPtr content)
{
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] <<<<< SYNC DATA with name: " << name);
+
// suppress recover in interest - data out of order case
handleStateData(*content->contentPtr ());
@@ -262,7 +269,7 @@
if (diff->state_size() > 0)
{
- m_stateMsgCallback(diff);
+ m_stateMsgCallback (diff);
}
}
@@ -271,11 +278,11 @@
{
Name syncInterest = Name (m_syncPrefix)(m_rootHash->GetHash(), m_rootHash->GetHashBytes());
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] >>> SYNC Interest for " << *m_rootHash);
+
m_ccnx->sendInterest(syncInterest,
Closure (boost::bind(&SyncCore::handleSyncData, this, _1, _2),
boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1)));
-
- _LOG_TRACE (m_log->GetLocalName () << " send SYNC interest: " << *m_rootHash);
}
void
@@ -291,11 +298,12 @@
// append the unknown hash
Name recoverInterest = Name (m_syncPrefix)(RECOVER)(bytes);
+ _LOG_DEBUG ("[" << m_log->GetLocalName () << "] >>> RECOVER Interests for " << *hash);
+
m_ccnx->sendInterest(recoverInterest,
Closure (boost::bind(&SyncCore::handleRecoverData, this, _1, _2),
boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1)));
- _LOG_TRACE (m_log->GetLocalName () << " send RECOVER Interest: " << *hash);
}
else
{
diff --git a/src/sync-core.h b/src/sync-core.h
index 48332da..db20bbb 100644
--- a/src/sync-core.h
+++ b/src/sync-core.h
@@ -31,7 +31,7 @@
class SyncCore
{
public:
- typedef boost::function<void (const SyncStateMsgPtr & stateMsg) > StateMsgCallback;
+ typedef boost::function<void (SyncStateMsgPtr stateMsg) > StateMsgCallback;
static const int FRESHNESS = 2; // seconds
static const string RECOVER;