sync: Switch code to use ndn-cxx
Thic commit also moves code to ndn::chronoshare namespace
Change-Id: I6eae8cab53fd68faa0e9523d166dbb60d1b59a95
diff --git a/src/sync-core.cpp b/src/sync-core.cpp
index c6bf498..c4f9a26 100644
--- a/src/sync-core.cpp
+++ b/src/sync-core.cpp
@@ -19,63 +19,69 @@
*/
#include "sync-core.hpp"
-#include "logging.hpp"
-#include "periodic-task.hpp"
-#include "random-interval-generator.hpp"
-#include "simple-interval-generator.hpp"
#include "sync-state-helper.hpp"
+#include "core/logging.hpp"
+
+#include <ndn-cxx/util/string-helper.hpp>
#include <boost/lexical_cast.hpp>
-#include <boost/make_shared.hpp>
+
+namespace ndn {
+namespace chronoshare {
INIT_LOGGER("Sync.Core")
-const string SyncCore::RECOVER = "RECOVER";
+const int SyncCore::FRESHNESS = 2;
+const std::string SyncCore::RECOVER = "RECOVER";
const double SyncCore::WAIT = 0.05;
const double SyncCore::RANDOM_PERCENT = 0.5;
-const std::string SYNC_INTEREST_TAG = "send-sync-interest";
-const std::string SYNC_INTEREST_TAG2 = "send-sync-interest2";
-
-const std::string LOCAL_STATE_CHANGE_DELAYED_TAG = "local-state-changed";
-
-using namespace boost;
-using namespace Ndnx;
-
-SyncCore::SyncCore(SyncLogPtr syncLog, const Name& userName, const Name& localPrefix,
- const Name& syncPrefix, const StateMsgCallback& callback, CcnxWrapperPtr ccnx,
- double syncInterestInterval /*= -1.0*/)
- : m_ccnx(ccnx)
+SyncCore::SyncCore(Face& face, SyncLogPtr syncLog, const Name& userName, const Name& localPrefix,
+ const Name& syncPrefix, const StateMsgCallback& callback,
+ long syncInterestInterval /*= -1.0*/)
+ : m_face(face)
, m_log(syncLog)
- , m_scheduler(new Scheduler())
+ , m_scheduler(m_face.getIoService())
+ , m_syncInterestEvent(m_scheduler)
+ , m_periodicInterestEvent(m_scheduler)
+ , m_localStateDelayedEvent(m_scheduler)
, m_stateMsgCallback(callback)
, m_syncPrefix(syncPrefix)
, m_recoverWaitGenerator(
- new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP))
+ new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::Direction::UP))
, m_syncInterestInterval(syncInterestInterval)
{
- m_rootHash = m_log->RememberStateInStateLog();
+ m_rootDigest = m_log->RememberStateInStateLog();
- m_ndnx->setInterestFilter(m_syncPrefix, boost::bind(&SyncCore::handleInterest, this, _1));
- // m_log->initYP(m_yp);
+ m_registeredPrefixId =
+ m_face.setInterestFilter(m_syncPrefix, bind(&SyncCore::handleInterest, this, _1, _2),
+ RegisterPrefixSuccessCallback(),
+ bind(&SyncCore::onRegisterFailed, this, _1, _2));
+
m_log->UpdateLocalLocator(localPrefix);
- m_scheduler->start();
+ time::seconds interval = time::seconds(
+ (m_syncInterestInterval > 0 && m_syncInterestInterval < 30) ? m_syncInterestInterval : 4);
- double interval =
- (m_syncInterestInterval > 0 && m_syncInterestInterval < 30.0) ? m_syncInterestInterval : 4.0;
- m_sendSyncInterestTask =
- make_shared<PeriodicTask>(bind(&SyncCore::sendSyncInterest, this), SYNC_INTEREST_TAG,
- m_scheduler, make_shared<SimpleIntervalGenerator>(interval));
- // sendSyncInterest();
- Scheduler::scheduleOneTimeTask(m_scheduler, 0.1, bind(&SyncCore::sendSyncInterest, this),
- SYNC_INTEREST_TAG2);
+ m_periodicInterestEvent =
+ m_scheduler.scheduleEvent(interval, bind(&SyncCore::sendPeriodicSyncInterest, this, interval));
+
+ m_syncInterestEvent =
+ m_scheduler.scheduleEvent(time::milliseconds(100), bind(&SyncCore::sendSyncInterest, this));
+}
+
+void
+SyncCore::sendPeriodicSyncInterest(const time::seconds& interval)
+{
+ sendSyncInterest();
+ m_periodicInterestEvent =
+ m_scheduler.scheduleEvent(interval, bind(&SyncCore::sendPeriodicSyncInterest, this, interval));
}
SyncCore::~SyncCore()
{
- m_scheduler->shutdown();
// need to "deregister" closures
+ m_face.unsetInterestFilter(m_registeredPrefixId);
}
void
@@ -88,25 +94,40 @@
void
SyncCore::localStateChanged()
{
- HashPtr oldHash = m_rootHash;
- m_rootHash = m_log->RememberStateInStateLog();
+ ConstBufferPtr oldDigest = m_rootDigest;
+ m_rootDigest = m_log->RememberStateInStateLog();
- SyncStateMsgPtr msg = m_log->FindStateDifferences(*oldHash, *m_rootHash);
-
- // reply sync Interest with oldHash as last component
- Name syncName = Name(m_syncPrefix)(oldHash->GetHash(), oldHash->GetHashBytes());
- BytesPtr syncData = serializeGZipMsg(*msg);
-
- m_ccnx->publishData(syncName, *syncData, FRESHNESS);
_LOG_DEBUG("[" << m_log->GetLocalName() << "] localStateChanged ");
- _LOG_TRACE("[" << m_log->GetLocalName() << "] publishes: " << oldHash->shortHash());
- // _LOG_TRACE (msg);
+ _LOG_TRACE("[" << m_log->GetLocalName() << "] publishes: oldDigest--" << toHex(*oldDigest)
+ << " newDigest--"
+ << toHex(*m_rootDigest));
- m_scheduler->deleteTask(SYNC_INTEREST_TAG2);
- // no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
- // this is trying to avoid the situation that the order of SyncData and new Sync Interest gets reversed at receivers
- Scheduler::scheduleOneTimeTask(m_scheduler, 0.05, bind(&SyncCore::sendSyncInterest, this),
- SYNC_INTEREST_TAG2);
+ SyncStateMsgPtr msg = m_log->FindStateDifferences(*oldDigest, *m_rootDigest);
+
+ // reply sync Interest with oldDigest as last component
+
+ Name syncName(m_syncPrefix);
+ syncName.appendImplicitSha256Digest(oldDigest);
+
+ BufferPtr syncData = serializeGZipMsg(*msg);
+
+ // Create Data packet
+ shared_ptr<Data> data = make_shared<Data>();
+ data->setName(syncName);
+ data->setFreshnessPeriod(time::seconds(FRESHNESS));
+ data->setContent(reinterpret_cast<const uint8_t*>(syncData->buf()), syncData->size());
+ m_keyChain.sign(*data);
+ m_face.put(*data);
+
+ _LOG_TRACE(msg);
+
+ // no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no
+ // problem, we know the new root digest already;
+ // this is trying to avoid the situation that the order of SyncData and new Sync Interest gets
+ // reversed at receivers
+ m_syncInterestEvent =
+ m_scheduler.scheduleEvent(time::milliseconds(50), bind(&SyncCore::sendSyncInterest, this));
+
// sendSyncInterest();
}
@@ -114,90 +135,166 @@
void
SyncCore::localStateChangedDelayed()
{
- // many calls to localStateChangedDelayed within 0.5 second will be suppressed to one localStateChanged calls
- Scheduler::scheduleOneTimeTask(m_scheduler, 0.5, bind(&SyncCore::localStateChanged, this),
- LOCAL_STATE_CHANGE_DELAYED_TAG);
+ // many calls to localStateChangedDelayed within 0.5 second will be suppressed to one
+ // localStateChanged calls
+ m_localStateDelayedEvent =
+ m_scheduler.scheduleEvent(time::milliseconds(500), bind(&SyncCore::localStateChanged, this));
+}
+
+// ------------------------------------------------------------------------------------ send &
+// handle interest
+
+void
+SyncCore::sendSyncInterest()
+{
+ Name syncInterest(m_syncPrefix);
+ // syncInterest.append(name::Component(*m_rootDigest));
+ syncInterest.appendImplicitSha256Digest(m_rootDigest);
+
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] >>> send SYNC Interest for " << toHex(*m_rootDigest)
+ << ": "
+ << syncInterest);
+
+ Interest interest(syncInterest);
+ if (m_syncInterestInterval > 0 && m_syncInterestInterval < 30) {
+ interest.setInterestLifetime(time::seconds(m_syncInterestInterval));
+ }
+
+ m_face.expressInterest(interest, bind(&SyncCore::handleSyncData, this, _1, _2),
+ bind(&SyncCore::handleSyncInterestTimeout, this, _1));
+
+ // // if there is a pending syncSyncInterest task, reschedule it to be m_syncInterestInterval seconds
+ // // from now
+ // // if no such task exists, it will be added
+ // m_scheduler->rescheduleTask(m_sendSyncInterestTask);
}
void
-SyncCore::handleInterest(const Name& name)
+SyncCore::recover(ConstBufferPtr digest)
{
+ if (!(*digest == *m_rootDigest) && m_log->LookupSyncLog(*digest) <= 0) {
+ _LOG_TRACE(m_log->GetLocalName() << ", Recover for received_Digest " << toHex(*digest));
+ // unfortunately we still don't recognize this digest
+ // append the unknown digest
+ Name recoverInterest(m_syncPrefix);
+ recoverInterest.append(RECOVER).appendImplicitSha256Digest(digest);
+
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] >>> send RECOVER Interests for " << toHex(*digest));
+
+ m_face.expressInterest(recoverInterest,
+ bind(&SyncCore::handleRecoverData, this, _1, _2),
+ bind(&SyncCore::handleRecoverInterestTimeout, this, _1));
+ }
+ else {
+ // we already learned the digest; cheers!
+ }
+}
+
+void
+SyncCore::handleInterest(const InterestFilter& filter, const Interest& interest)
+{
+ Name name = interest.getName();
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<< handleInterest with Name: " << name);
int size = name.size();
int prefixSize = m_syncPrefix.size();
if (size == prefixSize + 1) {
// this is normal sync interest
handleSyncInterest(name);
}
- else if (size == prefixSize + 2 && name.getCompAsString(m_syncPrefix.size()) == RECOVER) {
+ else if (size == prefixSize + 2 && name.get(m_syncPrefix.size()).toUri() == RECOVER) {
// this is recovery interest
handleRecoverInterest(name);
}
}
void
-SyncCore::handleRecoverInterest(const Name& name)
-{
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< RECOVER Interest with name " << name);
-
- Bytes hashBytes = name.getComp(name.size() - 1);
- // this is the hash unkonwn to the sender of the interest
- Hash hash(head(hashBytes), hashBytes.size());
- if (m_log->LookupSyncLog(hash) > 0) {
- // we know the hash, should reply everything
- SyncStateMsgPtr msg = m_log->FindStateDifferences(*(Hash::Origin), *m_rootHash);
-
- BytesPtr syncData = serializeGZipMsg(*msg);
- m_ccnx->publishData(name, *syncData, FRESHNESS);
- _LOG_TRACE("[" << m_log->GetLocalName() << "] publishes " << hash.shortHash());
- // _LOG_TRACE (msg);
- }
- else {
- // we don't recognize this hash, can not help
- }
-}
-
-void
SyncCore::handleSyncInterest(const Name& name)
{
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< SYNC Interest with name " << name);
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< handle SYNC Interest with Name: " << name);
- Bytes hashBytes = name.getComp(name.size() - 1);
- HashPtr hash(new Hash(head(hashBytes), hashBytes.size()));
- if (*hash == *m_rootHash) {
- // we have the same hash; nothing needs to be done
- _LOG_TRACE("same as root hash: " << hash->shortHash());
+ ConstBufferPtr digest = make_shared<Buffer>(name.get(-1).value(), name.get(-1).value_size());
+ if (*digest == *m_rootDigest) {
+ // we have the same digest; nothing needs to be done
+ _LOG_TRACE("same as root digest: " << toHex(*digest));
return;
}
- else if (m_log->LookupSyncLog(*hash) > 0) {
+ else if (m_log->LookupSyncLog(*digest) > 0) {
// we know something more
- _LOG_TRACE("found hash in sync log");
- SyncStateMsgPtr msg = m_log->FindStateDifferences(*hash, *m_rootHash);
+ _LOG_TRACE("found digest in sync log");
+ SyncStateMsgPtr msg = m_log->FindStateDifferences(*digest, *m_rootDigest);
- BytesPtr syncData = serializeGZipMsg(*msg);
- m_ccnx->publishData(name, *syncData, FRESHNESS);
- _LOG_TRACE(m_log->GetLocalName() << " publishes: " << hash->shortHash());
+ BufferPtr syncData = serializeGZipMsg(*msg);
+ shared_ptr<Data> data = make_shared<Data>();
+ data->setName(name);
+ data->setFreshnessPeriod(time::seconds(FRESHNESS));
+ data->setContent(reinterpret_cast<const uint8_t*>(syncData->buf()), syncData->size());
+ m_keyChain.sign(*data);
+ m_face.put(*data);
+
+ _LOG_TRACE(m_log->GetLocalName() << " publishes: " << toHex(*digest) << " my_rootDigest:"
+ << toHex(*m_rootDigest));
_LOG_TRACE(msg);
}
else {
- // we don't recognize the hash, send recover Interest if still don't know the hash after a randomized wait period
+ // we don't recognize the digest, send recover Interest if still don't know the digest after a
+ // randomized wait period
double wait = m_recoverWaitGenerator->nextInterval();
- _LOG_TRACE(m_log->GetLocalName() << ", rootHash: " << *m_rootHash
- << ", hash: " << hash->shortHash());
+ _LOG_TRACE(m_log->GetLocalName() << ", my_rootDigest: " << toHex(*m_rootDigest)
+ << ", received_Digest: "
+ << toHex(*digest));
_LOG_TRACE("recover task scheduled after wait: " << wait);
- Scheduler::scheduleOneTimeTask(m_scheduler, wait, boost::bind(&SyncCore::recover, this, hash),
- "r-" + lexical_cast<string>(*hash));
+ // @todo Figure out how to cancel scheduled events when class is destroyed
+ m_scheduler.scheduleEvent(time::milliseconds(static_cast<int>(wait * 1000)),
+ bind(&SyncCore::recover, this, digest));
}
}
void
-SyncCore::handleSyncInterestTimeout(const Name& name, const Closure& closure, Selectors selectors)
+SyncCore::handleRecoverInterest(const Name& name)
+{
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< handle RECOVER Interest with name " << name);
+
+ ConstBufferPtr digest = make_shared<Buffer>(name.get(-1).value(), name.get(-1).value_size());
+ // this is the digest unkonwn to the sender of the interest
+ _LOG_DEBUG("rootDigest: " << toHex(*digest));
+ if (m_log->LookupSyncLog(*digest) > 0) {
+ _LOG_DEBUG("Find in our sync_log! " << toHex(*digest));
+ // we know the digest, should reply everything and the newest thing, but not the digest!!! This
+ // is important
+ unsigned char _origin = 0;
+ BufferPtr origin = make_shared<Buffer>(_origin);
+ // std::cout << "size of origin " << origin->size() << std::endl;
+ SyncStateMsgPtr msg = m_log->FindStateDifferences(*origin, *m_rootDigest);
+
+ BufferPtr syncData = serializeGZipMsg(*msg);
+ shared_ptr<Data> data = make_shared<Data>();
+ data->setName(name);
+ data->setFreshnessPeriod(time::seconds(FRESHNESS));
+ data->setContent(reinterpret_cast<const uint8_t*>(syncData->buf()), syncData->size());
+ m_keyChain.sign(*data);
+ m_face.put(*data);
+
+ _LOG_TRACE("[" << m_log->GetLocalName() << "] publishes " << toHex(*digest)
+ << " FindStateDifferences(0, m_rootDigest/"
+ << toHex(*m_rootDigest)
+ << ")");
+ _LOG_TRACE(msg);
+ }
+ else {
+ // we don't recognize this digest, can not help
+ _LOG_DEBUG("we don't recognize this digest, can not help");
+ }
+}
+
+void
+SyncCore::handleSyncInterestTimeout(const Interest& interest)
{
// sync interest will be resent by scheduler
}
void
-SyncCore::handleRecoverInterestTimeout(const Name& name, const Closure& closure, Selectors selectors)
+SyncCore::handleRecoverInterestTimeout(const Interest& interest)
{
// We do not re-express recovery interest for now
// if difference is not resolved, the sync interest will trigger
@@ -206,46 +303,45 @@
}
void
-SyncCore::handleRecoverData(const Name& name, PcoPtr content)
+SyncCore::handleSyncData(const Interest& interest, Data& data)
{
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< RECOVER DATA with name: " << name);
- //cout << "handle recover data" << end;
- if (content && content->contentPtr() && content->contentPtr()->size() > 0) {
- handleStateData(*content->contentPtr());
- }
- else {
- _LOG_ERROR("Got recovery DATA with empty content");
- }
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< receive SYNC DATA with interest: "
+ << interest.toUri());
- // sendSyncInterest();
- m_scheduler->deleteTask(SYNC_INTEREST_TAG2);
- Scheduler::scheduleOneTimeTask(m_scheduler, 0, bind(&SyncCore::sendSyncInterest, this),
- SYNC_INTEREST_TAG2);
-}
-
-void
-SyncCore::handleSyncData(const Name& name, PcoPtr content)
-{
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< SYNC DATA with name: " << name);
-
+ const Block& content = data.getContent();
// suppress recover in interest - data out of order case
- if (content && content->contentPtr() && content->contentPtr()->size() > 0) {
- handleStateData(*content->contentPtr());
+ if (data.getContent().value() && content.size() > 0) {
+ handleStateData(Buffer(content.value(), content.value_size()));
}
else {
_LOG_ERROR("Got sync DATA with empty content");
}
// resume outstanding sync interest
- // sendSyncInterest();
-
- m_scheduler->deleteTask(SYNC_INTEREST_TAG2);
- Scheduler::scheduleOneTimeTask(m_scheduler, 0, bind(&SyncCore::sendSyncInterest, this),
- SYNC_INTEREST_TAG2);
+ m_syncInterestEvent =
+ m_scheduler.scheduleEvent(time::milliseconds(0), bind(&SyncCore::sendSyncInterest, this));
}
void
-SyncCore::handleStateData(const Bytes& content)
+SyncCore::handleRecoverData(const Interest& interest, Data& data)
+{
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< receive RECOVER DATA with interest: "
+ << interest.toUri());
+ // cout << "handle recover data" << end;
+ const Block& content = data.getContent();
+ if (content.value() && content.size() > 0) {
+ handleStateData(Buffer(content.value(), content.value_size()));
+ }
+ else {
+ _LOG_ERROR("Got recovery DATA with empty content");
+ }
+
+ m_syncInterestEvent =
+ m_scheduler.scheduleEvent(time::milliseconds(0), bind(&SyncCore::sendSyncInterest, this));
+}
+
+void
+SyncCore::handleStateData(const Buffer& content)
{
SyncStateMsgPtr msg = deserializeGZipMsg<SyncStateMsg>(content);
if (!(msg)) {
@@ -254,22 +350,21 @@
return;
}
- _LOG_TRACE(m_log->GetLocalName() << " receives Msg ");
+ _LOG_TRACE("[" << m_log->GetLocalName() << "]"
+ << " receives Msg ");
_LOG_TRACE(msg);
int size = msg->state_size();
int index = 0;
while (index < size) {
SyncState state = msg->state(index);
- string devStr = state.name();
- Name deviceName((const unsigned char*)devStr.c_str(), devStr.size());
- // cout << "Got Name: " << deviceName;
+ std::string devStr = state.name();
+ Name deviceName(Block((const unsigned char*)devStr.c_str(), devStr.size()));
if (state.type() == SyncState::UPDATE) {
sqlite3_int64 seqno = state.seq();
m_log->UpdateDeviceSeqNo(deviceName, seqno);
if (state.has_locator()) {
- string locStr = state.locator();
- Name locatorName((const unsigned char*)locStr.c_str(), locStr.size());
- // cout << ", Got loc: " << locatorName << endl;
+ std::string locStr = state.locator();
+ Name locatorName(Block((const unsigned char*)locStr.c_str(), locStr.size()));
m_log->UpdateLocator(deviceName, locatorName);
_LOG_TRACE("self: " << m_log->GetLocalName() << ", device: " << deviceName << " < == > "
@@ -284,10 +379,10 @@
}
// find the actuall difference and invoke callback on the actual difference
- HashPtr oldHash = m_rootHash;
- m_rootHash = m_log->RememberStateInStateLog();
+ ConstBufferPtr oldDigest = m_rootDigest;
+ m_rootDigest = m_log->RememberStateInStateLog();
// get diff with both new SeqNo and old SeqNo
- SyncStateMsgPtr diff = m_log->FindStateDifferences(*oldHash, *m_rootHash, true);
+ SyncStateMsgPtr diff = m_log->FindStateDifferences(*oldDigest, *m_rootDigest, true);
if (diff->state_size() > 0) {
m_stateMsgCallback(diff);
@@ -295,53 +390,6 @@
}
void
-SyncCore::sendSyncInterest()
-{
- Name syncInterest = Name(m_syncPrefix)(m_rootHash->GetHash(), m_rootHash->GetHashBytes());
-
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] >>> SYNC Interest for " << m_rootHash->shortHash()
- << ": "
- << syncInterest);
-
- Selectors selectors;
- if (m_syncInterestInterval > 0 && m_syncInterestInterval < 30.0) {
- selectors.interestLifetime(m_syncInterestInterval);
- }
- m_ccnx->sendInterest(syncInterest,
- Closure(boost::bind(&SyncCore::handleSyncData, this, _1, _2),
- boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1, _2, _3)),
- selectors);
-
- // if there is a pending syncSyncInterest task, reschedule it to be m_syncInterestInterval seconds from now
- // if no such task exists, it will be added
- m_scheduler->rescheduleTask(m_sendSyncInterestTask);
-}
-
-void
-SyncCore::recover(HashPtr hash)
-{
- if (!(*hash == *m_rootHash) && m_log->LookupSyncLog(*hash) <= 0) {
- _LOG_TRACE(m_log->GetLocalName() << ", Recover for: " << hash->shortHash());
- // unfortunately we still don't recognize this hash
- Bytes bytes;
- readRaw(bytes, (const unsigned char*)hash->GetHash(), hash->GetHashBytes());
-
- // append the unknown hash
- Name recoverInterest = Name(m_syncPrefix)(RECOVER)(bytes);
-
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] >>> RECOVER Interests for " << hash->shortHash());
-
- m_ccnx->sendInterest(recoverInterest,
- Closure(boost::bind(&SyncCore::handleRecoverData, this, _1, _2),
- boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1, _2,
- _3)));
- }
- else {
- // we already learned the hash; cheers!
- }
-}
-
-void
SyncCore::deregister(const Name& name)
{
// Do nothing for now
@@ -353,3 +401,6 @@
{
return m_log->SeqNo(name);
}
+
+} // namespace chronoshare
+} // namespace ndn