Fix logic
Change-Id: I7bfb72e8bb245fab3e9a9d575abc7217bbae86d0
diff --git a/src/logger.hpp b/src/logger.hpp
index 64a8563..c4eee03 100644
--- a/src/logger.hpp
+++ b/src/logger.hpp
@@ -64,7 +64,7 @@
#define _LOG_DEBUG(x) \
std::clog << boost::get_system_time() << " " << boost::this_thread::get_id() << \
- " " << x << std::endl;
+ " " << x << std::endl
#else // _DEBUG
diff --git a/src/logic.cpp b/src/logic.cpp
new file mode 100644
index 0000000..f584da5
--- /dev/null
+++ b/src/logic.cpp
@@ -0,0 +1,593 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012-2014 University of California, Los Angeles
+ *
+ * This file is part of ChronoSync, synchronization library for distributed realtime
+ * applications for NDN.
+ *
+ * ChronoSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
+ * @author Chaoyi Bian <bcy@pku.edu.cn>
+ * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
+ * @author Yingdi Yu <yingdi@cs.ucla.edu>
+ */
+
+#include "logic.hpp"
+#include "logger.hpp"
+
+INIT_LOGGER("Logic");
+
+#ifdef _DEBUG
+#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
+#else
+#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
+#endif
+
+namespace chronosync {
+
+using ndn::ConstBufferPtr;
+using ndn::EventId;
+
+const uint8_t EMPTY_DIGEST_VALUE[] = {
+ 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
+ 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
+ 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
+ 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
+};
+
+#ifdef _DEBUG
+int Logic::m_instanceCounter = 0;
+#endif
+
+const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
+const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
+const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
+const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
+const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
+
+const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
+const ndn::name::Component Logic::RESET_COMPONENT("reset");
+
+Logic::Logic(ndn::Face& face,
+ const Name& syncPrefix,
+ const Name& userPrefix,
+ const UpdateCallback& onUpdate,
+ const time::steady_clock::Duration& resetTimer,
+ const time::steady_clock::Duration& cancelResetTimer,
+ const time::milliseconds& resetInterestLifetime,
+ const time::milliseconds& syncInterestLifetime,
+ const time::milliseconds& syncReplyFreshness)
+ : m_face(face)
+ , m_syncPrefix(syncPrefix)
+ , m_userPrefix(userPrefix)
+ , m_interestTable(m_face.getIoService())
+ , m_outstandingInterestId(0)
+ , m_isInReset(false)
+ , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
+ , m_onUpdate(onUpdate)
+ , m_scheduler(m_face.getIoService())
+ , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
+ , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
+ , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
+ , m_resetTimer(resetTimer)
+ , m_cancelResetTimer(cancelResetTimer)
+ , m_resetInterestLifetime(resetInterestLifetime)
+ , m_syncInterestLifetime(syncInterestLifetime)
+ , m_syncReplyFreshness(syncReplyFreshness)
+{
+#ifdef _DEBUG
+ m_instanceId = m_instanceCounter++;
+#endif
+
+ _LOG_DEBUG_ID(">> Logic::Logic");
+
+ m_syncReset = m_syncPrefix;
+ m_syncReset.append("reset");
+
+ _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
+ m_syncRegisteredPrefixId =
+ m_face.setInterestFilter(m_syncPrefix,
+ bind(&Logic::onSyncInterest, this, _1, _2),
+ bind(&Logic::onSyncRegisterFailed, this, _1, _2));
+
+ setUserPrefix(m_userPrefix);
+
+ _LOG_DEBUG_ID("<< Logic::Logic");
+}
+
+Logic::~Logic()
+{
+ m_face.unsetInterestFilter(m_syncRegisteredPrefixId);
+ m_scheduler.cancelAllEvents();
+}
+
+void
+Logic::reset()
+{
+ m_isInReset = true;
+
+ m_state.reset();
+ m_log.clear();
+
+ sendResetInterest();
+
+ // reset outstanding interest name, so that data for previous interest will be dropped.
+ if (m_outstandingInterestId != 0) {
+ m_face.removePendingInterest(m_outstandingInterestId);
+ m_outstandingInterestId = 0;
+ }
+
+ sendSyncInterest();
+
+ if (static_cast<bool>(m_delayedInterestProcessingId))
+ m_scheduler.cancelEvent(m_delayedInterestProcessingId);
+
+ m_delayedInterestProcessingId =
+ m_scheduler.scheduleEvent(m_cancelResetTimer,
+ bind(&Logic::cancelReset, this));
+}
+
+void
+Logic::setUserPrefix(const Name& userPrefix)
+{
+ m_userPrefix = userPrefix;
+
+ m_sessionName = m_userPrefix;
+ m_sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
+
+ m_seqNo = 0;
+
+ reset();
+}
+
+void
+Logic::updateSeqNo(const SeqNo& seqNo)
+{
+ _LOG_DEBUG_ID(">> Logic::updateSeqNo");
+ _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << m_seqNo);
+ if (seqNo < m_seqNo || seqNo == 0)
+ return;
+
+ m_seqNo = seqNo;
+
+ _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << m_seqNo);
+
+ if (!m_isInReset) {
+ _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
+ ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
+ {
+ using namespace CryptoPP;
+
+ std::string hash;
+ StringSource(previousRoot->buf(), previousRoot->size(), true,
+ new HexEncoder(new StringSink(hash), false));
+ _LOG_DEBUG_ID("Hash: " << hash);
+ }
+
+ bool isInserted = false;
+ bool isUpdated = false;
+ SeqNo oldSeq;
+ boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(m_sessionName, seqNo);
+
+ _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
+ _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
+ if (isInserted || isUpdated) {
+ DiffStatePtr commit = make_shared<DiffState>();
+ commit->update(m_sessionName, seqNo);
+ commit->setRootDigest(m_state.getRootDigest());
+ insertToDiffLog(commit, previousRoot);
+
+ satisfyPendingSyncInterests(commit);
+ }
+ }
+}
+
+ConstBufferPtr
+Logic::getRootDigest() const
+{
+ return m_state.getRootDigest();
+}
+
+void
+Logic::printState(std::ostream& os) const
+{
+ BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
+ {
+ os << *leaf << "\n";
+ }
+}
+
+std::set<Name>
+Logic::getSessionNames() const
+{
+ std::set<Name> sessionNames;
+
+ BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
+ {
+ sessionNames.insert(leaf->getSessionName());
+ }
+
+ return sessionNames;
+}
+
+void
+Logic::onSyncInterest(const Name& prefix, const Interest& interest)
+{
+ _LOG_DEBUG_ID(">> Logic::onSyncInterest");
+ Name name = interest.getName();
+
+ _LOG_DEBUG_ID("InterestName: " << name);
+
+ if (RESET_COMPONENT != name.get(-1)) {
+ // normal sync interest
+ processSyncInterest(interest.shared_from_this());
+ }
+ else
+ // reset interest
+ processResetInterest(interest);
+
+ _LOG_DEBUG_ID("<< Logic::onSyncInterest");
+}
+
+void
+Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
+{
+ //Sync prefix registration failed
+ _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
+}
+
+void
+Logic::onSyncData(const Interest& interest, Data& data)
+{
+ _LOG_DEBUG_ID(">> Logic::onSyncData");
+ // Place holder for validator.
+ onSyncDataValidated(data.shared_from_this());
+ _LOG_DEBUG_ID("<< Logic::onSyncData");
+}
+
+void
+Logic::onResetData(const Interest& interest, Data& data)
+{
+ // This should not happened, drop the received data.
+}
+
+void
+Logic::onSyncTimeout(const Interest& interest)
+{
+ // It is OK. Others will handle the time out situation.
+ _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
+ _LOG_DEBUG_ID("Interest: " << interest.getName());
+ _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
+}
+
+void
+Logic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
+{
+ // SyncReply cannot be validated.
+}
+
+void
+Logic::onSyncDataValidated(const shared_ptr<const Data>& data)
+{
+ Name name = data->getName();
+ ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
+
+ processSyncData(name, digest, data->getContent().blockFromValue());
+}
+
+void
+Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
+ bool isTimedProcessing/*=false*/)
+{
+ _LOG_DEBUG_ID(">> Logic::processSyncInterest");
+
+ const Name& name = interest->getName();
+ ConstBufferPtr digest =
+ make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
+
+ ConstBufferPtr rootDigest = m_state.getRootDigest();
+
+ // If the digest of the incoming interest is the same as root digest
+ // Put the interest into InterestTable
+ if (*rootDigest == *digest) {
+ _LOG_DEBUG_ID("Oh, we are in the same state");
+ m_interestTable.insert(interest, digest, false);
+
+ if (!m_isInReset)
+ return;
+
+ if (!isTimedProcessing) {
+ _LOG_DEBUG_ID("Non timed processing in reset");
+ // Still in reset, our own seq has not been put into state yet
+ // Do not hurry, some others may be also resetting and may send their reply
+ if (static_cast<bool>(m_delayedInterestProcessingId))
+ m_scheduler.cancelEvent(m_delayedInterestProcessingId);
+
+ time::milliseconds after(m_rangeUniformRandom());
+ _LOG_DEBUG_ID("After: " << after);
+ m_delayedInterestProcessingId =
+ m_scheduler.scheduleEvent(after,
+ bind(&Logic::processSyncInterest, this, interest, true));
+ }
+ else {
+ _LOG_DEBUG_ID("Timed processing in reset");
+ // Now we can get out of reset state by putting our own stuff into m_state.
+ cancelReset();
+ }
+
+ return;
+ }
+
+ // If the digest of incoming interest is an "empty" digest
+ if (digest == EMPTY_DIGEST) {
+ _LOG_DEBUG_ID("Poor guy, he knows nothing");
+ sendSyncData(name, m_state);
+ return;
+ }
+
+ DiffStateContainer::iterator stateIter = m_log.find(digest);
+ // If the digest of incoming interest can be found from the log
+ if (stateIter != m_log.end()) {
+ _LOG_DEBUG_ID("It is ok, you are so close");
+ sendSyncData(name, *(*stateIter)->diff());
+ return;
+ }
+
+ if (!isTimedProcessing) {
+ _LOG_DEBUG_ID("Let's wait, just wait for a while");
+ // Do not hurry, some incoming SyncReplies may help us to recognize the digest
+ bool doesExist = m_interestTable.insert(interest, digest, true);
+ if (doesExist)
+ // Original comment (not sure): somebody else replied, so restart random-game timer
+ // YY: Get the same SyncInterest again, refresh the timer
+ m_scheduler.cancelEvent(m_delayedInterestProcessingId);
+
+ m_delayedInterestProcessingId =
+ m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
+ bind(&Logic::processSyncInterest, this, interest, true));
+ }
+ else {
+ // OK, nobody is helping us, just tell the truth.
+ _LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth");
+ m_interestTable.erase(digest);
+ sendSyncData(name, m_state);
+ }
+
+ _LOG_DEBUG_ID("<< Logic::processSyncInterest");
+}
+
+void
+Logic::processResetInterest(const Interest& interest)
+{
+ _LOG_DEBUG_ID(">> Logic::processResetInterest");
+ reset();
+}
+
+void
+Logic::processSyncData(const Name& name,
+ ndn::ConstBufferPtr digest,
+ const Block& syncReplyBlock)
+{
+ _LOG_DEBUG_ID(">> Logic::processSyncData");
+
+ DiffStatePtr commit = make_shared<DiffState>();
+ ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
+
+ try {
+ m_interestTable.erase(digest); // Remove satisfied interest from PIT
+
+ State reply;
+ reply.wireDecode(syncReplyBlock);
+
+ std::vector<MissingDataInfo> v;
+ BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
+ {
+ BOOST_ASSERT(leaf != 0);
+
+ const Name& info = leaf->getSessionName();
+ SeqNo seq = leaf->getSeq();
+
+ bool isInserted = false;
+ bool isUpdated = false;
+ SeqNo oldSeq;
+ boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
+
+ if (isInserted || isUpdated) {
+ commit->update(info, seq);
+
+ oldSeq++;
+ MissingDataInfo mdi = {info, oldSeq, seq};
+ v.push_back(mdi);
+ }
+ }
+
+ if (!v.empty()) {
+ m_onUpdate(v);
+
+ commit->setRootDigest(m_state.getRootDigest());
+ insertToDiffLog(commit, previousRoot);
+ }
+ else {
+ _LOG_DEBUG_ID("What? nothing new");
+ }
+ }
+ catch (State::Error&) {
+ _LOG_DEBUG_ID("Something really fishy happened during state decoding");
+ // Something really fishy happened during state decoding;
+ commit.reset();
+ return;
+ }
+
+ if (static_cast<bool>(commit) && !commit->getLeaves().empty()) {
+ // state changed and it is safe to express a new interest
+ time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
+ _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
+ EventId eventId = m_scheduler.scheduleEvent(after,
+ bind(&Logic::sendSyncInterest, this));
+
+ m_scheduler.cancelEvent(m_reexpressingInterestId);
+ m_reexpressingInterestId = eventId;
+ }
+}
+
+void
+Logic::satisfyPendingSyncInterests(ConstDiffStatePtr commit)
+{
+ _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
+ try {
+ _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
+ for (InterestTable::const_iterator it = m_interestTable.begin();
+ it != m_interestTable.end(); it++) {
+ ConstUnsatisfiedInterestPtr request = *it;
+
+ if (request->isUnknown)
+ sendSyncData(request->interest->getName(), m_state);
+ else
+ sendSyncData(request->interest->getName(), *commit);
+ }
+ m_interestTable.clear();
+ }
+ catch (InterestTable::Error&) {
+ // ok. not really an error
+ }
+ _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
+}
+
+void
+Logic::insertToDiffLog(DiffStatePtr commit, ndn::ConstBufferPtr previousRoot)
+{
+ _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
+ // Connect to the history
+ if (!m_log.empty())
+ (*m_log.find(previousRoot))->setNext(commit);
+
+ // Insert the commit
+ m_log.erase(commit->getRootDigest());
+ m_log.insert(commit);
+ _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
+}
+
+void
+Logic::sendResetInterest()
+{
+ _LOG_DEBUG_ID(">> Logic::sendResetInterest");
+
+ if (m_needPeriodReset) {
+ _LOG_DEBUG_ID("Need Period Reset");
+ _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
+
+ EventId eventId =
+ m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
+ bind(&Logic::sendResetInterest, this));
+ m_scheduler.cancelEvent(m_resetInterestId);
+ m_resetInterestId = eventId;
+ }
+
+ Interest interest(m_syncReset);
+ interest.setMustBeFresh(true);
+ interest.setInterestLifetime(m_resetInterestLifetime);
+ m_face.expressInterest(interest,
+ bind(&Logic::onResetData, this, _1, _2),
+ bind(&Logic::onSyncTimeout, this, _1));
+
+ _LOG_DEBUG_ID("<< Logic::sendResetInterest");
+}
+
+void
+Logic::sendSyncInterest()
+{
+ _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
+
+ Name interestName;
+ interestName.append(m_syncPrefix)
+ .append(ndn::name::Component(*m_state.getRootDigest()));
+
+ m_outstandingInterestName = interestName;
+
+#ifdef _DEBUG
+ printDigest(m_state.getRootDigest());
+#endif
+
+ EventId eventId =
+ m_scheduler.scheduleEvent(m_syncInterestLifetime +
+ ndn::time::milliseconds(m_reexpressionJitter()),
+ bind(&Logic::sendSyncInterest, this));
+ m_scheduler.cancelEvent(m_reexpressingInterestId);
+ m_reexpressingInterestId = eventId;
+
+ Interest interest(interestName);
+ interest.setMustBeFresh(true);
+ interest.setInterestLifetime(m_syncInterestLifetime);
+
+ m_outstandingInterestId = m_face.expressInterest(interest,
+ bind(&Logic::onSyncData, this, _1, _2),
+ bind(&Logic::onSyncTimeout, this, _1));
+
+ _LOG_DEBUG_ID("Send interest: " << interest.getName());
+ _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
+}
+
+void
+Logic::sendSyncData(const Name& name, const State& state)
+{
+ _LOG_DEBUG_ID(">> Logic::sendSyncData");
+ shared_ptr<Data> syncReply = make_shared<Data>(name);
+ syncReply->setContent(state.wireEncode());
+ syncReply->setFreshnessPeriod(m_syncReplyFreshness);
+ m_keyChain.sign(*syncReply);
+
+ m_face.put(*syncReply);
+
+ // checking if our own interest got satisfied
+ if (m_outstandingInterestName == name) {
+ // remove outstanding interest
+ if (m_outstandingInterestId != 0) {
+ m_face.removePendingInterest(m_outstandingInterestId);
+ m_outstandingInterestId = 0;
+ }
+
+ // re-schedule sending Sync interest
+ time::milliseconds after(m_reexpressionJitter());
+ _LOG_DEBUG_ID("Satisfy our own interest");
+ _LOG_DEBUG_ID("Reschedule sync interest after " << after);
+ EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
+ m_scheduler.cancelEvent(m_reexpressingInterestId);
+ m_reexpressingInterestId = eventId;
+ }
+ _LOG_DEBUG_ID("<< Logic::sendSyncData");
+}
+
+void
+Logic::cancelReset()
+{
+ _LOG_DEBUG_ID(">> Logic::cancelReset");
+ if (!m_isInReset)
+ return;
+
+ m_isInReset = false;
+ updateSeqNo(m_seqNo);
+ _LOG_DEBUG_ID("<< Logic::cancelReset");
+}
+
+void
+Logic::printDigest(ndn::ConstBufferPtr digest)
+{
+ using namespace CryptoPP;
+
+ std::string hash;
+ StringSource(digest->buf(), digest->size(), true,
+ new HexEncoder(new StringSink(hash), false));
+ _LOG_DEBUG_ID("Hash: " << hash);
+}
+
+} // namespace chronosync
diff --git a/src/logic.hpp b/src/logic.hpp
new file mode 100644
index 0000000..2be8636
--- /dev/null
+++ b/src/logic.hpp
@@ -0,0 +1,388 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012-2014 University of California, Los Angeles
+ *
+ * This file is part of ChronoSync, synchronization library for distributed realtime
+ * applications for NDN.
+ *
+ * ChronoSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
+ * @author Chaoyi Bian <bcy@pku.edu.cn>
+ * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
+ * @author Yingdi Yu <yingdi@cs.ucla.edu>
+ */
+
+#ifndef CHRONOSYNC_LOGIC_HPP
+#define CHRONOSYNC_LOGIC_HPP
+
+#include "boost-header.h"
+#include <memory>
+#include <map>
+
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+
+#include "interest-table.hpp"
+#include "diff-state-container.hpp"
+
+namespace chronosync {
+
+/**
+ * @brief The missing sequence numbers for a session
+ *
+ * This class is used to notify the clients of Logic
+ * the details of state changes.
+ *
+ * Instances of this class is usually used as elements of some containers
+ * such as std::vector, thus it is copyable.
+ */
+class MissingDataInfo
+{
+public:
+ /// @brief session name
+ Name session;
+ /// @brief the lowest one of missing sequence numbers
+ SeqNo low;
+ /// @brief the highest one of missing sequence numbers
+ SeqNo high;
+};
+
+/**
+ * @brief The callback function to handle state updates
+ *
+ * The parameter is a set of MissingDataInfo, of which each corresponds to
+ * a session that has changed its state.
+ */
+typedef function<void(const std::vector<MissingDataInfo>&)> UpdateCallback;
+
+/**
+ * @brief Logic of ChronoSync
+ */
+class Logic : noncopyable
+{
+public:
+ static const time::steady_clock::Duration DEFAULT_RESET_TIMER;
+ static const time::steady_clock::Duration DEFAULT_CANCEL_RESET_TIMER;
+ static const time::milliseconds DEFAULT_RESET_INTEREST_LIFETIME;
+ static const time::milliseconds DEFAULT_SYNC_INTEREST_LIFETIME;
+ static const time::milliseconds DEFAULT_SYNC_REPLY_FRESHNESS;
+
+ /**
+ * @brief Constructor
+ *
+ * @param syncPrefix The prefix of the sync group
+ * @param userPrefix The prefix of the user who owns the session
+ * @param onUpdate The callback function to handle state updates
+ * @param resetTimer The timer to periodically send Reset Interest
+ * @param syncReplyFreshness The FreshnessPeriod of sync reply
+ * @param resetInterestLifetime The lifetime of sync interest
+ * @param resetInterestLifetime The lifetime of Reset Interest
+ * @param cancelResetTimer The timer to exit from Reset state
+ */
+ Logic(ndn::Face& face,
+ const Name& syncPrefix,
+ const Name& userPrefix,
+ const UpdateCallback& onUpdate,
+ const time::steady_clock::Duration& resetTimer = DEFAULT_RESET_TIMER,
+ const time::steady_clock::Duration& cancelResetTimer = DEFAULT_CANCEL_RESET_TIMER,
+ const time::milliseconds& resetInterestLifetime = DEFAULT_RESET_INTEREST_LIFETIME,
+ const time::milliseconds& syncInterestLifetime = DEFAULT_SYNC_INTEREST_LIFETIME,
+ const time::milliseconds& syncReplyFreshness = DEFAULT_SYNC_REPLY_FRESHNESS);
+
+ ~Logic();
+
+ /// @brief Reset the sync tree (and restart synchronization again)
+ void
+ reset();
+
+ /**
+ * @brief Set user prefix
+ *
+ * This method will also change the session name and trigger reset.
+ *
+ * @param userPrefix The prefix of user.
+ */
+ void
+ setUserPrefix(const Name& userPrefix);
+
+ /// @brief Get the name of the local session.
+ const Name&
+ getSessionName() const
+ {
+ return m_sessionName;
+ }
+
+ /// @brief Get current seqNo of the local session.
+ const SeqNo&
+ getSeqNo() const
+ {
+ return m_seqNo;
+ }
+
+ /**
+ * @brief Update the seqNo of the local session
+ *
+ * The method updates the existing seqNo with the supplied seqNo.
+ *
+ * @param seq The new seqNo.
+ */
+ void
+ updateSeqNo(const SeqNo& seq);
+
+ /// @brief Get root digest of current sync tree
+ ndn::ConstBufferPtr
+ getRootDigest() const;
+
+ /// @brief Get the name of all sessions
+ std::set<Name>
+ getSessionNames() const;
+
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ void
+ printState(std::ostream& os) const;
+
+ ndn::Scheduler&
+ getScheduler()
+ {
+ return m_scheduler;
+ }
+
+ State&
+ getState()
+ {
+ return m_state;
+ }
+
+private:
+ /**
+ * @brief Callback to handle Sync Interest
+ *
+ * This method checks whether an incoming interest is a normal one or a reset
+ * and dispatches the incoming interest to corresponding processing methods.
+ *
+ * @param prefix The prefix of the sync group.
+ * @param interest The incoming sync interest.
+ */
+ void
+ onSyncInterest(const Name& prefix, const Interest& interest);
+
+ /**
+ * @brief Callback to handle Sync prefix registration failure
+ *
+ * This method does nothing for now.
+ *
+ * @param prefix The prefix of the sync group.
+ * @param msg The error message.
+ */
+ void
+ onSyncRegisterFailed(const Name& prefix, const std::string& msg);
+
+ /**
+ * @brief Callback to handle Sync Reply
+ *
+ * This method calls validator to validate Sync Reply.
+ * For now, validation is disabled, Logic::onSyncDataValidated is called
+ * directly.
+ *
+ * @param interest The Sync Interest
+ * @param data The reply to the Sync Interest
+ */
+ void
+ onSyncData(const Interest& interest, Data& data);
+
+ /**
+ * @brief Callback to handle reply to Reset Interest.
+ *
+ * This method does nothing, since reply to Reset Interest is not useful for now.
+ *
+ * @param interest The Reset Interest
+ * @param data The reply to the Reset Interest
+ */
+ void
+ onResetData(const Interest& interest, Data& data);
+
+ /**
+ * @brief Callback to handle Sync Interest timeout.
+ *
+ * This method does nothing, since Logic per se handles timeout explicitly.
+ *
+ * @param interest The Sync Interest
+ */
+ void
+ onSyncTimeout(const Interest& interest);
+
+ /**
+ * @brief Callback to invalid Sync Reply.
+ *
+ * This method does nothing but drops the invalid reply.
+ *
+ * @param data The invalid Sync Reply
+ */
+ void
+ onSyncDataValidationFailed(const shared_ptr<const Data>& data);
+
+ /**
+ * @brief Callback to valid Sync Reply.
+ *
+ * This method simply passes the valid reply to processSyncData.
+ *
+ * @param data The valid Sync Reply.
+ */
+ void
+ onSyncDataValidated(const shared_ptr<const Data>& data);
+
+ /**
+ * @brief Process normal Sync Interest
+ *
+ * This method extracts the digest from the incoming Sync Interest,
+ * compares it against current local digest, and process the Sync
+ * Interest according to the comparison result. See docs/design.rst
+ * for more details.
+ *
+ * @param interest The incoming interest
+ * @param isTimedProcessing True if the interest needs an immediate reply,
+ * otherwise hold the interest for a while before
+ * making a reply (to avoid unnecessary recovery)
+ */
+ void
+ processSyncInterest(const shared_ptr<const Interest>& interest,
+ bool isTimedProcessing = false);
+
+ /**
+ * @brief Process reset Sync Interest
+ *
+ * This method simply call Logic::reset()
+ *
+ * @param interest The incoming interest.
+ */
+ void
+ processResetInterest(const Interest& interest);
+
+ /**
+ * @brief Process Sync Reply.
+ *
+ * This method extracts state update information from Sync Reply and applies
+ * it to the Sync Tree and re-express Sync Interest.
+ *
+ * @param name The data name of the Sync Reply.
+ * @param digest The digest in the data name.
+ * @param syncReplyBlock The content of the Sync Reply.
+ */
+ void
+ processSyncData(const Name& name,
+ ndn::ConstBufferPtr digest,
+ const Block& syncReplyBlock);
+
+ /**
+ * @brief Insert state diff into log
+ *
+ * @param diff The diff .
+ * @param previousRoot The root digest before state changes.
+ */
+ void
+ insertToDiffLog(DiffStatePtr diff,
+ ndn::ConstBufferPtr previousRoot);
+
+ /**
+ * @brief Reply to all pending Sync Interests with a particular commit (or diff)
+ *
+ * @param commit The diff.
+ */
+ void
+ satisfyPendingSyncInterests(ConstDiffStatePtr commit);
+
+ /// @brief Helper method to send normal Sync Interest
+ void
+ sendSyncInterest();
+
+ /// @brief Helper method to send reset Sync Interest
+ void
+ sendResetInterest();
+
+ /// @brief Helper method to send Sync Reply
+ void
+ sendSyncData(const Name& name, const State& state);
+
+ /**
+ * @brief Unset reset status
+ *
+ * By invoking this method, one can add its own state into the Sync Tree, thus
+ * jumping out of the reset status
+ */
+ void
+ cancelReset();
+
+ void
+ printDigest(ndn::ConstBufferPtr digest);
+
+private:
+
+ static const ndn::ConstBufferPtr EMPTY_DIGEST;
+ static const ndn::name::Component RESET_COMPONENT;
+
+ // Communication
+ ndn::Face& m_face;
+ Name m_syncPrefix;
+ const ndn::RegisteredPrefixId* m_syncRegisteredPrefixId;
+ Name m_syncReset;
+ Name m_userPrefix;
+
+ // State
+ Name m_sessionName;
+ SeqNo m_seqNo;
+ State m_state;
+ DiffStateContainer m_log;
+ InterestTable m_interestTable;
+ Name m_outstandingInterestName;
+ const ndn::PendingInterestId* m_outstandingInterestId;
+ bool m_isInReset;
+ bool m_needPeriodReset;
+
+ // Callback
+ UpdateCallback m_onUpdate;
+
+ // Event
+ ndn::Scheduler m_scheduler;
+ ndn::EventId m_delayedInterestProcessingId;
+ ndn::EventId m_reexpressingInterestId;
+ ndn::EventId m_resetInterestId;
+
+ // Timer
+ boost::mt19937 m_randomGenerator;
+ boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_rangeUniformRandom;
+ boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_reexpressionJitter;
+ /// @brief Timer to send next reset 0 for no reset
+ time::steady_clock::Duration m_resetTimer;
+ /// @brief Timer to cancel reset state
+ time::steady_clock::Duration m_cancelResetTimer;
+ /// @brief Lifetime of reset interest
+ time::milliseconds m_resetInterestLifetime;
+ /// @brief Lifetime of sync interest
+ time::milliseconds m_syncInterestLifetime;
+ /// @brief FreshnessPeriod of SyncReply
+ time::milliseconds m_syncReplyFreshness;
+
+ // Others
+ ndn::KeyChain m_keyChain;
+
+#ifdef _DEBUG
+ int m_instanceId;
+ static int m_instanceCounter;
+#endif
+};
+
+
+} // namespace chronosync
+
+#endif // CHRONOSYNC_LOGIC_HPP
diff --git a/src/sync-event.h b/src/sync-event.h
deleted file mode 100644
index bbe07bc..0000000
--- a/src/sync-event.h
+++ /dev/null
@@ -1,36 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012-2014 University of California, Los Angeles
- *
- * This file is part of ChronoSync, synchronization library for distributed realtime
- * applications for NDN.
- *
- * ChronoSync is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation, either
- * version 3 of the License, or (at your option) any later version.
- *
- * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- *
- * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
- * @author Chaoyi Bian <bcy@pku.edu.cn>
- * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
- */
-
-#ifndef SYNC_EVENT_H
-#define SYNC_EVENT_H
-
-#include <boost/function.hpp>
-
-namespace Sync
-{
-
-typedef boost::function< void ( ) > Event;
-
-} // Sync
-
-#endif // SYNC_EVENT_H
diff --git a/src/sync-intro-certificate.h b/src/sync-intro-certificate.h
deleted file mode 100644
index d6783ce..0000000
--- a/src/sync-intro-certificate.h
+++ /dev/null
@@ -1,168 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012-2014 University of California, Los Angeles
- *
- * This file is part of ChronoSync, synchronization library for distributed realtime
- * applications for NDN.
- *
- * ChronoSync is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation, either
- * version 3 of the License, or (at your option) any later version.
- *
- * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- *
- * @author Yingdi Yu <http://irl.cs.ucla.edu/~yingdi/web/index.html>
- */
-
-#ifndef SYNC_INTRO_CERTIFICATE_H
-#define SYNC_INTRO_CERTIFICATE_H
-
-#include <ndn-cxx/security/identity-certificate.hpp>
-#include <ndn-cxx/security/signature-sha256-with-rsa.hpp>
-
-namespace Sync {
-
-class IntroCertificate : public ndn::Data
-{
- /**
- * Naming convention of IntroCertificate:
- * /<sync_prefix>/CHRONOS-INTRO-CERT/introducee_certname/introducer_certname/version
- * Content: introducee's identity certificate;
- * KeyLocator: introducer's identity certificate;
- */
-public:
- struct Error : public ndn::Data::Error { Error(const std::string &what) : ndn::Data::Error(what) {} };
-
- IntroCertificate()
- {}
-
- /**
- * @brief Construct IntroCertificate from IdentityCertificate
- *
- * @param syncPrefix
- * @param introduceeCert
- * @param introducerName
- */
- IntroCertificate(const ndn::Name& syncPrefix,
- const ndn::IdentityCertificate& introduceeCert,
- const ndn::Name& introducerCertName); //without version number
-
- /**
- * @brief Construct IntroCertificate using a plain data.
- *
- * if data is not actually IntroCertificate, Error will be thrown out.
- *
- * @param data
- * @throws ndn::IntroCertificate::Error.
- */
- IntroCertificate(const ndn::Data& data);
-
- virtual
- ~IntroCertificate() {};
-
- const ndn::IdentityCertificate&
- getIntroduceeCert() const
- {
- return m_introduceeCert;
- }
-
- const ndn::Name&
- getIntroducerCertName() const
- {
- return m_introducerCertName;
- }
-
- const ndn::Name&
- getIntroduceeCertName() const
- {
- return m_introduceeCertName;
- }
-
-private:
- ndn::Name m_syncPrefix;
- ndn::IdentityCertificate m_introduceeCert;
- ndn::Name m_introducerCertName;
- ndn::Name m_introduceeCertName;
-};
-
-inline
-IntroCertificate::IntroCertificate(const ndn::Name& syncPrefix,
- const ndn::IdentityCertificate& introduceeCert,
- const ndn::Name& introducerCertName)
- : m_syncPrefix(syncPrefix)
- , m_introduceeCert(introduceeCert)
- , m_introducerCertName(introducerCertName)
- , m_introduceeCertName(introduceeCert.getName().getPrefix(-1))
-{
- // Naming convention /<sync_prefix>/CHRONOS-INTRO-CERT/introducee_certname/introducer_certname/version
- ndn::Name dataName = m_syncPrefix;
- dataName.append("CHRONOS-INTRO-CERT")
- .append(m_introduceeCertName.wireEncode())
- .append(m_introducerCertName.wireEncode())
- .appendVersion();
-
- setName(dataName);
- setContent(m_introduceeCert.wireEncode());
-}
-
-inline
-IntroCertificate::IntroCertificate(const ndn::Data& data)
- : Data(data)
-{
- // Naming convention /<sync_prefix>/CHRONOS-INTRO-CERT/introducee_certname/introducer_certname/version
- ndn::Name dataName = data.getName();
-
- if(dataName.size() < 4 || dataName.get(-4).toUri() != "CHRONOS-INTRO-CERT")
- throw Error("Not a Sync::IntroCertificate");
-
- try
- {
- m_introduceeCert.wireDecode(data.getContent().blockFromValue());
- m_introducerCertName.wireDecode(dataName.get(-2).blockFromValue());
- m_introduceeCertName.wireDecode(dataName.get(-3).blockFromValue());
- m_syncPrefix = dataName.getPrefix(-4);
- }
- catch(ndn::IdentityCertificate::Error& e)
- {
- throw Error("Cannot decode introducee cert");
- }
- catch(ndn::Name::Error& e)
- {
- throw Error("Cannot decode name");
- }
- catch(ndn::Block::Error& e)
- {
- throw Error("Cannot decode block name");
- }
-
- if(m_introduceeCertName != m_introduceeCert.getName().getPrefix(-1))
- throw Error("Invalid Sync::IntroCertificate (inconsistent introducee name)");
-
- ndn::Name keyLocatorName;
- try
- {
- ndn::SignatureSha256WithRsa sig(data.getSignature());
- keyLocatorName = sig.getKeyLocator().getName();
- }
- catch(ndn::KeyLocator::Error& e)
- {
- throw Error("Invalid Sync::IntroCertificate (inconsistent introducer name#1)");
- }
- catch(ndn::SignatureSha256WithRsa::Error& e)
- {
- throw Error("Invalid Sync::IntroCertificate (inconsistent introducer name#2)");
- }
-
- if(m_introducerCertName != keyLocatorName)
- throw Error("Invalid Sync::IntroCertificate (inconsistent introducer name#3)");
-}
-
-
-} // namespace Sync
-
-#endif //SYNC_INTRO_CERTIFICATE_H
diff --git a/src/sync-logic-event-container.h b/src/sync-logic-event-container.h
deleted file mode 100644
index 015984d..0000000
--- a/src/sync-logic-event-container.h
+++ /dev/null
@@ -1,86 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012-2014 University of California, Los Angeles
- *
- * This file is part of ChronoSync, synchronization library for distributed realtime
- * applications for NDN.
- *
- * ChronoSync is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation, either
- * version 3 of the License, or (at your option) any later version.
- *
- * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- *
- * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
- * @author Chaoyi Bian <bcy@pku.edu.cn>
- * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
- */
-
-#ifndef SYNC_LOGIC_EVENT_CONTAINER_H
-#define SYNC_LOGIC_EVENT_CONTAINER_H
-
-#include "sync-event.h"
-
-#include <boost/function.hpp>
-#include <boost/date_time/posix_time/posix_time_types.hpp>
-
-#include <boost/multi_index_container.hpp>
-// #include <boost/multi_index/tag.hpp>
-#include <boost/multi_index/ordered_index.hpp>
-// #include <boost/multi_index/composite_key.hpp>
-// #include <boost/multi_index/hashed_index.hpp>
-// #include <boost/multi_index/random_access_index.hpp>
-#include <boost/multi_index/member.hpp>
-// #include <boost/multi_index/mem_fun.hpp>
-
-namespace mi = boost::multi_index;
-
-namespace Sync
-{
-
-struct LogicEvent
-{
- LogicEvent (const boost::system_time &_time, Event _event, uint32_t _label)
- : time (_time)
- , event (_event)
- , lbl (_label)
- { }
-
- boost::system_time time;
- Event event;
- uint32_t lbl;
-};
-
-/// @cond include_hidden
-struct byLabel { } ;
-/// @endcond
-
-/**
- * \ingroup sync
- * @brief ???
- */
-struct EventsContainer : public mi::multi_index_container<
- LogicEvent,
- mi::indexed_by<
-
- mi::ordered_non_unique<
- mi::member<LogicEvent, boost::system_time, &LogicEvent::time>
- >,
-
- mi::ordered_non_unique<
- mi::tag<byLabel>,
- mi::member<LogicEvent, uint32_t, &LogicEvent::lbl>
- >
- >
- >
-{
-};
-
-} // Sync
-
-#endif // SYNC_LOGIC_EVENT_CONTAINER_H
diff --git a/src/sync-logic.cc b/src/sync-logic.cc
deleted file mode 100644
index 0567b60..0000000
--- a/src/sync-logic.cc
+++ /dev/null
@@ -1,714 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012-2014 University of California, Los Angeles
- *
- * This file is part of ChronoSync, synchronization library for distributed realtime
- * applications for NDN.
- *
- * ChronoSync is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation, either
- * version 3 of the License, or (at your option) any later version.
- *
- * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- *
- * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
- * @author Chaoyi Bian <bcy@pku.edu.cn>
- * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
- * @author Yingdi Yu <http://irl.cs.ucla.edu/~yingdi/web/index.html>
- */
-
-#include "sync-logic.h"
-#include "sync-diff-leaf.h"
-#include "sync-full-leaf.h"
-#include "sync-logging.h"
-#include "sync-state.h"
-
-#include <boost/foreach.hpp>
-#include <boost/lexical_cast.hpp>
-#include <vector>
-
-using namespace ndn;
-
-INIT_LOGGER ("SyncLogic")
-
-
-#ifdef _DEBUG
-#define _LOG_DEBUG_ID(v) _LOG_DEBUG(m_instanceId << " " << v)
-#else
-#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
-#endif
-
-#define GET_RANDOM(var) var ()
-
-#define TIME_SECONDS_WITH_JITTER(sec) \
- (time::seconds(sec) + time::milliseconds(GET_RANDOM (m_reexpressionJitter)))
-
-#define TIME_MILLISECONDS_WITH_JITTER(ms) \
- (time::milliseconds(ms) + time::milliseconds(GET_RANDOM (m_reexpressionJitter)))
-
-namespace Sync {
-
-using ndn::shared_ptr;
-
-int SyncLogic::m_instanceCounter = 0;
-
-const int SyncLogic::m_syncResponseFreshness = 1000; // MUST BE dividable by 1000!!!
-const int SyncLogic::m_syncInterestReexpress = 4; // seconds
-
-SyncLogic::SyncLogic (const Name& syncPrefix,
- const IdentityCertificate& myCertificate,
- shared_ptr<Validator> validator,
- shared_ptr<Face> face,
- LogicUpdateCallback onUpdate,
- LogicRemoveCallback onRemove)
- : m_state (new FullState)
- , m_syncInterestTable (face->getIoService(), time::seconds(m_syncInterestReexpress))
- , m_syncPrefix (syncPrefix)
- , m_myCertificate(myCertificate)
- , m_onUpdate (onUpdate)
- , m_onRemove (onRemove)
- , m_perBranch (false)
- , m_validator(validator)
- , m_face(face)
- , m_scheduler(face->getIoService())
- , m_randomGenerator (static_cast<unsigned int> (std::time (0)))
- , m_rangeUniformRandom (m_randomGenerator, boost::uniform_int<> (200,1000))
- , m_reexpressionJitter (m_randomGenerator, boost::uniform_int<> (100,500))
- , m_recoveryRetransmissionInterval (m_defaultRecoveryRetransmitInterval)
-{
- m_syncRegisteredPrefixId = m_face->setInterestFilter (m_syncPrefix,
- bind(&SyncLogic::onSyncInterest, this, _1, _2),
- bind(&SyncLogic::onSyncRegisterFailed, this, _1, _2));
-
-
- m_reexpressingInterestId = m_scheduler.scheduleEvent (time::seconds (0), // no need to add jitter
- bind (&SyncLogic::sendSyncInterest, this));
-
- m_instanceId = std::string("Instance " + boost::lexical_cast<std::string>(m_instanceCounter++) + " ");
-}
-
-SyncLogic::SyncLogic (const Name& syncPrefix,
- const IdentityCertificate& myCertificate,
- shared_ptr<Validator> validator,
- shared_ptr<Face> face,
- LogicPerBranchCallback onUpdateBranch)
- : m_state (new FullState)
- , m_syncInterestTable (face->getIoService(), time::seconds (m_syncInterestReexpress))
- , m_syncPrefix (syncPrefix)
- , m_myCertificate(myCertificate)
- , m_onUpdateBranch (onUpdateBranch)
- , m_perBranch(true)
- , m_validator(validator)
- , m_face(face)
- , m_scheduler(face->getIoService())
- , m_randomGenerator (static_cast<unsigned int> (std::time (0)))
- , m_rangeUniformRandom (m_randomGenerator, boost::uniform_int<> (200,1000))
- , m_reexpressionJitter (m_randomGenerator, boost::uniform_int<> (100,500))
- , m_recoveryRetransmissionInterval (m_defaultRecoveryRetransmitInterval)
-{
- m_syncRegisteredPrefixId = m_face->setInterestFilter (m_syncPrefix,
- bind(&SyncLogic::onSyncInterest, this, _1, _2),
- bind(&SyncLogic::onSyncRegisterFailed, this, _1, _2));
-
- m_reexpressingInterestId = m_scheduler.scheduleEvent (time::seconds (0), // no need to add jitter
- bind (&SyncLogic::sendSyncInterest, this));
-}
-
-SyncLogic::~SyncLogic ()
-{
- m_face->unsetInterestFilter(m_syncRegisteredPrefixId);
- m_scheduler.cancelEvent (m_reexpressingInterestId);
- m_scheduler.cancelEvent (m_delayedInterestProcessingId);
-}
-
-/**
- * Two types of intersts
- *
- * Normal name: .../<hash>
- * Recovery name: .../recovery/<hash>
- */
-boost::tuple<DigestConstPtr, std::string>
-SyncLogic::convertNameToDigestAndType (const Name &name)
-{
- BOOST_ASSERT (m_syncPrefix.isPrefixOf(name));
-
- int nameLengthDiff = name.size() - m_syncPrefix.size();
- BOOST_ASSERT (nameLengthDiff > 0);
- BOOST_ASSERT (nameLengthDiff < 3);
-
- std::string hash = name.get(-1).toUri();
- std::string interestType;
-
- if(nameLengthDiff == 1)
- interestType = "normal";
- else
- interestType = name.get(-2).toUri();
-
- _LOG_DEBUG_ID (hash << ", " << interestType);
-
- DigestPtr digest = boost::make_shared<Digest> ();
- std::istringstream is (hash);
- is >> *digest;
-
- return make_tuple (digest, interestType);
-}
-
-void
-SyncLogic::onSyncInterest (const Name& prefix, const ndn::Interest& interest)
-{
- Name name = interest.getName();
-
- _LOG_DEBUG_ID("respondSyncInterest: " << name);
-
- try
- {
- _LOG_DEBUG_ID ("<< I " << name);
-
- if(name.get(m_syncPrefix.size()).toUri() == "CHRONOS-INTRO-CERT")
- // it is a certificate, validator will take care of it.
- return;
-
- DigestConstPtr digest;
- std::string type;
- tie (digest, type) = convertNameToDigestAndType (name);
-
- if (type == "normal") // kind of ineffective...
- {
- processSyncInterest (name, digest);
- }
- else if (type == "recovery")
- {
- processSyncRecoveryInterest (name, digest);
- }
- }
- catch (Error::DigestCalculationError &e)
- {
- _LOG_DEBUG_ID ("Something fishy happened...");
- // log error. ignoring it for now, later we should log it
- return ;
- }
-}
-
-void
-SyncLogic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
-{
- _LOG_DEBUG_ID("Sync prefix registration failed! " << msg);
-}
-
-void
-SyncLogic::onSyncData(const ndn::Interest& interest, Data& data)
-{
- OnDataValidated onValidated = bind(&SyncLogic::onSyncDataValidated, this, _1);
- OnDataValidationFailed onValidationFailed = bind(&SyncLogic::onSyncDataValidationFailed, this, _1);
- m_validator->validate(data, onValidated, onValidationFailed);
-}
-
-void
-SyncLogic::onSyncTimeout(const ndn::Interest& interest)
-{
- // It is OK. Others will handle the time out situation.
-}
-
-void
-SyncLogic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
-{
- _LOG_DEBUG_ID("Sync data cannot be verified!");
-}
-
-void
-SyncLogic::onSyncDataValidated(const shared_ptr<const Data>& data)
-{
- Name name = data->getName();
- const char* wireData = (const char*)data->getContent().value();
- size_t len = data->getContent().value_size();
-
- try
- {
- _LOG_DEBUG_ID ("<< D " << name);
-
- DigestConstPtr digest;
- std::string type;
- tie (digest, type) = convertNameToDigestAndType (name);
-
- if (type == "normal")
- {
- processSyncData (name, digest, wireData, len);
- }
- else
- {
- // timer is always restarted when we schedule recovery
- m_scheduler.cancelEvent (m_reexpressingRecoveryInterestId);
- processSyncData (name, digest, wireData, len);
- }
- }
- catch (Error::DigestCalculationError &e)
- {
- _LOG_DEBUG_ID ("Something fishy happened...");
- // log error. ignoring it for now, later we should log it
- return;
- }
-}
-
-void
-SyncLogic::processSyncInterest (const Name &name, DigestConstPtr digest, bool timedProcessing/*=false*/)
-{
- _LOG_DEBUG_ID("processSyncInterest");
- DigestConstPtr rootDigest;
- {
- rootDigest = m_state->getDigest();
- }
-
- // Special case when state is not empty and we have received request with zero-root digest
- if (digest->isZero () && !rootDigest->isZero ())
- {
-
- SyncStateMsg ssm;
- {
- ssm << (*m_state);
- }
- sendSyncData (name, digest, ssm);
- return;
- }
-
- if (*rootDigest == *digest)
- {
- _LOG_DEBUG_ID ("processSyncInterest (): Same state. Adding to PIT");
- m_syncInterestTable.insert (digest, name.toUri(), false);
- return;
- }
-
- DiffStateContainer::iterator stateInDiffLog = m_log.find (digest);
-
- if (stateInDiffLog != m_log.end ())
- {
- DiffStateConstPtr stateDiff = (*stateInDiffLog)->diff ();
-
- sendSyncData (name, digest, stateDiff);
- return;
- }
-
- if (!timedProcessing)
- {
- bool exists = m_syncInterestTable.insert (digest, name.toUri(), true);
- if (exists) // somebody else replied, so restart random-game timer
- {
- _LOG_DEBUG_ID ("Unknown digest, but somebody may have already replied, so restart our timer");
- m_scheduler.cancelEvent (m_delayedInterestProcessingId);
- }
-
- uint32_t waitDelay = GET_RANDOM (m_rangeUniformRandom);
- _LOG_DEBUG_ID ("Digest is not in the log. Schedule processing after small delay: " << time::milliseconds (waitDelay));
-
- m_delayedInterestProcessingId = m_scheduler.scheduleEvent (time::milliseconds (waitDelay),
- bind (&SyncLogic::processSyncInterest, this, name, digest, true));
- }
- else
- {
- _LOG_DEBUG_ID (" (timed processing)");
-
- m_recoveryRetransmissionInterval = m_defaultRecoveryRetransmitInterval;
- sendSyncRecoveryInterests (digest);
- }
-}
-
-void
-SyncLogic::processSyncData (const Name &name, DigestConstPtr digest, const char *wireData, size_t len)
-{
- DiffStatePtr diffLog = boost::make_shared<DiffState> ();
- bool ownInterestSatisfied = false;
-
- try
- {
-
- m_syncInterestTable.remove (name.toUri()); // Remove satisfied interest from PIT
-
- ownInterestSatisfied = (name == m_outstandingInterestName);
-
- DiffState diff;
- SyncStateMsg msg;
- if (!msg.ParseFromArray(wireData, len) || !msg.IsInitialized())
- {
- //Throw
- BOOST_THROW_EXCEPTION (Error::SyncStateMsgDecodingFailure () );
- }
- msg >> diff;
-
- std::vector<MissingDataInfo> v;
- BOOST_FOREACH (LeafConstPtr leaf, diff.getLeaves().get<ordered>())
- {
- DiffLeafConstPtr diffLeaf = boost::dynamic_pointer_cast<const DiffLeaf> (leaf);
- BOOST_ASSERT (diffLeaf != 0);
-
- NameInfoConstPtr info = diffLeaf->getInfo();
- if (diffLeaf->getOperation() == UPDATE)
- {
- SeqNo seq = diffLeaf->getSeq();
-
- bool inserted = false;
- bool updated = false;
- SeqNo oldSeq;
- {
- boost::tie (inserted, updated, oldSeq) = m_state->update (info, seq);
- }
-
- if (inserted || updated)
- {
- diffLog->update (info, seq);
- if (!oldSeq.isValid())
- {
- oldSeq = SeqNo(seq.getSession(), 0);
- }
- else
- {
- ++oldSeq;
- }
- // there is no need for application to process update on forwarder node
- if (info->toString() != forwarderPrefix)
- {
- MissingDataInfo mdi = {info->toString(), oldSeq, seq};
- {
- std::ostringstream interestName;
- interestName << mdi.prefix << "/" << mdi.high.getSession() << "/" << mdi.high.getSeq();
- _LOG_DEBUG_ID("+++++++++++++++ " + interestName.str());
- }
- if (m_perBranch)
- {
- std::ostringstream interestName;
- interestName << mdi.prefix << "/" << mdi.high.getSession() << "/" << mdi.high.getSeq();
- m_onUpdateBranch(interestName.str());
- }
- else
- {
- v.push_back(mdi);
- }
- }
- }
- }
- else if (diffLeaf->getOperation() == REMOVE)
- {
- if (m_state->remove (info))
- {
- diffLog->remove (info);
- if (!m_perBranch)
- {
- m_onRemove (info->toString ());
- }
- }
- }
- else
- {
- }
- }
-
- if (!v.empty())
- {
- if (!m_perBranch)
- {
- m_onUpdate(v);
- }
- }
-
- insertToDiffLog (diffLog);
- }
- catch (Error::SyncStateMsgDecodingFailure &e)
- {
- _LOG_DEBUG_ID ("Something really fishy happened during state decoding " <<
- diagnostic_information (e));
- diffLog.reset ();
- // don't do anything
- }
-
- if ((diffLog != 0 && diffLog->getLeaves ().size () > 0) ||
- ownInterestSatisfied)
- {
- _LOG_DEBUG_ID(" +++++++++++++++ state changed!!!");
- // Do it only if everything went fine and state changed
-
- // this is kind of wrong
- // satisfyPendingSyncInterests (diffLog); // if there are interests in PIT, there is a point to satisfy them using new state
-
- // if state has changed, then it is safe to express a new interest
- time::system_clock::Duration after = time::milliseconds(GET_RANDOM (m_reexpressionJitter));
- // cout << "------------ reexpress interest after: " << after << endl;
- EventId eventId = m_scheduler.scheduleEvent (after,
- bind (&SyncLogic::sendSyncInterest, this));
-
- m_scheduler.cancelEvent (m_reexpressingInterestId);
- m_reexpressingInterestId = eventId;
- }
-}
-
-void
-SyncLogic::processSyncRecoveryInterest (const Name &name, DigestConstPtr digest)
-{
- _LOG_DEBUG_ID("processSyncRecoveryInterest");
- DiffStateContainer::iterator stateInDiffLog = m_log.find (digest);
-
- if (stateInDiffLog == m_log.end ())
- {
- _LOG_DEBUG_ID ("Could not find " << *digest << " in digest log");
- return;
- }
-
- SyncStateMsg ssm;
- {
- ssm << (*m_state);
- }
- sendSyncData (name, digest, ssm);
-}
-
-void
-SyncLogic::satisfyPendingSyncInterests (DiffStateConstPtr diffLog)
-{
- DiffStatePtr fullStateLog = boost::make_shared<DiffState> ();
- {
- BOOST_FOREACH (LeafConstPtr leaf, m_state->getLeaves ()/*.get<timed> ()*/)
- {
- fullStateLog->update (leaf->getInfo (), leaf->getSeq ());
- /// @todo Impose limit on how many state info should be send out
- }
- }
-
- try
- {
- uint32_t counter = 0;
- while (m_syncInterestTable.size () > 0)
- {
- Sync::Interest interest = m_syncInterestTable.pop ();
-
- if (!interest.m_unknown)
- {
- _LOG_DEBUG_ID (">> D " << interest.m_name);
- sendSyncData (interest.m_name, interest.m_digest, diffLog);
- }
- else
- {
- _LOG_DEBUG_ID (">> D (unknown)" << interest.m_name);
- sendSyncData (interest.m_name, interest.m_digest, fullStateLog);
- }
- counter ++;
- }
- _LOG_DEBUG_ID ("Satisfied " << counter << " pending interests");
- }
- catch (Error::InterestTableIsEmpty &e)
- {
- // ok. not really an error
- }
-}
-
-void
-SyncLogic::insertToDiffLog (DiffStatePtr diffLog)
-{
- diffLog->setDigest (m_state->getDigest());
- if (m_log.size () > 0)
- {
- m_log.get<sequenced> ().front ()->setNext (diffLog);
- }
- m_log.erase (m_state->getDigest()); // remove diff state with the same digest. next pointers are still valid
- /// @todo Optimization
- m_log.get<sequenced> ().push_front (diffLog);
-}
-
-void
-SyncLogic::addLocalNames (const Name &prefix, uint64_t session, uint64_t seq)
-{
- DiffStatePtr diff;
- {
- //cout << "Add local names" <<endl;
- NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix.toUri());
-
- _LOG_DEBUG_ID ("addLocalNames (): old state " << *m_state->getDigest ());
-
- SeqNo seqN (session, seq);
- m_state->update(info, seqN);
-
- _LOG_DEBUG_ID ("addLocalNames (): new state " << *m_state->getDigest ());
-
- diff = boost::make_shared<DiffState>();
- diff->update(info, seqN);
- insertToDiffLog (diff);
- }
-
- // _LOG_DEBUG_ID ("PIT size: " << m_syncInterestTable.size ());
- satisfyPendingSyncInterests (diff);
-}
-
-void
-SyncLogic::remove(const Name &prefix)
-{
- DiffStatePtr diff;
- {
- NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix.toUri());
- m_state->remove(info);
-
- // increment the sequence number for the forwarder node
- NameInfoConstPtr forwarderInfo = StdNameInfo::FindOrCreate(forwarderPrefix);
-
- LeafContainer::iterator item = m_state->getLeaves ().find (forwarderInfo);
- SeqNo seqNo (0);
- if (item != m_state->getLeaves ().end ())
- {
- seqNo = (*item)->getSeq ();
- ++seqNo;
- }
- m_state->update (forwarderInfo, seqNo);
-
- diff = boost::make_shared<DiffState>();
- diff->remove(info);
- diff->update(forwarderInfo, seqNo);
-
- insertToDiffLog (diff);
- }
-
- satisfyPendingSyncInterests (diff);
-}
-
-void
-SyncLogic::sendSyncInterest ()
-{
- _LOG_DEBUG_ID("sendSyncInterest");
-
- {
- m_outstandingInterestName = m_syncPrefix;
- std::ostringstream os;
- os << *m_state->getDigest();
- m_outstandingInterestName.append(os.str());
- _LOG_DEBUG_ID (">> I " << m_outstandingInterestName);
- }
-
- _LOG_DEBUG_ID("sendSyncInterest: " << m_outstandingInterestName);
-
- EventId eventId = m_scheduler.scheduleEvent (time::seconds(m_syncInterestReexpress) + time::milliseconds(GET_RANDOM (m_reexpressionJitter)),
- bind (&SyncLogic::sendSyncInterest, this));
- m_scheduler.cancelEvent (m_reexpressingInterestId);
- m_reexpressingInterestId = eventId;
-
- ndn::Interest interest(m_outstandingInterestName);
- interest.setMustBeFresh(true);
-
- m_face->expressInterest(interest,
- bind(&SyncLogic::onSyncData, this, _1, _2),
- bind(&SyncLogic::onSyncTimeout, this, _1));
-}
-
-void
-SyncLogic::sendSyncRecoveryInterests (DigestConstPtr digest)
-{
- std::ostringstream os;
- os << *digest;
-
- Name interestName = m_syncPrefix;
- interestName.append("recovery").append(os.str());
-
- time::system_clock::Duration nextRetransmission = time::milliseconds (m_recoveryRetransmissionInterval + GET_RANDOM (m_reexpressionJitter));
-
- m_recoveryRetransmissionInterval <<= 1;
-
- m_scheduler.cancelEvent (m_reexpressingRecoveryInterestId);
- if (m_recoveryRetransmissionInterval < 100*1000) // <100 seconds
- m_reexpressingRecoveryInterestId = m_scheduler.scheduleEvent (nextRetransmission,
- bind (&SyncLogic::sendSyncRecoveryInterests, this, digest));
-
- ndn::Interest interest(interestName);
- interest.setMustBeFresh(true);
-
- m_face->expressInterest(interest,
- bind(&SyncLogic::onSyncData, this, _1, _2),
- bind(&SyncLogic::onSyncTimeout, this, _1));
-}
-
-
-void
-SyncLogic::sendSyncData (const Name &name, DigestConstPtr digest, StateConstPtr state)
-{
- SyncStateMsg msg;
- msg << (*state);
- sendSyncData(name, digest, msg);
-}
-
-// pass in state msg instead of state, so that there is no need to lock the state until
-// this function returns
-void
-SyncLogic::sendSyncData (const Name &name, DigestConstPtr digest, SyncStateMsg &ssm)
-{
- _LOG_DEBUG_ID (">> D " << name);
- int size = ssm.ByteSize();
- char *wireData = new char[size];
- ssm.SerializeToArray(wireData, size);
-
- Data syncData(name);
- syncData.setContent(reinterpret_cast<const uint8_t*>(wireData), size);
- syncData.setFreshnessPeriod(time::milliseconds(m_syncResponseFreshness));
-
- m_keyChain.sign(syncData, m_myCertificate.getName());
-
- m_face->put(syncData);
-
- delete []wireData;
-
- // checking if our own interest got satisfied
- bool satisfiedOwnInterest = false;
- {
- satisfiedOwnInterest = (m_outstandingInterestName == name);
- }
-
- if (satisfiedOwnInterest)
- {
- _LOG_DEBUG_ID ("Satisfied our own Interest. Re-expressing (hopefully with a new digest)");
-
- time::system_clock::Duration after = time::milliseconds(GET_RANDOM (m_reexpressionJitter));
- // cout << "------------ reexpress interest after: " << after << endl;
- EventId eventId = m_scheduler.scheduleEvent (after,
- bind (&SyncLogic::sendSyncInterest, this));
- m_scheduler.cancelEvent (m_reexpressingInterestId);
- m_reexpressingInterestId = eventId;
- }
-}
-
-std::string
-SyncLogic::getRootDigest()
-{
- std::ostringstream os;
- os << *m_state->getDigest();
- return os.str();
-}
-
-size_t
-SyncLogic::getNumberOfBranches () const
-{
- return m_state->getLeaves ().size ();
-}
-
-void
-SyncLogic::printState () const
-{
- BOOST_FOREACH (const boost::shared_ptr<Sync::Leaf> leaf, m_state->getLeaves ())
- {
- std::cout << *leaf << std::endl;
- }
-}
-
-std::map<std::string, bool>
-SyncLogic::getBranchPrefixes() const
-{
- std::map<std::string, bool> m;
-
- BOOST_FOREACH (const boost::shared_ptr<Sync::Leaf> leaf, m_state->getLeaves ())
- {
- std::string prefix = leaf->getInfo()->toString();
- // do not return forwarder prefix
- if (prefix != forwarderPrefix)
- {
- m.insert(std::pair<std::string, bool>(prefix, false));
- }
- }
-
- return m;
-}
-
-}
diff --git a/src/sync-logic.h b/src/sync-logic.h
deleted file mode 100644
index 181979b..0000000
--- a/src/sync-logic.h
+++ /dev/null
@@ -1,214 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012-2014 University of California, Los Angeles
- *
- * This file is part of ChronoSync, synchronization library for distributed realtime
- * applications for NDN.
- *
- * ChronoSync is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation, either
- * version 3 of the License, or (at your option) any later version.
- *
- * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- *
- * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
- * @author Chaoyi Bian <bcy@pku.edu.cn>
- * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
- * @author Yingdi Yu <http://irl.cs.ucla.edu/~yingdi/web/index.html>
- */
-
-#ifndef SYNC_LOGIC_H
-#define SYNC_LOGIC_H
-
-#include "boost-header.h"
-#include <memory>
-#include <map>
-
-#include <ndn-cxx/face.hpp>
-#include <ndn-cxx/security/validator.hpp>
-#include <ndn-cxx/security/key-chain.hpp>
-#include <ndn-cxx/util/scheduler.hpp>
-
-#include "sync-interest-table.h"
-#include "sync-diff-state.h"
-#include "sync-full-state.h"
-#include "sync-std-name-info.h"
-
-#include "sync-diff-state-container.h"
-
-#ifdef _DEBUG
-#ifdef HAVE_LOG4CXX
-#include <log4cxx/logger.h>
-#endif
-#endif
-
-namespace Sync {
-
-struct MissingDataInfo {
- std::string prefix;
- SeqNo low;
- SeqNo high;
-};
-
-/**
- * \ingroup sync
- * @brief A wrapper for SyncApp, which handles ccnx related things (process
- * interests and data)
- */
-
-class SyncLogic
-{
-public:
- //typedef boost::function< void ( const std::string &/*prefix*/, const SeqNo &/*newSeq*/, const SeqNo &/*oldSeq*/ ) > LogicUpdateCallback;
- typedef boost::function< void (const std::vector<MissingDataInfo> & ) > LogicUpdateCallback;
- typedef boost::function< void (const std::string &/*prefix*/ ) > LogicRemoveCallback;
- typedef boost::function< void (const std::string &)> LogicPerBranchCallback;
-
- SyncLogic (const ndn::Name& syncPrefix,
- const ndn::IdentityCertificate& myCertificate,
- ndn::shared_ptr<ndn::Validator> validator,
- ndn::shared_ptr<ndn::Face> face,
- LogicUpdateCallback onUpdate,
- LogicRemoveCallback onRemove);
-
- SyncLogic (const ndn::Name& syncPrefix,
- const ndn::IdentityCertificate& myCertificate,
- ndn::shared_ptr<ndn::Validator> validator,
- ndn::shared_ptr<ndn::Face> face,
- LogicPerBranchCallback onUpdateBranch);
-
- ~SyncLogic ();
-
- /**
- * a wrapper for the same func in SyncApp
- */
- void addLocalNames (const ndn::Name &prefix, uint64_t session, uint64_t seq);
-
- /**
- * @brief remove a participant's subtree from the sync tree
- * @param prefix the name prefix for the participant
- */
- void remove (const ndn::Name &prefix);
-
- std::string
- getRootDigest();
-
-#ifdef _DEBUG
- ndn::Scheduler &
- getScheduler () { return m_scheduler; }
-#endif
-
- void
- printState () const;
-
- std::map<std::string, bool>
- getBranchPrefixes() const;
-
-private:
- void
- delayedChecksLoop ();
-
- void
- onSyncInterest (const ndn::Name& prefix, const ndn::Interest& interest);
-
- void
- onSyncRegisterFailed(const ndn::Name& prefix, const std::string& msg);
-
- void
- onSyncData(const ndn::Interest& interest, ndn::Data& data);
-
- void
- onSyncTimeout(const ndn::Interest& interest);
-
- void
- onSyncDataValidationFailed(const ndn::shared_ptr<const ndn::Data>& data);
-
- void
- onSyncDataValidated(const ndn::shared_ptr<const ndn::Data>& data);
-
- void
- processSyncInterest (const ndn::Name &name,
- DigestConstPtr digest, bool timedProcessing=false);
-
- void
- processSyncData (const ndn::Name &name,
- DigestConstPtr digest, const char *wireData, size_t len);
-
- void
- processSyncRecoveryInterest (const ndn::Name &name,
- DigestConstPtr digest);
-
- void
- insertToDiffLog (DiffStatePtr diff);
-
- void
- satisfyPendingSyncInterests (DiffStateConstPtr diff);
-
- boost::tuple<DigestConstPtr, std::string>
- convertNameToDigestAndType (const ndn::Name &name);
-
- void
- sendSyncInterest ();
-
- void
- sendSyncRecoveryInterests (DigestConstPtr digest);
-
- void
- sendSyncData (const ndn::Name &name,
- DigestConstPtr digest, StateConstPtr state);
-
- void
- sendSyncData (const ndn::Name &name,
- DigestConstPtr digest, SyncStateMsg &msg);
-
- size_t
- getNumberOfBranches () const;
-
-private:
- FullStatePtr m_state;
- DiffStateContainer m_log;
-
- ndn::Name m_outstandingInterestName;
- SyncInterestTable m_syncInterestTable;
-
- ndn::Name m_syncPrefix;
- ndn::IdentityCertificate m_myCertificate;
- LogicUpdateCallback m_onUpdate;
- LogicRemoveCallback m_onRemove;
- LogicPerBranchCallback m_onUpdateBranch;
- bool m_perBranch;
- ndn::shared_ptr<ndn::Validator> m_validator;
- ndn::KeyChain m_keyChain;
- ndn::shared_ptr<ndn::Face> m_face;
- const ndn::RegisteredPrefixId* m_syncRegisteredPrefixId;
-
- ndn::Scheduler m_scheduler;
-
- boost::mt19937 m_randomGenerator;
- boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_rangeUniformRandom;
- boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_reexpressionJitter;
-
- static const int m_unknownDigestStoreTime = 10; // seconds
- static const int m_syncResponseFreshness; // MUST BE dividable by 1000!!!
- static const int m_syncInterestReexpress; // seconds
-
- static const int m_defaultRecoveryRetransmitInterval = 200; // milliseconds
- uint32_t m_recoveryRetransmissionInterval; // milliseconds
-
- ndn::EventId m_delayedInterestProcessingId;
- ndn::EventId m_reexpressingInterestId;
- ndn::EventId m_reexpressingRecoveryInterestId;
-
- std::string m_instanceId;
- static int m_instanceCounter;
-};
-
-
-} // Sync
-
-#endif // SYNC_APP_WRAPPER_H
diff --git a/src/sync-validator.cc b/src/sync-validator.cc
deleted file mode 100644
index e5cc0f5..0000000
--- a/src/sync-validator.cc
+++ /dev/null
@@ -1,240 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012-2014 University of California, Los Angeles
- *
- * This file is part of ChronoSync, synchronization library for distributed realtime
- * applications for NDN.
- *
- * ChronoSync is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation, either
- * version 3 of the License, or (at your option) any later version.
- *
- * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- *
- * @author Yingdi Yu <http://irl.cs.ucla.edu/~yingdi/web/index.html>
- */
-
-#include "sync-validator.h"
-#include "sync-logging.h"
-#include <ndn-cxx/security/certificate-cache-ttl.hpp>
-#include <queue>
-
-using namespace ndn;
-
-INIT_LOGGER ("SyncValidator")
-
-namespace Sync {
-
-using ndn::shared_ptr;
-
-const shared_ptr<CertificateCache> SyncValidator::DefaultCertificateCache = shared_ptr<CertificateCache>();
-const shared_ptr<SecRuleRelative> SyncValidator::DefaultDataRule = shared_ptr<SecRuleRelative>();
-
-SyncValidator::SyncValidator(const Name& prefix,
- const IdentityCertificate& anchor,
- Face& face,
- const PublishCertCallback& publishCertCallback,
- shared_ptr<SecRuleRelative> rule,
- shared_ptr<CertificateCache> certificateCache,
- const int stepLimit)
- : Validator(face)
- , m_prefix(prefix)
- , m_anchor(anchor)
- , m_stepLimit(stepLimit)
- , m_certificateCache(certificateCache)
- , m_publishCertCallback(publishCertCallback)
- , m_dataRule(rule)
-{
- if(!static_cast<bool>(m_certificateCache))
- m_certificateCache = make_shared<CertificateCacheTtl>(boost::ref(m_face.getIoService()));
-
- Name certPrefix = prefix;
- certPrefix.append("CHRONOS-INTRO-CERT");
- m_prefixId = m_face.setInterestFilter(certPrefix,
- bind(&SyncValidator::onCertInterest, this, _1, _2),
- bind(&SyncValidator::onCertRegisterFailed, this, _1, _2));
-
- setAnchor(m_anchor);
-}
-
-void
-SyncValidator::deriveTrustNodes()
-{
- std::queue<Name> nodeQueue;
-
- // Clear existing trust nodes.
- m_trustedNodes.clear();
-
- // Add the trust anchor.
- IntroNode origin(m_anchor);
- m_trustedNodes[origin.name()] = m_anchor.getPublicKeyInfo();
- nodeQueue.push(origin.name());
-
- // BFS trusted nodes.
- while(!nodeQueue.empty())
- {
- // Get next trusted node to process.
- Nodes::const_iterator it = m_introNodes.find(nodeQueue.front());
- const PublicKey& publicKey = m_trustedNodes[nodeQueue.front()];
-
- if(it != m_introNodes.end())
- {
- // If the trusted node exists in the graph.
- IntroNode::const_iterator eeIt = it->second.introduceeBegin();
- IntroNode::const_iterator eeEnd = it->second.introduceeEnd();
- for(; eeIt != eeEnd; eeIt++)
- {
- // Check the nodes introduced by the trusted node.
- Edges::const_iterator edgeIt = m_introCerts.find(*eeIt);
- if(edgeIt != m_introCerts.end()
- && m_trustedNodes.find(edgeIt->second.getIntroduceeCertName()) == m_trustedNodes.end()
- && verifySignature(edgeIt->second, publicKey))
- {
- // If the introduced node can be validated, add it into trusted node set and the node queue.
- m_trustedNodes[edgeIt->second.getIntroduceeCertName()] = edgeIt->second.getIntroduceeCert().getPublicKeyInfo();
- nodeQueue.push(edgeIt->second.getIntroduceeCertName());
- }
- }
- }
- nodeQueue.pop();
- }
-}
-
-void
-SyncValidator::checkPolicy (const Data& data,
- int stepCount,
- const OnDataValidated& onValidated,
- const OnDataValidationFailed& onValidationFailed,
- std::vector<shared_ptr<ValidationRequest> >& nextSteps)
-{
- if(m_stepLimit == stepCount)
- return onValidationFailed(data.shared_from_this(),
- "Maximum steps of validation reached: " + data.getName().toUri());
-
- if(m_prefix.isPrefixOf(data.getName()) || (static_cast<bool>(m_dataRule) && m_dataRule->satisfy(data)))
- {
- try
- {
- SignatureSha256WithRsa sig(data.getSignature());
- Name keyLocatorName = sig.getKeyLocator().getName();
-
- TrustNodes::const_iterator it = m_trustedNodes.find(keyLocatorName);
- if(m_trustedNodes.end() != it)
- {
- if(verifySignature(data, sig, it->second))
- return onValidated(data.shared_from_this());
- else
- return onValidationFailed(data.shared_from_this(),
- "Cannot verify signature: " + data.getName().toUri());
- }
- else
- {
- _LOG_DEBUG("I am: " << m_anchor.getName().get(0).toUri() << " for " << data.getName());
-
- Name interestName = m_prefix;
- interestName.append("CHRONOS-INTRO-CERT").append(keyLocatorName.wireEncode());
- Interest interest(interestName);
- interest.setInterestLifetime(time::milliseconds(500));
-
- OnDataValidated onKeyValidated = bind(&SyncValidator::onCertificateValidated, this,
- _1, data.shared_from_this(), onValidated, onValidationFailed);
-
- OnDataValidationFailed onKeyValidationFailed = bind(&SyncValidator::onCertificateValidationFailed, this,
- _1, _2, data.shared_from_this(), onValidationFailed);
-
- shared_ptr<ValidationRequest> nextStep = make_shared<ValidationRequest>(interest,
- onKeyValidated,
- onKeyValidationFailed,
- 1,
- stepCount + 1);
- nextSteps.push_back(nextStep);
-
- return;
- }
- }
- catch(SignatureSha256WithRsa::Error& e)
- {
- return onValidationFailed(data.shared_from_this(),
- "Not SignatureSha256WithRsa signature: " + std::string(e.what()));
- }
- catch(KeyLocator::Error& e)
- {
- return onValidationFailed(data.shared_from_this(),
- "Key Locator is not a name: " + data.getName().toUri());
- }
- }
- else
- return onValidationFailed(data.shared_from_this(),
- "No rule or rule is not satisfied: " + data.getName().toUri());
-}
-
-void
-SyncValidator::checkPolicy (const Interest& interest,
- int stepCount,
- const OnInterestValidated& onValidated,
- const OnInterestValidationFailed& onValidationFailed,
- std::vector<shared_ptr<ValidationRequest> >& nextSteps)
-{
- onValidationFailed(interest.shared_from_this(), "No policy for signed interest checking");
-}
-
-void
-SyncValidator::onCertificateValidated(const shared_ptr<const Data>& signCertificate,
- const shared_ptr<const Data>& data,
- const OnDataValidated& onValidated,
- const OnDataValidationFailed& onValidationFailed)
-{
- try
- {
- IntroCertificate introCert(*signCertificate);
- addParticipant(introCert);
-
- if(verifySignature(*data, introCert.getIntroduceeCert().getPublicKeyInfo()))
- return onValidated(data);
- else
- return onValidationFailed(data,
- "Cannot verify signature: " + data->getName().toUri());
- }
- catch(IntroCertificate::Error& e)
- {
- return onValidationFailed(data,
- "Intro cert decoding error: " + std::string(e.what()));
- }
-}
-
-void
-SyncValidator::onCertificateValidationFailed(const shared_ptr<const Data>& signCertificate,
- const std::string& failureInfo,
- const shared_ptr<const Data>& data,
- const OnDataValidationFailed& onValidationFailed)
-{
- onValidationFailed(data, failureInfo);
-}
-
-void
-SyncValidator::onCertInterest(const Name& prefix, const Interest& interest)
-{
- Name name = interest.getName();
- Edges::const_iterator it = m_introCerts.begin();
- for(; it != m_introCerts.end(); it++)
- {
- if(name.isPrefixOf(it->first))
- {
- m_face.put(it->second);
- return;
- }
- }
-}
-
-void
-SyncValidator::onCertRegisterFailed(const Name& prefix, const std::string& msg)
-{
- _LOG_DEBUG("SyncValidator::onCertRegisterFailed: " << msg);
-}
-
-} // namespace Sync
diff --git a/src/sync-validator.h b/src/sync-validator.h
deleted file mode 100644
index 9e5f4df..0000000
--- a/src/sync-validator.h
+++ /dev/null
@@ -1,341 +0,0 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2012-2014 University of California, Los Angeles
- *
- * This file is part of ChronoSync, synchronization library for distributed realtime
- * applications for NDN.
- *
- * ChronoSync is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation, either
- * version 3 of the License, or (at your option) any later version.
- *
- * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- *
- * @author Yingdi Yu <http://irl.cs.ucla.edu/~yingdi/web/index.html>
- */
-
-#ifndef SYNC_VALIDATOR_H
-#define SYNC_VALIDATOR_H
-
-#include "sync-intro-certificate.h"
-#include <ndn-cxx/security/validator.hpp>
-#include <ndn-cxx/security/key-chain.hpp>
-#include <ndn-cxx/security/sec-rule-relative.hpp>
-#include <ndn-cxx/security/certificate-cache.hpp>
-#include <map>
-
-namespace Sync {
-
-class SyncValidator : public ndn::Validator
-{
-public:
- typedef ndn::function< void (const uint8_t*, size_t, int) > PublishCertCallback;
-
- struct Error : public ndn::Validator::Error { Error(const std::string &what) : ndn::Validator::Error(what) {} };
-
- static const ndn::shared_ptr<ndn::CertificateCache> DefaultCertificateCache;
- static const ndn::shared_ptr<ndn::SecRuleRelative> DefaultDataRule;
-
- SyncValidator(const ndn::Name& prefix,
- const ndn::IdentityCertificate& anchor,
- ndn::Face& face,
- const PublishCertCallback& publishCertCallback,
- ndn::shared_ptr<ndn::SecRuleRelative> rule = DefaultDataRule,
- ndn::shared_ptr<ndn::CertificateCache> certificateCache = DefaultCertificateCache,
- const int stepLimit = 10);
-
- virtual
- ~SyncValidator()
- {
- m_face.unsetInterestFilter(m_prefixId);
- }
-
- /**
- * @brief Set the trust anchor
- *
- * The anchor should be the participant's own certificate.
- * This anchor node is the origin of the derived trust graph.
- * Once the new anchor is set, derive the TrustNode set.
- *
- * @param anchor.
- */
- inline void
- setAnchor(const ndn::IdentityCertificate& anchor);
-
- /**
- * @brief Add a node into the trust graph.
- *
- * The method also create an edge from trust anchor to the node.
- *
- * @param introducee.
- * @return IntroCertificate for the introducee.
- */
- inline ndn::shared_ptr<const IntroCertificate>
- addParticipant(const ndn::IdentityCertificate& introducee);
-
- /**
- * @brief Add an edge into the trust graph.
- *
- * Create nodes if it is one of the edge's ends and does not exist in the graph.
- *
- * @param introCert.
- */
- inline void
- addParticipant(const IntroCertificate& introCert);
-
- inline void
- getIntroCertNames(std::vector<ndn::Name>& list);
-
- inline const IntroCertificate&
- getIntroCertificate(const ndn::Name& name);
-
-#ifdef _TEST
- bool
- canTrust(const ndn::Name& certName)
- {
- return (m_trustedNodes.find(certName.getPrefix(-1)) != m_trustedNodes.end());
- }
-#endif //_DEBUG
-
-protected:
- /***********************
- * From ndn::Validator *
- ***********************/
- virtual void
- checkPolicy (const ndn::Data& data,
- int stepCount,
- const ndn::OnDataValidated& onValidated,
- const ndn::OnDataValidationFailed& onValidationFailed,
- std::vector<ndn::shared_ptr<ndn::ValidationRequest> >& nextSteps);
-
- virtual void
- checkPolicy (const ndn::Interest& interest,
- int stepCount,
- const ndn::OnInterestValidated& onValidated,
- const ndn::OnInterestValidationFailed& onValidationFailed,
- std::vector<ndn::shared_ptr<ndn::ValidationRequest> >& nextSteps);
-private:
- void
- deriveTrustNodes();
-
-
- void
- onCertificateValidated(const ndn::shared_ptr<const ndn::Data>& signCertificate,
- const ndn::shared_ptr<const ndn::Data>& data,
- const ndn::OnDataValidated& onValidated,
- const ndn::OnDataValidationFailed& onValidationFailed);
-
- void
- onCertificateValidationFailed(const ndn::shared_ptr<const ndn::Data>& signCertificate,
- const std::string& failureInfo,
- const ndn::shared_ptr<const ndn::Data>& data,
- const ndn::OnDataValidationFailed& onValidationFailed);
-
- void
- onCertInterest (const ndn::Name& prefix, const ndn::Interest& interest);
-
- void
- onCertRegisterFailed(const ndn::Name& prefix, const std::string& msg);
-
-private:
- class IntroNode;
-
- // Syncprefix
- ndn::Name m_prefix;
-
- // The map
- typedef std::map<const ndn::Name, IntroNode> Nodes;
- typedef std::map<const ndn::Name, IntroCertificate> Edges;
- Nodes m_introNodes;
- Edges m_introCerts;
-
- // The derived trust info
- typedef std::map<const ndn::Name, ndn::PublicKey> TrustNodes;
- ndn::IdentityCertificate m_anchor;
- TrustNodes m_trustedNodes;
-
- // others
- int m_stepLimit;
- ndn::shared_ptr<ndn::CertificateCache> m_certificateCache;
- ndn::KeyChain m_keychain;
- const ndn::RegisteredPrefixId* m_prefixId;
- PublishCertCallback m_publishCertCallback;
- ndn::shared_ptr<ndn::SecRuleRelative> m_dataRule;
-
- class IntroNode
- {
- public:
- typedef std::vector<ndn::Name>::const_iterator const_iterator;
-
- IntroNode()
- {}
-
- IntroNode(const ndn::IdentityCertificate& idCert)
- : m_nodeName(idCert.getName().getPrefix(-1))
- {}
-
- IntroNode(const IntroCertificate& introCert, bool isIntroducer)
- {
- if(isIntroducer)
- {
- m_nodeName = introCert.getIntroducerCertName();
- m_introduceeCerts.push_back(introCert.getName());
- }
- else
- {
- m_nodeName = introCert.getIntroduceeCertName();
- m_introducerCerts.push_back(introCert.getName());
- }
- }
-
- ~IntroNode()
- {}
-
- const ndn::Name&
- name() const
- {
- return m_nodeName;
- }
-
- const_iterator
- introducerBegin() const
- {
- return m_introducerCerts.begin();
- }
-
- const_iterator
- introducerEnd() const
- {
- return m_introducerCerts.end();
- }
-
- const_iterator
- introduceeBegin() const
- {
- return m_introduceeCerts.begin();
- }
-
- const_iterator
- introduceeEnd() const
- {
- return m_introduceeCerts.end();
- }
-
- void
- addIntroCertAsIntroducer(const ndn::Name& introCertName)
- {
- if(std::find(m_introduceeCerts.begin(), m_introduceeCerts.end(), introCertName) == m_introduceeCerts.end())
- m_introduceeCerts.push_back(introCertName);
- }
-
- void
- addIntroCertAsIntroducee(const ndn::Name& introCertName)
- {
- if(std::find(m_introducerCerts.begin(), m_introducerCerts.end(), introCertName) == m_introducerCerts.end())
- m_introducerCerts.push_back(introCertName);
- }
-
- private:
- ndn::Name m_nodeName;
- std::vector<ndn::Name> m_introducerCerts;
- std::vector<ndn::Name> m_introduceeCerts;
- };
-
-};
-
-inline void
-SyncValidator::setAnchor(const ndn::IdentityCertificate& anchor)
-{
- m_anchor = anchor;
-
- // Add anchor into trust graph if it does not exist.
- IntroNode origin(m_anchor);
- Nodes::const_iterator nodeIt = m_introNodes.find(origin.name());
- if(nodeIt == m_introNodes.end())
- m_introNodes[origin.name()] = origin;
-
- deriveTrustNodes();
-}
-
-inline void
-SyncValidator::addParticipant(const IntroCertificate& introCert)
-{
- // Check if the edge has been added before.
- ndn::Name certName = introCert.getName();
- Edges::const_iterator edgeIt = m_introCerts.find(certName);
- if(edgeIt != m_introCerts.end())
- return; // the edge has been added before.
-
- m_introCerts[certName] = introCert;
-
- // Check if the introducer has been added.
- Nodes::iterator nodeIt = m_introNodes.find(introCert.getIntroducerCertName());
- if(nodeIt == m_introNodes.end())
- {
- IntroNode node(introCert, true);
- m_introNodes[node.name()] = node;
- }
- else
- nodeIt->second.addIntroCertAsIntroducer(certName);
-
- // Check if the introducee has been added.
- nodeIt = m_introNodes.find(introCert.getIntroduceeCertName());
- if(nodeIt == m_introNodes.end())
- {
- IntroNode node(introCert, false);
- m_introNodes[node.name()] = node;
- }
- else
- nodeIt->second.addIntroCertAsIntroducee(certName);
-
- // Check if the introducer is one of the trusted nodes.
- TrustNodes::const_iterator trustNodeIt = m_trustedNodes.find(introCert.getIntroducerCertName());
- if(trustNodeIt != m_trustedNodes.end() && verifySignature(introCert, trustNodeIt->second))
- // If the introducee, add it into trusted node set.
- m_trustedNodes[introCert.getIntroduceeCertName()] = introCert.getIntroduceeCert().getPublicKeyInfo();
-}
-
-inline ndn::shared_ptr<const IntroCertificate>
-SyncValidator::addParticipant(const ndn::IdentityCertificate& introducee)
-{
- ndn::shared_ptr<IntroCertificate> introCert
- = ndn::shared_ptr<IntroCertificate>(new IntroCertificate(m_prefix, introducee, m_anchor.getName().getPrefix(-1)));
-
- m_keychain.sign(*introCert, m_anchor.getName());
-
- addParticipant(*introCert);
-
- // Publish certificate as normal data.
- ndn::Block block = introCert->wireEncode();
- m_publishCertCallback(block.wire(), block.size(), 1000);
-
- return introCert;
-}
-
-inline void
-SyncValidator::getIntroCertNames(std::vector<ndn::Name>& list)
-{
- Edges::const_iterator it = m_introCerts.begin();
- Edges::const_iterator end = m_introCerts.end();
- for(; it != end; it++)
- list.push_back(it->first);
-}
-
-inline const IntroCertificate&
-SyncValidator::getIntroCertificate(const ndn::Name& name)
-{
- Edges::const_iterator it = m_introCerts.find(name);
- if(it != m_introCerts.end())
- return it->second;
- else
- throw Error("No cert");
-}
-
-} // namespace Sync
-
-#endif //SYNC_VALIDATOR_H