sync log's not working as expected
diff --git a/src/sync-core.cc b/src/sync-core.cc
index 67e2f73..ded03f9 100644
--- a/src/sync-core.cc
+++ b/src/sync-core.cc
@@ -22,30 +22,31 @@
#include "sync-core.h"
const string SyncCore::RECOVER = "RECOVER";
-const double SyncCore::WAIT = 0.2;
+const double SyncCore::WAIT = 0.05;
const double SyncCore::RANDOM_PERCENT = 0.5;
-SyncCore::SyncCore(const string &path, const Name &userName, const Name &localPrefix, const Name &syncPrefix, const StateMsgCallback &callback, CcnxWrapperPtr handle, SchedulerPtr scheduler)
+SyncCore::SyncCore(const string &path, const Name &userName, const Name &localPrefix, const Name &syncPrefix, const StateMsgCallback &callback, CcnxWrapperPtr handle)
: m_log(path, userName.toString())
- , m_scheduler(scheduler)
+ , m_scheduler(new Scheduler())
, m_stateMsgCallback(callback)
, m_userName(userName)
, m_localPrefix(localPrefix)
, m_syncPrefix(syncPrefix)
, m_handle(handle)
+ , m_recoverWaitGenerator(new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP))
{
m_rootHash = m_log.RememberStateInStateLog();
m_syncClosure = new Closure(0, boost::bind(&SyncCore::handleSyncData, this, _1, _2), boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1));
m_recoverClosure = new Closure(0, boost::bind(&SyncCore::handleRecoverData, this, _1, _2), boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1));
m_handle->setInterestFilter(m_syncPrefix, boost::bind(&SyncCore::handleInterest, this, _1));
m_log.initYP(m_yp);
- //m_scheduler->start();
+ m_scheduler->start();
sendSyncInterest();
}
SyncCore::~SyncCore()
{
- //m_scheduler->shutdown();
+ m_scheduler->shutdown();
if (m_syncClosure != 0)
{
delete m_syncClosure;
@@ -82,16 +83,12 @@
void
SyncCore::updateLocalState(sqlite3_int64 seqno)
{
- cout << "User name: " << m_userName << endl;
m_log.UpdateDeviceSeqNo(m_userName, seqno);
// choose to update locator everytime
m_log.UpdateLocator(m_userName, m_localPrefix);
HashPtr oldHash = m_rootHash;
m_rootHash = m_log.RememberStateInStateLog();
- cout << "Old hash: " << *oldHash << endl;
- cout << "New hash: " << *m_rootHash << endl;
-
SyncStateMsgPtr msg = m_log.FindStateDifferences(*oldHash, *m_rootHash);
// reply sync Interest with oldHash as last component
@@ -102,10 +99,13 @@
// no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
// this is trying to avoid the situation that the order of SyncData and new Sync Interest gets reversed at receivers
+ /*
ostringstream ss;
ss << *m_rootHash;
TaskPtr task(new OneTimeTask(boost::bind(&SyncCore::sendSyncInterest, this), ss.str(), m_scheduler, 0.05));
m_scheduler->addTask(task);
+ */
+ sendSyncInterest();
}
void
@@ -135,6 +135,28 @@
{
// we know the hash, should reply everything
SyncStateMsgPtr msg = m_log.FindStateDifferences(*(Hash::Origin), *m_rootHash);
+ // DEBUG
+ assert(msg->state_size() > 0);
+ int size = msg->state_size();
+ int index = 0;
+ cout << "Reply recover interest with: " << endl;
+ while (index < size)
+ {
+ SyncState state = msg->state(index);
+ string strName = state.name();
+ string strLoc = state.locator();
+ cout << "Name: " << Name((const unsigned char *)strName.c_str(), strName.size()) << ", Loc: " << Name((const unsigned char *)strLoc.c_str(), strLoc.size()) << ", seq: " << state.seq() << endl;
+ if (state.type() == SyncState::UPDATE)
+ {
+ cout << "Action: update" << endl;
+ }
+ else
+ {
+ cout << "Action: delete" << endl;
+ }
+ index++;
+ }
+ // END DEBUG
Bytes syncData;
msgToBytes(msg, syncData);
m_handle->publishData(name, syncData, FRESHNESS);
@@ -153,14 +175,13 @@
if (*hash == *m_rootHash)
{
// we have the same hash; nothing needs to be done
+ cout << "same as root hash: " << *hash << endl;
return;
}
else if (m_log.LookupSyncLog(*hash) > 0)
{
// we know something more
- cout << "SyncInterest: " << name << endl;
- cout << "hash: " << *hash << endl;
- cout << "root hash: " << *m_rootHash << endl;
+ cout << "found hash in sync log" << endl;
SyncStateMsgPtr msg = m_log.FindStateDifferences(*hash, *m_rootHash);
Bytes syncData;
@@ -170,11 +191,17 @@
else
{
// we don't recognize the hash, send recover Interest if still don't know the hash after a randomized wait period
+ /*
ostringstream ss;
ss << *hash;
- IntervalGeneratorPtr generator(new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP));
- TaskPtr task(new PeriodicTask(boost::bind(&SyncCore::recover, this, hash), ss.str(), m_scheduler, generator, 1));
+ double wait = m_recoverWaitGenerator->nextInterval();
+ cout << "recover task scheduled after wait: " << wait << endl;
+ TaskPtr task(new OneTimeTask(boost::bind(&SyncCore::recover, this, hash), ss.str(), m_scheduler, wait));
m_scheduler->addTask(task);
+ */
+
+ recover(hash);
+
}
}
@@ -199,6 +226,7 @@
void
SyncCore::handleRecoverData(const Name &name, const Bytes &content)
{
+ cout << "handle recover data" << endl;
handleStateData(content);
}
@@ -231,14 +259,17 @@
SyncState state = msg->state(index);
string devStr = state.name();
Name deviceName((const unsigned char *)devStr.c_str(), devStr.size());
+ cout << "Got Name: " << deviceName;
if (state.type() == SyncState::UPDATE)
{
sqlite3_int64 seqno = state.seq();
+ cout << ", Got seq: " << seqno << endl;
m_log.UpdateDeviceSeqNo(deviceName, seqno);
if (state.has_locator())
{
string locStr = state.locator();
Name locatorName((const unsigned char *)locStr.c_str(), locStr.size());
+ cout << ", Got loc: " << locatorName << endl;
m_log.UpdateLocator(deviceName, locatorName);
WriteLock(m_ypMutex);
m_yp[deviceName] = locatorName;
@@ -246,6 +277,7 @@
}
else
{
+ cout << "nani" << endl;
deregister(deviceName);
}
index++;
@@ -255,6 +287,7 @@
HashPtr oldHash = m_rootHash;
m_rootHash = m_log.RememberStateInStateLog();
SyncStateMsgPtr diff = m_log.FindStateDifferences(*oldHash, *m_rootHash);
+ cout << "OldHash: " << *oldHash << ", Newhash: " << *m_rootHash << endl;
m_stateMsgCallback(diff);
}
@@ -268,6 +301,7 @@
void
SyncCore::recover(const HashPtr &hash)
{
+ cout << "Recover for: " << *hash << endl;
if (!(*hash == *m_rootHash) && m_log.LookupSyncLog(*hash) <= 0)
{
// unfortunately we still don't recognize this hash
diff --git a/src/sync-core.h b/src/sync-core.h
index afe5e05..998be7d 100644
--- a/src/sync-core.h
+++ b/src/sync-core.h
@@ -51,8 +51,7 @@
, const Name &localPrefix // routable name used by the local user
, const Name &syncPrefix // the prefix for the sync collection
, const StateMsgCallback &callback // callback when state change is detected
- , CcnxWrapperPtr handle
- , SchedulerPtr scheduler);
+ , CcnxWrapperPtr handle);
~SyncCore();
// some other code should call this fuction when local prefix
@@ -87,6 +86,9 @@
void
recover(const HashPtr &hash);
+ HashPtr
+ root() { return m_rootHash; }
+
protected:
void
sendSyncInterest();
@@ -120,6 +122,8 @@
Closure *m_syncClosure;
Closure *m_recoverClosure;
+ IntervalGeneratorPtr m_recoverWaitGenerator;
+
};
#endif // SYNC_CORE_H
diff --git a/src/sync-log.cc b/src/sync-log.cc
index 90f5ba1..d5a8750 100644
--- a/src/sync-log.cc
+++ b/src/sync-log.cc
@@ -354,8 +354,6 @@
{
SyncState *state = msg->add_state ();
- cout << "Bytes: " << sqlite3_column_bytes (stmt, 0) << endl;
- cout << "Pointer: " << sqlite3_column_blob (stmt, 0) << endl;
state->set_name (reinterpret_cast<const char*> (sqlite3_column_blob (stmt, 0)), sqlite3_column_bytes (stmt, 0));
// locator is optional, so must check if it is null
@@ -365,7 +363,7 @@
}
sqlite3_int64 newSeqNo = sqlite3_column_int64 (stmt, 3);
- if (newSeqNo > 0)
+ if (newSeqNo >= 0)
{
state->set_type (SyncState::UPDATE);
state->set_seq (newSeqNo);
diff --git a/test/test-sync-core.cc b/test/test-sync-core.cc
index 68cc69b..d65cafe 100644
--- a/test/test-sync-core.cc
+++ b/test/test-sync-core.cc
@@ -9,19 +9,45 @@
BOOST_AUTO_TEST_SUITE(SyncCoreTests)
-SyncStateMsgPtr msg1;
-SyncStateMsgPtr msg2;
-
-void callback1(const SyncStateMsgPtr &ptr)
+typedef struct
{
- msg1 = ptr;
+ Name deviceName;
+ Name locator;
+ int64_t seq;
+} Result;
+
+Result result1;
+Result result2;
+
+void setResult(const SyncStateMsgPtr &msg, Result &result)
+{
+ if (msg->state_size() > 0)
+ {
+ SyncState state = msg->state(0);
+ string strName = state.name();
+ result.deviceName = Name((const unsigned char *)strName.c_str(), strName.size());
+ string strLoc = state.locator();
+ result.locator = Name((const unsigned char *)strLoc.c_str(), strName.size());
+ result.seq = state.seq();
+ }
+ else
+ {
+ cout << "Msg state size: " << msg->state_size() << endl;
+ }
}
-void callback2(const SyncStateMsgPtr &ptr)
+void callback1(const SyncStateMsgPtr &msg)
{
- msg2 = ptr;
+ setResult(msg, result1);
}
+void callback2(const SyncStateMsgPtr &msg)
+{
+ setResult(msg, result2);
+}
+
+
+
BOOST_AUTO_TEST_CASE(SyncCoreTest)
{
string dir = "./SyncCoreTest";
@@ -34,8 +60,6 @@
Name syncPrefix("/broadcast/darkknight");
CcnxWrapperPtr c1(new CcnxWrapper());
CcnxWrapperPtr c2(new CcnxWrapper());
- SchedulerPtr scheduler(new Scheduler());
- scheduler->start();
// clean the test dir
path d(dir);
@@ -44,39 +68,38 @@
remove_all(d);
}
- SyncCore *core1 = new SyncCore(dir1, user1, loc1, syncPrefix, bind(callback1, _1), c1, scheduler);
+ SyncCore *core1 = new SyncCore(dir1, user1, loc1, syncPrefix, bind(callback1, _1), c1);
usleep(10000);
- SyncCore *core2 = new SyncCore(dir2, user2, loc2, syncPrefix, bind(callback2, _1), c2, scheduler);
- usleep(10000);
+ SyncCore *core2 = new SyncCore(dir2, user2, loc2, syncPrefix, bind(callback2, _1), c2);
+ usleep(1000000);
SyncState state;
+ HashPtr root1 = core1->root();
+ HashPtr root2 = core2->root();
+ BOOST_CHECK_EQUAL(*root1, *root2);
core1->updateLocalState(1);
usleep(100000);
- BOOST_CHECK_EQUAL(msg2->state_size(), 1);
- state = msg2->state(0);
- BOOST_CHECK_EQUAL(state.seq(), 1);
- BOOST_CHECK_EQUAL(user1, state.name());
- BOOST_CHECK_EQUAL(loc1, state.locator());
+ BOOST_CHECK_EQUAL(result2.seq, 1);
+ BOOST_CHECK_EQUAL(result2.deviceName, user1);
+ BOOST_CHECK_EQUAL(result2.locator, loc1);
core1->updateLocalState(5);
usleep(100000);
- state = msg2->state(0);
- BOOST_CHECK_EQUAL(state.seq(), 5);
+ BOOST_CHECK_EQUAL(result2.seq, 5);
core2->updateLocalState(10);
usleep(100000);
- state = msg1->state(0);
- BOOST_CHECK_EQUAL(state.seq(), 10);
+ BOOST_CHECK_EQUAL(result1.seq, 10);
+ BOOST_CHECK_EQUAL(result1.deviceName, user2);
+ BOOST_CHECK_EQUAL(result1.locator, loc2);
// simple simultaneous data generation
core1->updateLocalState(11);
core2->updateLocalState(12);
- usleep(100000);
- state = msg1->state(0);
- BOOST_CHECK_EQUAL(state.seq(), 12);
- state = msg2->state(0);
- BOOST_CHECK_EQUAL(state.seq(), 11);
+ usleep(1000000);
+ BOOST_CHECK_EQUAL(result1.seq, 12);
+ BOOST_CHECK_EQUAL(result2.seq, 11);
// clean the test dir
if (exists(d))