sync log's not working as expected
diff --git a/src/sync-core.cc b/src/sync-core.cc
index 67e2f73..ded03f9 100644
--- a/src/sync-core.cc
+++ b/src/sync-core.cc
@@ -22,30 +22,31 @@
#include "sync-core.h"
const string SyncCore::RECOVER = "RECOVER";
-const double SyncCore::WAIT = 0.2;
+const double SyncCore::WAIT = 0.05;
const double SyncCore::RANDOM_PERCENT = 0.5;
-SyncCore::SyncCore(const string &path, const Name &userName, const Name &localPrefix, const Name &syncPrefix, const StateMsgCallback &callback, CcnxWrapperPtr handle, SchedulerPtr scheduler)
+SyncCore::SyncCore(const string &path, const Name &userName, const Name &localPrefix, const Name &syncPrefix, const StateMsgCallback &callback, CcnxWrapperPtr handle)
: m_log(path, userName.toString())
- , m_scheduler(scheduler)
+ , m_scheduler(new Scheduler())
, m_stateMsgCallback(callback)
, m_userName(userName)
, m_localPrefix(localPrefix)
, m_syncPrefix(syncPrefix)
, m_handle(handle)
+ , m_recoverWaitGenerator(new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP))
{
m_rootHash = m_log.RememberStateInStateLog();
m_syncClosure = new Closure(0, boost::bind(&SyncCore::handleSyncData, this, _1, _2), boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1));
m_recoverClosure = new Closure(0, boost::bind(&SyncCore::handleRecoverData, this, _1, _2), boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1));
m_handle->setInterestFilter(m_syncPrefix, boost::bind(&SyncCore::handleInterest, this, _1));
m_log.initYP(m_yp);
- //m_scheduler->start();
+ m_scheduler->start();
sendSyncInterest();
}
SyncCore::~SyncCore()
{
- //m_scheduler->shutdown();
+ m_scheduler->shutdown();
if (m_syncClosure != 0)
{
delete m_syncClosure;
@@ -82,16 +83,12 @@
void
SyncCore::updateLocalState(sqlite3_int64 seqno)
{
- cout << "User name: " << m_userName << endl;
m_log.UpdateDeviceSeqNo(m_userName, seqno);
// choose to update locator everytime
m_log.UpdateLocator(m_userName, m_localPrefix);
HashPtr oldHash = m_rootHash;
m_rootHash = m_log.RememberStateInStateLog();
- cout << "Old hash: " << *oldHash << endl;
- cout << "New hash: " << *m_rootHash << endl;
-
SyncStateMsgPtr msg = m_log.FindStateDifferences(*oldHash, *m_rootHash);
// reply sync Interest with oldHash as last component
@@ -102,10 +99,13 @@
// no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
// this is trying to avoid the situation that the order of SyncData and new Sync Interest gets reversed at receivers
+ /*
ostringstream ss;
ss << *m_rootHash;
TaskPtr task(new OneTimeTask(boost::bind(&SyncCore::sendSyncInterest, this), ss.str(), m_scheduler, 0.05));
m_scheduler->addTask(task);
+ */
+ sendSyncInterest();
}
void
@@ -135,6 +135,28 @@
{
// we know the hash, should reply everything
SyncStateMsgPtr msg = m_log.FindStateDifferences(*(Hash::Origin), *m_rootHash);
+ // DEBUG
+ assert(msg->state_size() > 0);
+ int size = msg->state_size();
+ int index = 0;
+ cout << "Reply recover interest with: " << endl;
+ while (index < size)
+ {
+ SyncState state = msg->state(index);
+ string strName = state.name();
+ string strLoc = state.locator();
+ cout << "Name: " << Name((const unsigned char *)strName.c_str(), strName.size()) << ", Loc: " << Name((const unsigned char *)strLoc.c_str(), strLoc.size()) << ", seq: " << state.seq() << endl;
+ if (state.type() == SyncState::UPDATE)
+ {
+ cout << "Action: update" << endl;
+ }
+ else
+ {
+ cout << "Action: delete" << endl;
+ }
+ index++;
+ }
+ // END DEBUG
Bytes syncData;
msgToBytes(msg, syncData);
m_handle->publishData(name, syncData, FRESHNESS);
@@ -153,14 +175,13 @@
if (*hash == *m_rootHash)
{
// we have the same hash; nothing needs to be done
+ cout << "same as root hash: " << *hash << endl;
return;
}
else if (m_log.LookupSyncLog(*hash) > 0)
{
// we know something more
- cout << "SyncInterest: " << name << endl;
- cout << "hash: " << *hash << endl;
- cout << "root hash: " << *m_rootHash << endl;
+ cout << "found hash in sync log" << endl;
SyncStateMsgPtr msg = m_log.FindStateDifferences(*hash, *m_rootHash);
Bytes syncData;
@@ -170,11 +191,17 @@
else
{
// we don't recognize the hash, send recover Interest if still don't know the hash after a randomized wait period
+ /*
ostringstream ss;
ss << *hash;
- IntervalGeneratorPtr generator(new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP));
- TaskPtr task(new PeriodicTask(boost::bind(&SyncCore::recover, this, hash), ss.str(), m_scheduler, generator, 1));
+ double wait = m_recoverWaitGenerator->nextInterval();
+ cout << "recover task scheduled after wait: " << wait << endl;
+ TaskPtr task(new OneTimeTask(boost::bind(&SyncCore::recover, this, hash), ss.str(), m_scheduler, wait));
m_scheduler->addTask(task);
+ */
+
+ recover(hash);
+
}
}
@@ -199,6 +226,7 @@
void
SyncCore::handleRecoverData(const Name &name, const Bytes &content)
{
+ cout << "handle recover data" << endl;
handleStateData(content);
}
@@ -231,14 +259,17 @@
SyncState state = msg->state(index);
string devStr = state.name();
Name deviceName((const unsigned char *)devStr.c_str(), devStr.size());
+ cout << "Got Name: " << deviceName;
if (state.type() == SyncState::UPDATE)
{
sqlite3_int64 seqno = state.seq();
+ cout << ", Got seq: " << seqno << endl;
m_log.UpdateDeviceSeqNo(deviceName, seqno);
if (state.has_locator())
{
string locStr = state.locator();
Name locatorName((const unsigned char *)locStr.c_str(), locStr.size());
+ cout << ", Got loc: " << locatorName << endl;
m_log.UpdateLocator(deviceName, locatorName);
WriteLock(m_ypMutex);
m_yp[deviceName] = locatorName;
@@ -246,6 +277,7 @@
}
else
{
+ cout << "nani" << endl;
deregister(deviceName);
}
index++;
@@ -255,6 +287,7 @@
HashPtr oldHash = m_rootHash;
m_rootHash = m_log.RememberStateInStateLog();
SyncStateMsgPtr diff = m_log.FindStateDifferences(*oldHash, *m_rootHash);
+ cout << "OldHash: " << *oldHash << ", Newhash: " << *m_rootHash << endl;
m_stateMsgCallback(diff);
}
@@ -268,6 +301,7 @@
void
SyncCore::recover(const HashPtr &hash)
{
+ cout << "Recover for: " << *hash << endl;
if (!(*hash == *m_rootHash) && m_log.LookupSyncLog(*hash) <= 0)
{
// unfortunately we still don't recognize this hash
diff --git a/src/sync-core.h b/src/sync-core.h
index afe5e05..998be7d 100644
--- a/src/sync-core.h
+++ b/src/sync-core.h
@@ -51,8 +51,7 @@
, const Name &localPrefix // routable name used by the local user
, const Name &syncPrefix // the prefix for the sync collection
, const StateMsgCallback &callback // callback when state change is detected
- , CcnxWrapperPtr handle
- , SchedulerPtr scheduler);
+ , CcnxWrapperPtr handle);
~SyncCore();
// some other code should call this fuction when local prefix
@@ -87,6 +86,9 @@
void
recover(const HashPtr &hash);
+ HashPtr
+ root() { return m_rootHash; }
+
protected:
void
sendSyncInterest();
@@ -120,6 +122,8 @@
Closure *m_syncClosure;
Closure *m_recoverClosure;
+ IntervalGeneratorPtr m_recoverWaitGenerator;
+
};
#endif // SYNC_CORE_H
diff --git a/src/sync-log.cc b/src/sync-log.cc
index 90f5ba1..d5a8750 100644
--- a/src/sync-log.cc
+++ b/src/sync-log.cc
@@ -354,8 +354,6 @@
{
SyncState *state = msg->add_state ();
- cout << "Bytes: " << sqlite3_column_bytes (stmt, 0) << endl;
- cout << "Pointer: " << sqlite3_column_blob (stmt, 0) << endl;
state->set_name (reinterpret_cast<const char*> (sqlite3_column_blob (stmt, 0)), sqlite3_column_bytes (stmt, 0));
// locator is optional, so must check if it is null
@@ -365,7 +363,7 @@
}
sqlite3_int64 newSeqNo = sqlite3_column_int64 (stmt, 3);
- if (newSeqNo > 0)
+ if (newSeqNo >= 0)
{
state->set_type (SyncState::UPDATE);
state->set_seq (newSeqNo);