sync: Switch code to use ndn-cxx
This commit also moves code to the ndn::chronoshare namespace
Change-Id: I6eae8cab53fd68faa0e9523d166dbb60d1b59a95
diff --git a/src/sync-core.cpp b/src/sync-core.cpp
index c6bf498..c4f9a26 100644
--- a/src/sync-core.cpp
+++ b/src/sync-core.cpp
@@ -19,63 +19,69 @@
*/
#include "sync-core.hpp"
-#include "logging.hpp"
-#include "periodic-task.hpp"
-#include "random-interval-generator.hpp"
-#include "simple-interval-generator.hpp"
#include "sync-state-helper.hpp"
+#include "core/logging.hpp"
+
+#include <ndn-cxx/util/string-helper.hpp>
#include <boost/lexical_cast.hpp>
-#include <boost/make_shared.hpp>
+
+namespace ndn {
+namespace chronoshare {
INIT_LOGGER("Sync.Core")
-const string SyncCore::RECOVER = "RECOVER";
+const int SyncCore::FRESHNESS = 2;
+const std::string SyncCore::RECOVER = "RECOVER";
const double SyncCore::WAIT = 0.05;
const double SyncCore::RANDOM_PERCENT = 0.5;
-const std::string SYNC_INTEREST_TAG = "send-sync-interest";
-const std::string SYNC_INTEREST_TAG2 = "send-sync-interest2";
-
-const std::string LOCAL_STATE_CHANGE_DELAYED_TAG = "local-state-changed";
-
-using namespace boost;
-using namespace Ndnx;
-
-SyncCore::SyncCore(SyncLogPtr syncLog, const Name& userName, const Name& localPrefix,
- const Name& syncPrefix, const StateMsgCallback& callback, CcnxWrapperPtr ccnx,
- double syncInterestInterval /*= -1.0*/)
- : m_ccnx(ccnx)
+SyncCore::SyncCore(Face& face, SyncLogPtr syncLog, const Name& userName, const Name& localPrefix,
+ const Name& syncPrefix, const StateMsgCallback& callback,
+ long syncInterestInterval /*= -1.0*/)
+ : m_face(face)
, m_log(syncLog)
- , m_scheduler(new Scheduler())
+ , m_scheduler(m_face.getIoService())
+ , m_syncInterestEvent(m_scheduler)
+ , m_periodicInterestEvent(m_scheduler)
+ , m_localStateDelayedEvent(m_scheduler)
, m_stateMsgCallback(callback)
, m_syncPrefix(syncPrefix)
, m_recoverWaitGenerator(
- new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP))
+ new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::Direction::UP))
, m_syncInterestInterval(syncInterestInterval)
{
- m_rootHash = m_log->RememberStateInStateLog();
+ m_rootDigest = m_log->RememberStateInStateLog();
- m_ndnx->setInterestFilter(m_syncPrefix, boost::bind(&SyncCore::handleInterest, this, _1));
- // m_log->initYP(m_yp);
+ m_registeredPrefixId =
+ m_face.setInterestFilter(m_syncPrefix, bind(&SyncCore::handleInterest, this, _1, _2),
+ RegisterPrefixSuccessCallback(),
+ bind(&SyncCore::onRegisterFailed, this, _1, _2));
+
m_log->UpdateLocalLocator(localPrefix);
- m_scheduler->start();
+ time::seconds interval = time::seconds(
+ (m_syncInterestInterval > 0 && m_syncInterestInterval < 30) ? m_syncInterestInterval : 4);
- double interval =
- (m_syncInterestInterval > 0 && m_syncInterestInterval < 30.0) ? m_syncInterestInterval : 4.0;
- m_sendSyncInterestTask =
- make_shared<PeriodicTask>(bind(&SyncCore::sendSyncInterest, this), SYNC_INTEREST_TAG,
- m_scheduler, make_shared<SimpleIntervalGenerator>(interval));
- // sendSyncInterest();
- Scheduler::scheduleOneTimeTask(m_scheduler, 0.1, bind(&SyncCore::sendSyncInterest, this),
- SYNC_INTEREST_TAG2);
+ m_periodicInterestEvent =
+ m_scheduler.scheduleEvent(interval, bind(&SyncCore::sendPeriodicSyncInterest, this, interval));
+
+ m_syncInterestEvent =
+ m_scheduler.scheduleEvent(time::milliseconds(100), bind(&SyncCore::sendSyncInterest, this));
+}
+
+void
+SyncCore::sendPeriodicSyncInterest(const time::seconds& interval)
+{
+ sendSyncInterest();
+ m_periodicInterestEvent =
+ m_scheduler.scheduleEvent(interval, bind(&SyncCore::sendPeriodicSyncInterest, this, interval));
}
SyncCore::~SyncCore()
{
- m_scheduler->shutdown();
// need to "deregister" closures
+ m_face.unsetInterestFilter(m_registeredPrefixId);
}
void
@@ -88,25 +94,40 @@
void
SyncCore::localStateChanged()
{
- HashPtr oldHash = m_rootHash;
- m_rootHash = m_log->RememberStateInStateLog();
+ ConstBufferPtr oldDigest = m_rootDigest;
+ m_rootDigest = m_log->RememberStateInStateLog();
- SyncStateMsgPtr msg = m_log->FindStateDifferences(*oldHash, *m_rootHash);
-
- // reply sync Interest with oldHash as last component
- Name syncName = Name(m_syncPrefix)(oldHash->GetHash(), oldHash->GetHashBytes());
- BytesPtr syncData = serializeGZipMsg(*msg);
-
- m_ccnx->publishData(syncName, *syncData, FRESHNESS);
_LOG_DEBUG("[" << m_log->GetLocalName() << "] localStateChanged ");
- _LOG_TRACE("[" << m_log->GetLocalName() << "] publishes: " << oldHash->shortHash());
- // _LOG_TRACE (msg);
+ _LOG_TRACE("[" << m_log->GetLocalName() << "] publishes: oldDigest--" << toHex(*oldDigest)
+ << " newDigest--"
+ << toHex(*m_rootDigest));
- m_scheduler->deleteTask(SYNC_INTEREST_TAG2);
- // no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
- // this is trying to avoid the situation that the order of SyncData and new Sync Interest gets reversed at receivers
- Scheduler::scheduleOneTimeTask(m_scheduler, 0.05, bind(&SyncCore::sendSyncInterest, this),
- SYNC_INTEREST_TAG2);
+ SyncStateMsgPtr msg = m_log->FindStateDifferences(*oldDigest, *m_rootDigest);
+
+ // reply sync Interest with oldDigest as last component
+
+ Name syncName(m_syncPrefix);
+ syncName.appendImplicitSha256Digest(oldDigest);
+
+ BufferPtr syncData = serializeGZipMsg(*msg);
+
+ // Create Data packet
+ shared_ptr<Data> data = make_shared<Data>();
+ data->setName(syncName);
+ data->setFreshnessPeriod(time::seconds(FRESHNESS));
+ data->setContent(reinterpret_cast<const uint8_t*>(syncData->buf()), syncData->size());
+ m_keyChain.sign(*data);
+ m_face.put(*data);
+
+ _LOG_TRACE(msg);
+
+ // no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no
+ // problem, we know the new root digest already;
+ // this is trying to avoid the situation that the order of SyncData and new Sync Interest gets
+ // reversed at receivers
+ m_syncInterestEvent =
+ m_scheduler.scheduleEvent(time::milliseconds(50), bind(&SyncCore::sendSyncInterest, this));
+
// sendSyncInterest();
}
@@ -114,90 +135,166 @@
void
SyncCore::localStateChangedDelayed()
{
- // many calls to localStateChangedDelayed within 0.5 second will be suppressed to one localStateChanged calls
- Scheduler::scheduleOneTimeTask(m_scheduler, 0.5, bind(&SyncCore::localStateChanged, this),
- LOCAL_STATE_CHANGE_DELAYED_TAG);
+ // many calls to localStateChangedDelayed within 0.5 seconds will be collapsed into one
+ // localStateChanged call
+ m_localStateDelayedEvent =
+ m_scheduler.scheduleEvent(time::milliseconds(500), bind(&SyncCore::localStateChanged, this));
+}
+
+// ------------------------------------------------------------------------------------ send &
+// handle interest
+
+void
+SyncCore::sendSyncInterest()
+{
+ Name syncInterest(m_syncPrefix);
+ // syncInterest.append(name::Component(*m_rootDigest));
+ syncInterest.appendImplicitSha256Digest(m_rootDigest);
+
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] >>> send SYNC Interest for " << toHex(*m_rootDigest)
+ << ": "
+ << syncInterest);
+
+ Interest interest(syncInterest);
+ if (m_syncInterestInterval > 0 && m_syncInterestInterval < 30) {
+ interest.setInterestLifetime(time::seconds(m_syncInterestInterval));
+ }
+
+ m_face.expressInterest(interest, bind(&SyncCore::handleSyncData, this, _1, _2),
+ bind(&SyncCore::handleSyncInterestTimeout, this, _1));
+
+ // // if there is a pending syncSyncInterest task, reschedule it to be m_syncInterestInterval seconds
+ // // from now
+ // // if no such task exists, it will be added
+ // m_scheduler->rescheduleTask(m_sendSyncInterestTask);
}
void
-SyncCore::handleInterest(const Name& name)
+SyncCore::recover(ConstBufferPtr digest)
{
+ if (!(*digest == *m_rootDigest) && m_log->LookupSyncLog(*digest) <= 0) {
+ _LOG_TRACE(m_log->GetLocalName() << ", Recover for received_Digest " << toHex(*digest));
+ // unfortunately we still don't recognize this digest
+ // append the unknown digest
+ Name recoverInterest(m_syncPrefix);
+ recoverInterest.append(RECOVER).appendImplicitSha256Digest(digest);
+
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] >>> send RECOVER Interests for " << toHex(*digest));
+
+ m_face.expressInterest(recoverInterest,
+ bind(&SyncCore::handleRecoverData, this, _1, _2),
+ bind(&SyncCore::handleRecoverInterestTimeout, this, _1));
+ }
+ else {
+ // we already learned the digest; cheers!
+ }
+}
+
+void
+SyncCore::handleInterest(const InterestFilter& filter, const Interest& interest)
+{
+ Name name = interest.getName();
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<< handleInterest with Name: " << name);
int size = name.size();
int prefixSize = m_syncPrefix.size();
if (size == prefixSize + 1) {
// this is normal sync interest
handleSyncInterest(name);
}
- else if (size == prefixSize + 2 && name.getCompAsString(m_syncPrefix.size()) == RECOVER) {
+ else if (size == prefixSize + 2 && name.get(m_syncPrefix.size()).toUri() == RECOVER) {
// this is recovery interest
handleRecoverInterest(name);
}
}
void
-SyncCore::handleRecoverInterest(const Name& name)
-{
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< RECOVER Interest with name " << name);
-
- Bytes hashBytes = name.getComp(name.size() - 1);
- // this is the hash unkonwn to the sender of the interest
- Hash hash(head(hashBytes), hashBytes.size());
- if (m_log->LookupSyncLog(hash) > 0) {
- // we know the hash, should reply everything
- SyncStateMsgPtr msg = m_log->FindStateDifferences(*(Hash::Origin), *m_rootHash);
-
- BytesPtr syncData = serializeGZipMsg(*msg);
- m_ccnx->publishData(name, *syncData, FRESHNESS);
- _LOG_TRACE("[" << m_log->GetLocalName() << "] publishes " << hash.shortHash());
- // _LOG_TRACE (msg);
- }
- else {
- // we don't recognize this hash, can not help
- }
-}
-
-void
SyncCore::handleSyncInterest(const Name& name)
{
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< SYNC Interest with name " << name);
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< handle SYNC Interest with Name: " << name);
- Bytes hashBytes = name.getComp(name.size() - 1);
- HashPtr hash(new Hash(head(hashBytes), hashBytes.size()));
- if (*hash == *m_rootHash) {
- // we have the same hash; nothing needs to be done
- _LOG_TRACE("same as root hash: " << hash->shortHash());
+ ConstBufferPtr digest = make_shared<Buffer>(name.get(-1).value(), name.get(-1).value_size());
+ if (*digest == *m_rootDigest) {
+ // we have the same digest; nothing needs to be done
+ _LOG_TRACE("same as root digest: " << toHex(*digest));
return;
}
- else if (m_log->LookupSyncLog(*hash) > 0) {
+ else if (m_log->LookupSyncLog(*digest) > 0) {
// we know something more
- _LOG_TRACE("found hash in sync log");
- SyncStateMsgPtr msg = m_log->FindStateDifferences(*hash, *m_rootHash);
+ _LOG_TRACE("found digest in sync log");
+ SyncStateMsgPtr msg = m_log->FindStateDifferences(*digest, *m_rootDigest);
- BytesPtr syncData = serializeGZipMsg(*msg);
- m_ccnx->publishData(name, *syncData, FRESHNESS);
- _LOG_TRACE(m_log->GetLocalName() << " publishes: " << hash->shortHash());
+ BufferPtr syncData = serializeGZipMsg(*msg);
+ shared_ptr<Data> data = make_shared<Data>();
+ data->setName(name);
+ data->setFreshnessPeriod(time::seconds(FRESHNESS));
+ data->setContent(reinterpret_cast<const uint8_t*>(syncData->buf()), syncData->size());
+ m_keyChain.sign(*data);
+ m_face.put(*data);
+
+ _LOG_TRACE(m_log->GetLocalName() << " publishes: " << toHex(*digest) << " my_rootDigest:"
+ << toHex(*m_rootDigest));
_LOG_TRACE(msg);
}
else {
- // we don't recognize the hash, send recover Interest if still don't know the hash after a randomized wait period
+ // we don't recognize the digest, send recover Interest if still don't know the digest after a
+ // randomized wait period
double wait = m_recoverWaitGenerator->nextInterval();
- _LOG_TRACE(m_log->GetLocalName() << ", rootHash: " << *m_rootHash
- << ", hash: " << hash->shortHash());
+ _LOG_TRACE(m_log->GetLocalName() << ", my_rootDigest: " << toHex(*m_rootDigest)
+ << ", received_Digest: "
+ << toHex(*digest));
_LOG_TRACE("recover task scheduled after wait: " << wait);
- Scheduler::scheduleOneTimeTask(m_scheduler, wait, boost::bind(&SyncCore::recover, this, hash),
- "r-" + lexical_cast<string>(*hash));
+ // @todo Figure out how to cancel scheduled events when class is destroyed
+ m_scheduler.scheduleEvent(time::milliseconds(static_cast<int>(wait * 1000)),
+ bind(&SyncCore::recover, this, digest));
}
}
void
-SyncCore::handleSyncInterestTimeout(const Name& name, const Closure& closure, Selectors selectors)
+SyncCore::handleRecoverInterest(const Name& name)
+{
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< handle RECOVER Interest with name " << name);
+
+ ConstBufferPtr digest = make_shared<Buffer>(name.get(-1).value(), name.get(-1).value_size());
+ // this is the digest unknown to the sender of the interest
+ _LOG_DEBUG("rootDigest: " << toHex(*digest));
+ if (m_log->LookupSyncLog(*digest) > 0) {
+ _LOG_DEBUG("Find in our sync_log! " << toHex(*digest));
+ // we know the digest, should reply everything and the newest thing, but not the digest!!! This
+ // is important
+ unsigned char _origin = 0;
+ BufferPtr origin = make_shared<Buffer>(_origin);
+ // std::cout << "size of origin " << origin->size() << std::endl;
+ SyncStateMsgPtr msg = m_log->FindStateDifferences(*origin, *m_rootDigest);
+
+ BufferPtr syncData = serializeGZipMsg(*msg);
+ shared_ptr<Data> data = make_shared<Data>();
+ data->setName(name);
+ data->setFreshnessPeriod(time::seconds(FRESHNESS));
+ data->setContent(reinterpret_cast<const uint8_t*>(syncData->buf()), syncData->size());
+ m_keyChain.sign(*data);
+ m_face.put(*data);
+
+ _LOG_TRACE("[" << m_log->GetLocalName() << "] publishes " << toHex(*digest)
+ << " FindStateDifferences(0, m_rootDigest/"
+ << toHex(*m_rootDigest)
+ << ")");
+ _LOG_TRACE(msg);
+ }
+ else {
+ // we don't recognize this digest, can not help
+ _LOG_DEBUG("we don't recognize this digest, can not help");
+ }
+}
+
+void
+SyncCore::handleSyncInterestTimeout(const Interest& interest)
{
// sync interest will be resent by scheduler
}
void
-SyncCore::handleRecoverInterestTimeout(const Name& name, const Closure& closure, Selectors selectors)
+SyncCore::handleRecoverInterestTimeout(const Interest& interest)
{
// We do not re-express recovery interest for now
// if difference is not resolved, the sync interest will trigger
@@ -206,46 +303,45 @@
}
void
-SyncCore::handleRecoverData(const Name& name, PcoPtr content)
+SyncCore::handleSyncData(const Interest& interest, Data& data)
{
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< RECOVER DATA with name: " << name);
- //cout << "handle recover data" << end;
- if (content && content->contentPtr() && content->contentPtr()->size() > 0) {
- handleStateData(*content->contentPtr());
- }
- else {
- _LOG_ERROR("Got recovery DATA with empty content");
- }
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< receive SYNC DATA with interest: "
+ << interest.toUri());
- // sendSyncInterest();
- m_scheduler->deleteTask(SYNC_INTEREST_TAG2);
- Scheduler::scheduleOneTimeTask(m_scheduler, 0, bind(&SyncCore::sendSyncInterest, this),
- SYNC_INTEREST_TAG2);
-}
-
-void
-SyncCore::handleSyncData(const Name& name, PcoPtr content)
-{
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< SYNC DATA with name: " << name);
-
+ const Block& content = data.getContent();
// suppress recover in interest - data out of order case
- if (content && content->contentPtr() && content->contentPtr()->size() > 0) {
- handleStateData(*content->contentPtr());
+ if (data.getContent().value() && content.size() > 0) {
+ handleStateData(Buffer(content.value(), content.value_size()));
}
else {
_LOG_ERROR("Got sync DATA with empty content");
}
// resume outstanding sync interest
- // sendSyncInterest();
-
- m_scheduler->deleteTask(SYNC_INTEREST_TAG2);
- Scheduler::scheduleOneTimeTask(m_scheduler, 0, bind(&SyncCore::sendSyncInterest, this),
- SYNC_INTEREST_TAG2);
+ m_syncInterestEvent =
+ m_scheduler.scheduleEvent(time::milliseconds(0), bind(&SyncCore::sendSyncInterest, this));
}
void
-SyncCore::handleStateData(const Bytes& content)
+SyncCore::handleRecoverData(const Interest& interest, Data& data)
+{
+ _LOG_DEBUG("[" << m_log->GetLocalName() << "] <<<<< receive RECOVER DATA with interest: "
+ << interest.toUri());
+ // cout << "handle recover data" << end;
+ const Block& content = data.getContent();
+ if (content.value() && content.size() > 0) {
+ handleStateData(Buffer(content.value(), content.value_size()));
+ }
+ else {
+ _LOG_ERROR("Got recovery DATA with empty content");
+ }
+
+ m_syncInterestEvent =
+ m_scheduler.scheduleEvent(time::milliseconds(0), bind(&SyncCore::sendSyncInterest, this));
+}
+
+void
+SyncCore::handleStateData(const Buffer& content)
{
SyncStateMsgPtr msg = deserializeGZipMsg<SyncStateMsg>(content);
if (!(msg)) {
@@ -254,22 +350,21 @@
return;
}
- _LOG_TRACE(m_log->GetLocalName() << " receives Msg ");
+ _LOG_TRACE("[" << m_log->GetLocalName() << "]"
+ << " receives Msg ");
_LOG_TRACE(msg);
int size = msg->state_size();
int index = 0;
while (index < size) {
SyncState state = msg->state(index);
- string devStr = state.name();
- Name deviceName((const unsigned char*)devStr.c_str(), devStr.size());
- // cout << "Got Name: " << deviceName;
+ std::string devStr = state.name();
+ Name deviceName(Block((const unsigned char*)devStr.c_str(), devStr.size()));
if (state.type() == SyncState::UPDATE) {
sqlite3_int64 seqno = state.seq();
m_log->UpdateDeviceSeqNo(deviceName, seqno);
if (state.has_locator()) {
- string locStr = state.locator();
- Name locatorName((const unsigned char*)locStr.c_str(), locStr.size());
- // cout << ", Got loc: " << locatorName << endl;
+ std::string locStr = state.locator();
+ Name locatorName(Block((const unsigned char*)locStr.c_str(), locStr.size()));
m_log->UpdateLocator(deviceName, locatorName);
_LOG_TRACE("self: " << m_log->GetLocalName() << ", device: " << deviceName << " < == > "
@@ -284,10 +379,10 @@
}
// find the actuall difference and invoke callback on the actual difference
- HashPtr oldHash = m_rootHash;
- m_rootHash = m_log->RememberStateInStateLog();
+ ConstBufferPtr oldDigest = m_rootDigest;
+ m_rootDigest = m_log->RememberStateInStateLog();
// get diff with both new SeqNo and old SeqNo
- SyncStateMsgPtr diff = m_log->FindStateDifferences(*oldHash, *m_rootHash, true);
+ SyncStateMsgPtr diff = m_log->FindStateDifferences(*oldDigest, *m_rootDigest, true);
if (diff->state_size() > 0) {
m_stateMsgCallback(diff);
@@ -295,53 +390,6 @@
}
void
-SyncCore::sendSyncInterest()
-{
- Name syncInterest = Name(m_syncPrefix)(m_rootHash->GetHash(), m_rootHash->GetHashBytes());
-
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] >>> SYNC Interest for " << m_rootHash->shortHash()
- << ": "
- << syncInterest);
-
- Selectors selectors;
- if (m_syncInterestInterval > 0 && m_syncInterestInterval < 30.0) {
- selectors.interestLifetime(m_syncInterestInterval);
- }
- m_ccnx->sendInterest(syncInterest,
- Closure(boost::bind(&SyncCore::handleSyncData, this, _1, _2),
- boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1, _2, _3)),
- selectors);
-
- // if there is a pending syncSyncInterest task, reschedule it to be m_syncInterestInterval seconds from now
- // if no such task exists, it will be added
- m_scheduler->rescheduleTask(m_sendSyncInterestTask);
-}
-
-void
-SyncCore::recover(HashPtr hash)
-{
- if (!(*hash == *m_rootHash) && m_log->LookupSyncLog(*hash) <= 0) {
- _LOG_TRACE(m_log->GetLocalName() << ", Recover for: " << hash->shortHash());
- // unfortunately we still don't recognize this hash
- Bytes bytes;
- readRaw(bytes, (const unsigned char*)hash->GetHash(), hash->GetHashBytes());
-
- // append the unknown hash
- Name recoverInterest = Name(m_syncPrefix)(RECOVER)(bytes);
-
- _LOG_DEBUG("[" << m_log->GetLocalName() << "] >>> RECOVER Interests for " << hash->shortHash());
-
- m_ccnx->sendInterest(recoverInterest,
- Closure(boost::bind(&SyncCore::handleRecoverData, this, _1, _2),
- boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1, _2,
- _3)));
- }
- else {
- // we already learned the hash; cheers!
- }
-}
-
-void
SyncCore::deregister(const Name& name)
{
// Do nothing for now
@@ -353,3 +401,6 @@
{
return m_log->SeqNo(name);
}
+
+} // namespace chronoshare
+} // namespace ndn
diff --git a/src/sync-core.hpp b/src/sync-core.hpp
index 7f10717..6679244 100644
--- a/src/sync-core.hpp
+++ b/src/sync-core.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2016, Regents of the University of California.
+ * Copyright (c) 2013-2017, Regents of the University of California.
*
* This file is part of ChronoShare, a decentralized file sharing application over NDN.
*
@@ -21,35 +21,113 @@
#ifndef SYNC_CORE_H
#define SYNC_CORE_H
-#include "ccnx-selectors.hpp"
-#include "ccnx-wrapper.hpp"
-#include "scheduler.hpp"
#include "sync-log.hpp"
-#include "task.hpp"
+#include "core/chronoshare-common.hpp"
+#include "core/random-interval-generator.hpp"
-#include <boost/function.hpp>
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/util/scheduler-scoped-event-id.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
+
+#include <boost/iostreams/device/back_inserter.hpp>
+#include <boost/iostreams/filter/gzip.hpp>
+#include <boost/iostreams/filtering_stream.hpp>
+
+namespace ndn {
+namespace chronoshare {
+
+// Not used for now
+template <class Msg>
+BufferPtr
+serializeMsg(const Msg& msg)
+{
+ int size = msg.ByteSize();
+ BufferPtr bytes = std::make_shared<Buffer>(size);
+ msg.SerializeToArray(bytes->buf(), size);
+ return bytes;
+}
+
+template <class Msg>
+shared_ptr<Msg>
+deserializeMsg(const Buffer& bytes)
+{
+ shared_ptr<Msg> retval(new Msg());
+ if (!retval->ParseFromArray(bytes.buf(), bytes.size())) {
+ // to indicate an error
+ return shared_ptr<Msg>();
+ }
+ return retval;
+}
+
+template <class Msg>
+BufferPtr
+serializeGZipMsg(const Msg& msg)
+{
+ std::vector<char> bytes; // Bytes couldn't work
+ {
+ boost::iostreams::filtering_ostream out;
+ out.push(boost::iostreams::gzip_compressor()); // gzip filter
+ out.push(boost::iostreams::back_inserter(bytes)); // back_inserter sink
+
+ msg.SerializeToOstream(&out);
+ }
+ BufferPtr uBytes = std::make_shared<Buffer>(bytes.size());
+ memcpy(&(*uBytes)[0], &bytes[0], bytes.size());
+ return uBytes;
+}
+
+template <class Msg>
+shared_ptr<Msg>
+deserializeGZipMsg(const Buffer& bytes)
+{
+ std::vector<char> sBytes(bytes.size());
+ memcpy(&sBytes[0], &bytes[0], bytes.size());
+ boost::iostreams::filtering_istream in;
+ in.push(boost::iostreams::gzip_decompressor()); // gzip filter
+ in.push(boost::make_iterator_range(sBytes)); // source
+
+ shared_ptr<Msg> retval = make_shared<Msg>();
+ if (!retval->ParseFromIstream(&in)) {
+ // to indicate an error
+ return shared_ptr<Msg>();
+ }
+
+ return retval;
+}
class SyncCore
{
public:
- typedef boost::function<void(SyncStateMsgPtr stateMsg)> StateMsgCallback;
+ typedef function<void(SyncStateMsgPtr stateMsg)> StateMsgCallback;
- static const int FRESHNESS = 2; // seconds
- static const string RECOVER;
+ static const int FRESHNESS; // seconds
+ static const std::string RECOVER;
static const double WAIT; // seconds;
static const double RANDOM_PERCENT; // seconds;
+ class Error : public boost::exception, public std::runtime_error
+ {
+ public:
+ explicit Error(const std::string& what)
+ : std::runtime_error(what)
+ {
+ }
+ };
+
public:
- SyncCore(SyncLogPtr syncLog, const Ccnx::Name& userName,
- const Ccnx::Name& localPrefix // routable name used by the local user
+ SyncCore(Face& face, SyncLogPtr syncLog, const Name& userName,
+ const Name& localPrefix // routable name used by the local user
,
- const Ccnx::Name& syncPrefix // the prefix for the sync collection
+ const Name& syncPrefix // the prefix for the sync collection
,
const StateMsgCallback& callback // callback when state change is detected
,
- Ccnx::CcnxWrapperPtr ccnx, double syncInterestInterval = -1.0);
+ long syncInterestInterval = -1);
~SyncCore();
+ void updateLocalState(sqlite3_int64);
+
void
localStateChanged();
@@ -62,71 +140,86 @@
void
localStateChangedDelayed();
- void updateLocalState(sqlite3_int64);
-
// ------------------ only used in test -------------------------
+
public:
- HashPtr
+ ConstBufferPtr
root() const
{
- return m_rootHash;
+ return m_rootDigest;
}
sqlite3_int64
- seq(const Ccnx::Name& name);
+ seq(const Name& name);
private:
void
- handleInterest(const Ccnx::Name& name);
+ onRegisterFailed(const Name& prefix, const std::string& reason)
+ {
+ std::cerr << "ERROR: Failed to register prefix \"" << prefix << "\" in local hub's daemon ("
+ << reason << ")" << std::endl;
+ throw Error("ERROR: Failed to register prefix (" + reason + ")");
+ }
void
- handleSyncData(const Ccnx::Name& name, Ccnx::PcoPtr content);
-
- void
- handleRecoverData(const Ccnx::Name& name, Ccnx::PcoPtr content);
-
- void
- handleSyncInterestTimeout(const Ccnx::Name& name, const Ccnx::Closure& closure,
- Ccnx::Selectors selectors);
-
- void
- handleRecoverInterestTimeout(const Ccnx::Name& name, const Ccnx::Closure& closure,
- Ccnx::Selectors selectors);
-
- void
- deregister(const Ccnx::Name& name);
-
- void
- recover(HashPtr hash);
-
-private:
- void
sendSyncInterest();
void
- handleSyncInterest(const Ccnx::Name& name);
+ sendPeriodicSyncInterest(const time::seconds& interval);
void
- handleRecoverInterest(const Ccnx::Name& name);
+ recover(ConstBufferPtr digest);
void
- handleStateData(const Ccnx::Bytes& content);
+ handleInterest(const InterestFilter& filter, const Interest& interest);
+
+ void
+ handleSyncInterest(const Name& name);
+
+ void
+ handleRecoverInterest(const Name& name);
+
+ void
+ handleSyncInterestTimeout(const Interest& interest);
+
+ void
+ handleRecoverInterestTimeout(const Interest& interest);
+
+ void
+ handleSyncData(const Interest& interest, Data& data);
+
+ void
+ handleRecoverData(const Interest& interest, Data& data);
+
+ void
+ handleStateData(const Buffer& content);
+
+ void
+ deregister(const Name& name);
private:
- Ndnx::NdnxWrapperPtr m_ndnx;
+ Face& m_face;
SyncLogPtr m_log;
- SchedulerPtr m_scheduler;
+
+ Scheduler m_scheduler;
+ util::scheduler::ScopedEventId m_syncInterestEvent;
+ util::scheduler::ScopedEventId m_periodicInterestEvent;
+ util::scheduler::ScopedEventId m_localStateDelayedEvent;
+
StateMsgCallback m_stateMsgCallback;
- Ndnx::Name m_syncPrefix;
- HashPtr m_rootHash;
+ Name m_syncPrefix;
+ ConstBufferPtr m_rootDigest;
IntervalGeneratorPtr m_recoverWaitGenerator;
- TaskPtr m_sendSyncInterestTask;
-
- double m_syncInterestInterval;
+ long m_syncInterestInterval;
+ KeyChain m_keyChain;
+ const RegisteredPrefixId* m_registeredPrefixId;
};
+} // namespace chronoshare
+} // namespace ndn
+
#endif // SYNC_CORE_H
diff --git a/src/sync-log.cpp b/src/sync-log.cpp
index bb10168..4e5403d 100644
--- a/src/sync-log.cpp
+++ b/src/sync-log.cpp
@@ -19,18 +19,18 @@
*/
#include "sync-log.hpp"
-#include "logging.hpp"
-#include <utility>
+#include "core/logging.hpp"
-#include <boost/make_shared.hpp>
-#include <boost/thread.hpp>
+#include <ndn-cxx/util/sqlite3-statement.hpp>
+#include <ndn-cxx/util/string-helper.hpp>
+
+namespace ndn {
+namespace chronoshare {
+
+using util::Sqlite3Statement;
INIT_LOGGER("Sync.Log")
-using namespace boost;
-using namespace std;
-using namespace Ndnx;
-
// static void
// xTrace(void*, const char* q)
// {
@@ -96,55 +96,47 @@
END; \n\
";
-
-SyncLog::SyncLog(const boost::filesystem::path& path, const Ccnx::Name& localName)
+SyncLog::SyncLog(const boost::filesystem::path& path, const Name& localName)
: DbHelper(path / ".chronoshare", "sync-log.db")
, m_localName(localName)
{
sqlite3_exec(m_db, INIT_DATABASE.c_str(), NULL, NULL, NULL);
- _LOG_DEBUG_COND(sqlite3_errcode(m_db) != SQLITE_OK, sqlite3_errmsg(m_db));
+ _LOG_DEBUG_COND(sqlite3_errcode(m_db) != SQLITE_OK, "DB Constructer: " << sqlite3_errmsg(m_db));
UpdateDeviceSeqNo(localName, 0);
- sqlite3_stmt* stmt;
- int res = sqlite3_prepare_v2(m_db, "SELECT device_id, seq_no FROM SyncNodes WHERE device_name=?",
- -1, &stmt, 0);
-
- Ccnx::CcnxCharbufPtr name = m_localName;
- sqlite3_bind_blob(stmt, 1, name->buf(), name->length(), SQLITE_STATIC);
+ Sqlite3Statement stmt(m_db, "SELECT device_id, seq_no FROM SyncNodes WHERE device_name=?");
+ stmt.bind(1, m_localName.wireEncode(), SQLITE_STATIC);
if (sqlite3_step(stmt) == SQLITE_ROW) {
- m_localDeviceId = sqlite3_column_int64(stmt, 0);
+ m_localDeviceId = stmt.getInt(0);
}
else {
- BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Impossible thing in SyncLog::SyncLog"));
+ BOOST_THROW_EXCEPTION(Error("Impossible thing in SyncLog::SyncLog"));
}
- sqlite3_finalize(stmt);
}
sqlite3_int64
SyncLog::GetNextLocalSeqNo()
{
- sqlite3_stmt* stmt_seq;
- sqlite3_prepare_v2(m_db, "SELECT seq_no FROM SyncNodes WHERE device_id = ?", -1, &stmt_seq, 0);
- sqlite3_bind_int64(stmt_seq, 1, m_localDeviceId);
+ Sqlite3Statement stmt_seq(m_db, "SELECT seq_no FROM SyncNodes WHERE device_id = ?");
+ stmt_seq.bind(1, m_localDeviceId);
if (sqlite3_step(stmt_seq) != SQLITE_ROW) {
- BOOST_THROW_EXCEPTION(Error::Db()
- << errmsg_info_str("Impossible thing in SyncLog::GetNextLocalSeqNo"));
+ BOOST_THROW_EXCEPTION(Error("Impossible thing in SyncLog::GetNextLocalSeqNo"));
}
- _LOG_DEBUG_COND(sqlite3_errcode(m_db) != SQLITE_DONE, sqlite3_errmsg(m_db));
+ _LOG_DEBUG_COND(sqlite3_errcode(m_db) != SQLITE_DONE,
+ "DB GetNextLocalSeqNo: " << sqlite3_errmsg(m_db));
- sqlite3_int64 seq_no = sqlite3_column_int64(stmt_seq, 0) + 1;
- sqlite3_finalize(stmt_seq);
+ sqlite3_int64 seq_no = stmt_seq.getInt(0) + 1;
UpdateDeviceSeqNo(m_localDeviceId, seq_no);
return seq_no;
}
-HashPtr
+ConstBufferPtr
SyncLog::RememberStateInStateLog()
{
WriteLock lock(m_stateUpdateMutex);
@@ -165,7 +157,7 @@
if (res != SQLITE_OK) {
sqlite3_exec(m_db, "ROLLBACK TRANSACTION;", 0, 0, 0);
- BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str(sqlite3_errmsg(m_db)));
+ BOOST_THROW_EXCEPTION(Error(sqlite3_errmsg(m_db)));
}
sqlite3_int64 rowId = sqlite3_last_insert_rowid(m_db);
@@ -185,7 +177,7 @@
_LOG_DEBUG_COND(sqlite3_errcode(m_db) != SQLITE_DONE, "DbError: " << sqlite3_errmsg(m_db));
if (res != SQLITE_OK) {
sqlite3_exec(m_db, "ROLLBACK TRANSACTION;", 0, 0, 0);
- BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str(sqlite3_errmsg(m_db)));
+ BOOST_THROW_EXCEPTION(Error(sqlite3_errmsg(m_db)));
}
sqlite3_finalize(insertStmt);
@@ -196,25 +188,24 @@
-1, &getHashStmt, 0);
res += sqlite3_bind_int64(getHashStmt, 1, rowId);
- HashPtr retval;
+ BufferPtr retval;
int stepRes = sqlite3_step(getHashStmt);
if (stepRes == SQLITE_ROW) {
- retval =
- make_shared<Hash>(sqlite3_column_blob(getHashStmt, 0), sqlite3_column_bytes(getHashStmt, 0));
+ retval = make_shared<Buffer>(static_cast<const uint8_t*>(sqlite3_column_blob(getHashStmt, 0)),
+ sqlite3_column_bytes(getHashStmt, 0));
}
else {
sqlite3_exec(m_db, "ROLLBACK TRANSACTION;", 0, 0, 0);
_LOG_ERROR("DbError: " << sqlite3_errmsg(m_db));
- BOOST_THROW_EXCEPTION(Error::Db()
- << errmsg_info_str("Not a valid hash in rememberStateInStateLog"));
+ BOOST_THROW_EXCEPTION(Error("Not a valid hash in rememberStateInStateLog"));
}
sqlite3_finalize(getHashStmt);
res += sqlite3_exec(m_db, "COMMIT;", 0, 0, 0);
if (res != SQLITE_OK) {
sqlite3_exec(m_db, "ROLLBACK TRANSACTION;", 0, 0, 0);
- BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Some error with rememberStateInStateLog"));
+ BOOST_THROW_EXCEPTION(Error("Some error with rememberStateInStateLog"));
}
return retval;
@@ -223,22 +214,22 @@
sqlite3_int64
SyncLog::LookupSyncLog(const std::string& stateHash)
{
- return LookupSyncLog(*Hash::FromString(stateHash));
+ return LookupSyncLog(*fromHex(stateHash));
}
sqlite3_int64
-SyncLog::LookupSyncLog(const Hash& stateHash)
+SyncLog::LookupSyncLog(const Buffer& stateHash)
{
sqlite3_stmt* stmt;
int res = sqlite3_prepare(m_db, "SELECT state_id FROM SyncLog WHERE state_hash = ?", -1, &stmt, 0);
if (res != SQLITE_OK) {
- BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Cannot prepare statement"));
+ BOOST_THROW_EXCEPTION(Error("Cannot prepare statement"));
}
- res = sqlite3_bind_blob(stmt, 1, stateHash.GetHash(), stateHash.GetHashBytes(), SQLITE_STATIC);
+ res = sqlite3_bind_blob(stmt, 1, stateHash.buf(), stateHash.size(), SQLITE_STATIC);
if (res != SQLITE_OK) {
- BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Cannot bind"));
+ BOOST_THROW_EXCEPTION(Error("Cannot bind"));
}
sqlite3_int64 row = 0; // something bad
@@ -253,20 +244,20 @@
}
void
-SyncLog::UpdateDeviceSeqNo(const Ccnx::Name& name, sqlite3_int64 seqNo)
+SyncLog::UpdateDeviceSeqNo(const Name& name, sqlite3_int64 seqNo)
{
sqlite3_stmt* stmt;
// update is performed using trigger
int res =
sqlite3_prepare(m_db, "INSERT INTO SyncNodes (device_name, seq_no) VALUES (?,?);", -1, &stmt, 0);
- Ccnx::CcnxCharbufPtr nameBuf = name;
- res += sqlite3_bind_blob(stmt, 1, nameBuf->buf(), nameBuf->length(), SQLITE_STATIC);
+ res +=
+ sqlite3_bind_blob(stmt, 1, name.wireEncode().wire(), name.wireEncode().size(), SQLITE_STATIC);
res += sqlite3_bind_int64(stmt, 2, seqNo);
sqlite3_step(stmt);
if (res != SQLITE_OK) {
- BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Some error with UpdateDeviceSeqNo (name)"));
+ BOOST_THROW_EXCEPTION(Error("Some error with UpdateDeviceSeqNo(name)"));
}
sqlite3_finalize(stmt);
}
@@ -290,10 +281,11 @@
sqlite3_step(stmt);
if (res != SQLITE_OK) {
- BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Some error with UpdateDeviceSeqNo (id)"));
+ BOOST_THROW_EXCEPTION(Error("Some error with UpdateDeviceSeqNo(id)"));
}
- _LOG_DEBUG_COND(sqlite3_errcode(m_db) != SQLITE_OK, sqlite3_errmsg(m_db));
+ _LOG_DEBUG_COND(sqlite3_errcode(m_db) != SQLITE_OK,
+ "DB UpdateDeviceSeqNo: " << sqlite3_errmsg(m_db));
sqlite3_finalize(stmt);
}
@@ -304,19 +296,18 @@
sqlite3_stmt* stmt;
sqlite3_prepare_v2(m_db, "SELECT last_known_locator FROM SyncNodes WHERE device_name=?;", -1,
&stmt, 0);
- Ccnx::CcnxCharbufPtr nameBuf = deviceName;
- sqlite3_bind_blob(stmt, 1, nameBuf->buf(), nameBuf->length(), SQLITE_STATIC);
+ sqlite3_bind_blob(stmt, 1, deviceName.wireEncode().wire(), deviceName.wireEncode().size(),
+ SQLITE_STATIC);
int res = sqlite3_step(stmt);
Name locator;
switch (res) {
case SQLITE_ROW: {
- locator =
- Name((const unsigned char*)sqlite3_column_blob(stmt, 0), sqlite3_column_bytes(stmt, 0));
+ locator = Name(Block(sqlite3_column_blob(stmt, 0), sqlite3_column_bytes(stmt, 0)));
}
case SQLITE_DONE:
break;
default:
- BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Error in LookupLocator()"));
+ BOOST_THROW_EXCEPTION(Error("Error in LookupLocator()"));
}
sqlite3_finalize(stmt);
@@ -324,7 +315,7 @@
return locator;
}
-Ccnx::Name
+Name
SyncLog::LookupLocalLocator()
{
return LookupLocator(m_localName);
@@ -334,24 +325,25 @@
SyncLog::UpdateLocator(const Name& deviceName, const Name& locator)
{
sqlite3_stmt* stmt;
- sqlite3_prepare_v2(m_db,
- "UPDATE SyncNodes SET last_known_locator=?,last_update=datetime('now') WHERE device_name=?;",
+ sqlite3_prepare_v2(m_db, "UPDATE SyncNodes SET last_known_locator=?,last_update=datetime('now', "
+ "'localtime') WHERE device_name=?;",
-1, &stmt, 0);
- Ccnx::CcnxCharbufPtr nameBuf = deviceName;
- Ccnx::CcnxCharbufPtr locatorBuf = locator;
- sqlite3_bind_blob(stmt, 1, locatorBuf->buf(), locatorBuf->length(), SQLITE_STATIC);
- sqlite3_bind_blob(stmt, 2, nameBuf->buf(), nameBuf->length(), SQLITE_STATIC);
+
+ sqlite3_bind_blob(stmt, 1, locator.wireEncode().wire(), locator.wireEncode().size(), SQLITE_STATIC);
+ sqlite3_bind_blob(stmt, 2, deviceName.wireEncode().wire(), deviceName.wireEncode().size(),
+ SQLITE_STATIC);
+
int res = sqlite3_step(stmt);
if (res != SQLITE_OK && res != SQLITE_DONE) {
- BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Error in UpdateLoactor()"));
+ BOOST_THROW_EXCEPTION(Error("Error in UpdateLoactor()"));
}
sqlite3_finalize(stmt);
}
void
-SyncLog::UpdateLocalLocator(const Ccnx::Name& forwardingHint)
+SyncLog::UpdateLocalLocator(const Name& forwardingHint)
{
return UpdateLocator(m_localName, forwardingHint);
}
@@ -360,11 +352,11 @@
SyncLog::FindStateDifferences(const std::string& oldHash, const std::string& newHash,
bool includeOldSeq)
{
- return FindStateDifferences(*Hash::FromString(oldHash), *Hash::FromString(newHash), includeOldSeq);
+ return FindStateDifferences(*fromHex(oldHash), *fromHex(newHash), includeOldSeq);
}
SyncStateMsgPtr
-SyncLog::FindStateDifferences(const Hash& oldHash, const Hash& newHash, bool includeOldSeq)
+SyncLog::FindStateDifferences(const Buffer& oldHash, const Buffer& newHash, bool includeOldSeq)
{
sqlite3_stmt* stmt;
@@ -409,11 +401,11 @@
-1, &stmt, 0);
if (res != SQLITE_OK) {
- BOOST_THROW_EXCEPTION(Error::Db() << errmsg_info_str("Some error with FindStateDifferences"));
+ BOOST_THROW_EXCEPTION(Error("Some error with FindStateDifferences"));
}
- res += sqlite3_bind_blob(stmt, 1, oldHash.GetHash(), oldHash.GetHashBytes(), SQLITE_STATIC);
- res += sqlite3_bind_blob(stmt, 2, newHash.GetHash(), newHash.GetHashBytes(), SQLITE_STATIC);
+ res += sqlite3_bind_blob(stmt, 1, oldHash.buf(), oldHash.size(), SQLITE_STATIC);
+ res += sqlite3_bind_blob(stmt, 2, newHash.buf(), newHash.size(), SQLITE_STATIC);
SyncStateMsgPtr msg = make_shared<SyncStateMsg>();
@@ -473,8 +465,9 @@
sqlite3_stmt* stmt;
sqlite3_int64 seq = -1;
sqlite3_prepare_v2(m_db, "SELECT seq_no FROM SyncNodes WHERE device_name=?;", -1, &stmt, 0);
- Ccnx::CcnxCharbufPtr nameBuf = name;
- sqlite3_bind_blob(stmt, 1, nameBuf->buf(), nameBuf->length(), SQLITE_STATIC);
+
+ sqlite3_bind_blob(stmt, 1, name.wireEncode().wire(), name.wireEncode().size(), SQLITE_STATIC);
+
if (sqlite3_step(stmt) == SQLITE_ROW) {
seq = sqlite3_column_int64(stmt, 0);
}
@@ -495,3 +488,6 @@
return retval;
}
+
+} // namespace chronoshare
+} // namespace ndn
diff --git a/src/sync-log.hpp b/src/sync-log.hpp
index e757a0c..65d3674 100644
--- a/src/sync-log.hpp
+++ b/src/sync-log.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2016, Regents of the University of California.
+ * Copyright (c) 2013-2017, Regents of the University of California.
*
* This file is part of ChronoShare, a decentralized file sharing application over NDN.
*
@@ -18,26 +18,45 @@
* See AUTHORS.md for complete list of ChronoShare authors and contributors.
*/
-#ifndef SYNC_LOG_H
-#define SYNC_LOG_H
+#ifndef CHRONOSHARE_SRC_SYNC_LOG_HPP
+#define CHRONOSHARE_SRC_SYNC_LOG_HPP
#include "db-helper.hpp"
-#include <boost/thread/shared_mutex.hpp>
-#include <ccnx-name.h>
-#include <map>
-#include <sync-state.pb.hpp>
+#include "sync-state.pb.h"
+#include "core/chronoshare-common.hpp"
-typedef boost::shared_ptr<SyncStateMsg> SyncStateMsgPtr;
+#include <ndn-cxx/name.hpp>
+
+#include <map>
+
+// @todo Replace with std::thread
+#include <boost/thread.hpp>
+#include <boost/thread/shared_mutex.hpp>
+
+namespace ndn {
+namespace chronoshare {
+
+typedef shared_ptr<SyncStateMsg> SyncStateMsgPtr;
class SyncLog : public DbHelper
{
public:
- SyncLog(const boost::filesystem::path& path, const Ccnx::Name& localName);
+ class Error : public DbHelper::Error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : DbHelper::Error(what)
+ {
+ }
+ };
+
+ SyncLog(const boost::filesystem::path& path, const Name& localName);
/**
* @brief Get local username
*/
- inline const Ccnx::Name&
+ const Name&
GetLocalName() const;
sqlite3_int64
@@ -45,28 +64,28 @@
// done
void
- UpdateDeviceSeqNo(const Ccnx::Name& name, sqlite3_int64 seqNo);
+ UpdateDeviceSeqNo(const Name& name, sqlite3_int64 seqNo);
void
UpdateLocalSeqNo(sqlite3_int64 seqNo);
- Ccnx::Name
- LookupLocator(const Ccnx::Name& deviceName);
+ Name
+ LookupLocator(const Name& deviceName);
- Ccnx::Name
+ Name
LookupLocalLocator();
void
- UpdateLocator(const Ccnx::Name& deviceName, const Ccnx::Name& locator);
+ UpdateLocator(const Name& deviceName, const Name& locator);
void
- UpdateLocalLocator(const Ccnx::Name& locator);
+ UpdateLocalLocator(const Name& locator);
// done
/**
- * Create an entry in SyncLog and SyncStateNodes corresponding to the current state of SyncNodes
+ * Create an entry in SyncLog and SyncStateNodes corresponding to the current state of SyncNodes
*/
- HashPtr
+ ConstBufferPtr
RememberStateInStateLog();
// done
@@ -75,7 +94,7 @@
// done
sqlite3_int64
- LookupSyncLog(const Hash& stateHash);
+ LookupSyncLog(const Buffer& stateHash);
// How difference is exposed will be determined later by the actual protocol
SyncStateMsgPtr
@@ -83,11 +102,11 @@
bool includeOldSeq = false);
SyncStateMsgPtr
- FindStateDifferences(const Hash& oldHash, const Hash& newHash, bool includeOldSeq = false);
+ FindStateDifferences(const Buffer& oldHash, const Buffer& newHash, bool includeOldSeq = false);
//-------- only used in test -----------------
sqlite3_int64
- SeqNo(const Ccnx::Name& name);
+ SeqNo(const Name& name);
sqlite3_int64
LogSize();
@@ -97,7 +116,7 @@
UpdateDeviceSeqNo(sqlite3_int64 deviceId, sqlite3_int64 seqNo);
protected:
- Ndnx::Name m_localName;
+ Name m_localName;
sqlite3_int64 m_localDeviceId;
@@ -107,12 +126,15 @@
Mutex m_stateUpdateMutex;
};
-typedef boost::shared_ptr<SyncLog> SyncLogPtr;
+typedef shared_ptr<SyncLog> SyncLogPtr;
-const Ccnx::Name&
+inline const Name&
SyncLog::GetLocalName() const
{
return m_localName;
}
-#endif // SYNC_LOG_H
+} // namespace chronoshare
+} // namespace ndn
+
+#endif // CHRONOSHARE_SRC_SYNC_LOG_HPP
diff --git a/src/sync-state-helper.hpp b/src/sync-state-helper.hpp
index 7bcf3f4..aa3b366 100644
--- a/src/sync-state-helper.hpp
+++ b/src/sync-state-helper.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2016, Regents of the University of California.
+ * Copyright (c) 2013-2017, Regents of the University of California.
*
* This file is part of ChronoShare, a decentralized file sharing application over NDN.
*
@@ -18,10 +18,15 @@
* See AUTHORS.md for complete list of ChronoShare authors and contributors.
*/
-#ifndef SYNC_STATE_HELPER_H
-#define SYNC_STATE_HELPER_H
+#ifndef CHRONOSHARE_SRC_SYNC_STATE_HELPER_HPP
+#define CHRONOSHARE_SRC_SYNC_STATE_HELPER_HPP
-#include "sync-state.pb.hpp"
+#include "core/chronoshare-common.hpp"
+
+#include "sync-state.pb.h"
+
+namespace ndn {
+namespace chronoshare {
inline std::ostream&
operator<<(std::ostream& os, const SyncStateMsgPtr& msg)
@@ -33,12 +38,12 @@
int index = 0;
while (index < size) {
SyncState state = msg->state(index);
- string strName = state.name();
- string strLocator = state.locator();
+ std::string strName = state.name();
+ std::string strLocator = state.locator();
sqlite3_int64 seq = state.seq();
- os << "Name: " << Ccnx::Name((const unsigned char*)strName.c_str(), strName.size())
- << ", Locator: " << Ccnx::Name((const unsigned char*)strLocator.c_str(), strLocator.size())
+ os << "Name: " << Name(Block((const unsigned char*)strName.c_str(), strName.size())).toUri()
+ << ", Locator: " << Name(Block((const unsigned char*)strLocator.c_str(), strLocator.size()))
<< ", seq: " << seq << std::endl;
index++;
}
@@ -51,5 +56,7 @@
return os;
}
+} // namespace chronoshare
+} // namespace ndn
-#endif // SYNC_STATE_HELPER_H
+#endif // CHRONOSHARE_SRC_SYNC_STATE_HELPER_HPP