"Small" reorganization and cleaning
diff --git a/src/sync-core.cc b/src/sync-core.cc
index cae3d6b..c0dc45b 100644
--- a/src/sync-core.cc
+++ b/src/sync-core.cc
@@ -20,6 +20,7 @@
*/
#include "sync-core.h"
+#include "sync-state-helper.h"
#include "logging.h"
#include "random-interval-generator.h"
@@ -32,51 +33,20 @@
const double SyncCore::RANDOM_PERCENT = 0.5;
using namespace boost;
+using namespace Ccnx;
-// for debugging
-static void
-printMsg(SyncStateMsgPtr &msg)
-{
- _LOG_TRACE (" ===== start Msg ======");
- int size = msg->state_size();
- if (size > 0)
- {
- int index = 0;
- while (index < size)
- {
- SyncState state = msg->state(index);
- string strName = state.name();
- string strLocator = state.locator();
- sqlite3_int64 seq = state.seq();
- _LOG_TRACE ("Name: " << Name((const unsigned char *)strName.c_str(), strName.size())
- <<", Locator: " << Name((const unsigned char *)strLocator.c_str(), strLocator.size())
- << ", seq: " << seq);
- index ++;
- }
- }
- else
- {
- _LOG_TRACE ("Msg size 0");
- }
- _LOG_TRACE (" ++++++++ end Msg ++++++++ ");
-}
-
-SyncCore::SyncCore(SyncLogPtr syncLog, const Name &userName, const Name &localPrefix, const Name &syncPrefix, const StateMsgCallback &callback, const CcnxWrapperPtr &handle, const SchedulerPtr &scheduler)
- : m_log(syncLog)
- , m_scheduler(scheduler)
- , m_stateMsgCallback(callback)
- // , m_userName(userName)
- , m_syncPrefix(syncPrefix)
- , m_handle(handle)
- , m_syncClosure (boost::bind(&SyncCore::handleSyncData, this, _1, _2),
- boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1))
- , m_recoverClosure (boost::bind(&SyncCore::handleRecoverData, this, _1, _2),
- boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1))
- , m_recoverWaitGenerator(new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP))
+SyncCore::SyncCore(SyncLogPtr syncLog, const Name &userName, const Name &localPrefix, const Name &syncPrefix,
+ const StateMsgCallback &callback, CcnxWrapperPtr ccnx, SchedulerPtr scheduler)
+ : m_ccnx (ccnx)
+ , m_log(syncLog)
+ , m_scheduler(scheduler)
+ , m_stateMsgCallback(callback)
+ , m_syncPrefix(syncPrefix)
+ , m_recoverWaitGenerator(new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP))
{
m_rootHash = m_log->RememberStateInStateLog();
- m_handle->setInterestFilter(m_syncPrefix, boost::bind(&SyncCore::handleInterest, this, _1));
+ m_ccnx->setInterestFilter(m_syncPrefix, boost::bind(&SyncCore::handleInterest, this, _1));
// m_log->initYP(m_yp);
m_log->UpdateLocalLocator (localPrefix);
@@ -89,52 +59,23 @@
// need to "deregister" closures
}
-// Name
-// SyncCore::yp(const Name &deviceName)
-// {
-// Name locator;
-// ReadLock lock(m_ypMutex);
-// if (m_yp.find(deviceName) != m_yp.end())
-// {
-// locator = m_yp[deviceName];
-// }
-// else
-// {
-// cout << "self: " << m_userName << ", deviceName: " << deviceName << " not found in yp " << endl;
-// }
-// return locator;
-// }
-
-// void
-// SyncCore::updateLocalPrefix(const Name &localPrefix)
-// {
-// m_localPrefix = localPrefix;
-// // optionally, we can have a sync action to announce the new prefix
-// // we are not doing this for now
-// }
-
void
SyncCore::updateLocalState(sqlite3_int64 seqno)
{
m_log->UpdateLocalSeqNo (seqno);
- // choose to update locator everytime
- // m_log->UpdateLocator(m_userName, m_localPrefix);
- // {
- // WriteLock lock(m_ypMutex);
- // m_yp[m_userName] = m_localPrefix;
- // }
+
HashPtr oldHash = m_rootHash;
- m_rootHash = m_log->RememberStateInStateLog();
+ m_rootHash = m_log->RememberStateInStateLog ();
SyncStateMsgPtr msg = m_log->FindStateDifferences(*oldHash, *m_rootHash);
// reply sync Interest with oldHash as last component
- Name syncName = constructSyncName(oldHash);
- Bytes syncData;
- msgToBytes(msg, syncData);
- m_handle->publishData(syncName, syncData, FRESHNESS);
- _LOG_DEBUG (m_log->GetLocalName () << " publishes: " << *oldHash);
- printMsg(msg);
+ Name syncName = Name (m_syncPrefix)(oldHash->GetHash(), oldHash->GetHashBytes());
+ BytesPtr syncData = serializeMsg (msg);
+
+ m_ccnx->publishData(syncName, *syncData, FRESHNESS);
+ _LOG_TRACE (m_log->GetLocalName () << " publishes: " << *oldHash);
+ _LOG_TRACE (msg);
// no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
// this is trying to avoid the situation that the order of SyncData and new Sync Interest gets reversed at receivers
@@ -172,35 +113,11 @@
{
// we know the hash, should reply everything
SyncStateMsgPtr msg = m_log->FindStateDifferences(*(Hash::Origin), *m_rootHash);
- // DEBUG
- /*
- assert(msg->state_size() > 0);
- int size = msg->state_size();
- int index = 0;
- cerr << "Reply recover interest with: " << endl;
- while (index < size)
- {
- SyncState state = msg->state(index);
- string strName = state.name();
- string strLoc = state.locator();
- cout << "Name: " << Name((const unsigned char *)strName.c_str(), strName.size()) << ", Loc: " << Name((const unsigned char *)strLoc.c_str(), strLoc.size()) << ", seq: " << state.seq() << endl;
- if (state.type() == SyncState::UPDATE)
- {
- cout << "Action: update" << endl;
- }
- else
- {
- cout << "Action: delete" << endl;
- }
- index++;
- }
- */
- // END DEBUG
- Bytes syncData;
- msgToBytes(msg, syncData);
- m_handle->publishData(name, syncData, FRESHNESS);
- _LOG_DEBUG (m_log->GetLocalName () << " publishes " << hash);
- printMsg(msg);
+
+ BytesPtr syncData = serializeMsg (msg);
+ m_ccnx->publishData(name, *syncData, FRESHNESS);
+ _LOG_TRACE (m_log->GetLocalName () << " publishes " << hash);
+ _LOG_TRACE (msg);
}
else
{
@@ -216,27 +133,26 @@
if (*hash == *m_rootHash)
{
// we have the same hash; nothing needs to be done
- _LOG_DEBUG ("same as root hash: " << *hash);
+ _LOG_TRACE ("same as root hash: " << *hash);
return;
}
else if (m_log->LookupSyncLog(*hash) > 0)
{
// we know something more
- _LOG_DEBUG ("found hash in sync log");
+ _LOG_TRACE ("found hash in sync log");
SyncStateMsgPtr msg = m_log->FindStateDifferences(*hash, *m_rootHash);
- Bytes syncData;
- msgToBytes(msg, syncData);
- m_handle->publishData(name, syncData, FRESHNESS);
- _LOG_DEBUG (m_log->GetLocalName () << " publishes: " << *hash);
- printMsg(msg);
+ BytesPtr syncData = serializeMsg (msg);
+ m_ccnx->publishData(name, *syncData, FRESHNESS);
+ _LOG_TRACE (m_log->GetLocalName () << " publishes: " << *hash);
+ _LOG_TRACE (msg);
}
else
{
// we don't recognize the hash, send recover Interest if still don't know the hash after a randomized wait period
double wait = m_recoverWaitGenerator->nextInterval();
- _LOG_DEBUG (m_log->GetLocalName () << ", rootHash: " << *m_rootHash << ", hash: " << *hash);
- _LOG_DEBUG ("recover task scheduled after wait: " << wait);
+ _LOG_TRACE (m_log->GetLocalName () << ", rootHash: " << *m_rootHash << ", hash: " << *hash);
+ _LOG_TRACE ("recover task scheduled after wait: " << wait);
Scheduler::scheduleOneTimeTask (m_scheduler,
wait, boost::bind(&SyncCore::recover, this, hash),
@@ -292,8 +208,8 @@
return;
}
- _LOG_DEBUG (m_log->GetLocalName () << " receives Msg ");
- printMsg (msg);
+ _LOG_TRACE (m_log->GetLocalName () << " receives Msg ");
+ _LOG_TRACE (msg);
int size = msg->state_size();
int index = 0;
while (index < size)
@@ -311,16 +227,15 @@
{
string locStr = state.locator();
Name locatorName((const unsigned char *)locStr.c_str(), locStr.size());
- // cout << ", Got loc: " << locatorName << endl;
+ // cout << ", Got loc: " << locatorName << endl;
m_log->UpdateLocator(deviceName, locatorName);
- // WriteLock lock(m_ypMutex);
- // m_yp[deviceName] = locatorName;
- _LOG_DEBUG ("self: " << m_log->GetLocalName () << ", device: " << deviceName << " < == > " << locatorName);
+
+ _LOG_TRACE ("self: " << m_log->GetLocalName () << ", device: " << deviceName << " < == > " << locatorName);
}
}
else
{
- _LOG_TRACE ("nani");
+ _LOG_ERROR ("Receive SYNC DELETE, but we don't support it yet");
deregister(deviceName);
}
index++;
@@ -341,9 +256,13 @@
void
SyncCore::sendSyncInterest()
{
- Name syncInterest = constructSyncName(m_rootHash);
- m_handle->sendInterest(syncInterest, m_syncClosure);
- _LOG_DEBUG (m_log->GetLocalName () << " send SYNC interest: " << *m_rootHash);
+ Name syncInterest = Name (m_syncPrefix)(m_rootHash->GetHash(), m_rootHash->GetHashBytes());
+
+ m_ccnx->sendInterest(syncInterest,
+ Closure (boost::bind(&SyncCore::handleSyncData, this, _1, _2),
+ boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1)));
+
+ _LOG_TRACE (m_log->GetLocalName () << " send SYNC interest: " << *m_rootHash);
}
void
@@ -351,16 +270,19 @@
{
if (!(*hash == *m_rootHash) && m_log->LookupSyncLog(*hash) <= 0)
{
- _LOG_DEBUG (m_log->GetLocalName () << ", Recover for: " << *hash);
+ _LOG_TRACE (m_log->GetLocalName () << ", Recover for: " << *hash);
// unfortunately we still don't recognize this hash
Bytes bytes;
readRaw(bytes, (const unsigned char *)hash->GetHash(), hash->GetHashBytes());
- Name recoverInterest = m_syncPrefix;
- recoverInterest.appendComp(RECOVER);
+
// append the unknown hash
- recoverInterest.appendComp(bytes);
- m_handle->sendInterest(recoverInterest, m_recoverClosure);
- _LOG_DEBUG (m_log->GetLocalName () << " send RECOVER Interest: " << *hash);
+ Name recoverInterest = Name (m_syncPrefix)(RECOVER)(bytes);
+
+ m_ccnx->sendInterest(recoverInterest,
+ Closure (boost::bind(&SyncCore::handleRecoverData, this, _1, _2),
+ boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1)));
+
+ _LOG_TRACE (m_log->GetLocalName () << " send RECOVER Interest: " << *hash);
}
else
{
@@ -375,24 +297,6 @@
// TODO: handle deregistering
}
-Name
-SyncCore::constructSyncName(const HashPtr &hash)
-{
- Bytes bytes;
- readRaw(bytes, (const unsigned char*)hash->GetHash(), hash->GetHashBytes());
- Name syncName = m_syncPrefix;
- syncName.appendComp(bytes);
- return syncName;
-}
-
-void
-SyncCore::msgToBytes(const SyncStateMsgPtr &msg, Bytes &bytes)
-{
- int size = msg->ByteSize();
- bytes.resize(size);
- msg->SerializeToArray(head(bytes), size);
-}
-
sqlite3_int64
SyncCore::seq(const Name &name)
{