Fix logic

Change-Id: I7bfb72e8bb245fab3e9a9d575abc7217bbae86d0
diff --git a/src/logic.cpp b/src/logic.cpp
new file mode 100644
index 0000000..f584da5
--- /dev/null
+++ b/src/logic.cpp
@@ -0,0 +1,593 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012-2014 University of California, Los Angeles
+ *
+ * This file is part of ChronoSync, synchronization library for distributed realtime
+ * applications for NDN.
+ *
+ * ChronoSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ChronoSync, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
+ * @author Chaoyi Bian <bcy@pku.edu.cn>
+ * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
+ * @author Yingdi Yu <yingdi@cs.ucla.edu>
+ */
+
+#include "logic.hpp"
+#include "logger.hpp"
+
+INIT_LOGGER("Logic");
+
+#ifdef _DEBUG
+#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
+#else
+#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
+#endif
+
+namespace chronosync {
+
+using ndn::ConstBufferPtr;
+using ndn::EventId;
+
+const uint8_t EMPTY_DIGEST_VALUE[] = {
+  0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
+  0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
+  0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
+  0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
+};
+
+#ifdef _DEBUG
+int Logic::m_instanceCounter = 0;
+#endif
+
+const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
+const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
+const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
+const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
+const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
+
+const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
+const ndn::name::Component Logic::RESET_COMPONENT("reset");
+
+Logic::Logic(ndn::Face& face,
+             const Name& syncPrefix,
+             const Name& userPrefix,
+             const UpdateCallback& onUpdate,
+             const time::steady_clock::Duration& resetTimer,
+             const time::steady_clock::Duration& cancelResetTimer,
+             const time::milliseconds& resetInterestLifetime,
+             const time::milliseconds& syncInterestLifetime,
+             const time::milliseconds& syncReplyFreshness)
+  : m_face(face)
+  , m_syncPrefix(syncPrefix)
+  , m_userPrefix(userPrefix)
+  , m_interestTable(m_face.getIoService())
+  , m_outstandingInterestId(0)
+  , m_isInReset(false)
+  , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
+  , m_onUpdate(onUpdate)
+  , m_scheduler(m_face.getIoService())
+  , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
+  , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
+  , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
+  , m_resetTimer(resetTimer)
+  , m_cancelResetTimer(cancelResetTimer)
+  , m_resetInterestLifetime(resetInterestLifetime)
+  , m_syncInterestLifetime(syncInterestLifetime)
+  , m_syncReplyFreshness(syncReplyFreshness)
+{
+#ifdef _DEBUG
+  m_instanceId = m_instanceCounter++;
+#endif
+
+  _LOG_DEBUG_ID(">> Logic::Logic");
+
+  m_syncReset = m_syncPrefix;
+  m_syncReset.append("reset");
+
+  _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
+  m_syncRegisteredPrefixId =
+    m_face.setInterestFilter(m_syncPrefix,
+                             bind(&Logic::onSyncInterest, this, _1, _2),
+                             bind(&Logic::onSyncRegisterFailed, this, _1, _2));
+
+  setUserPrefix(m_userPrefix);
+
+  _LOG_DEBUG_ID("<< Logic::Logic");
+}
+
+Logic::~Logic()
+{
+  m_face.unsetInterestFilter(m_syncRegisteredPrefixId);
+  m_scheduler.cancelAllEvents();
+}
+
+void
+Logic::reset()
+{
+  m_isInReset = true;
+
+  m_state.reset();
+  m_log.clear();
+
+  sendResetInterest();
+
+  // reset outstanding interest name, so that data for previous interest will be dropped.
+  if (m_outstandingInterestId != 0) {
+    m_face.removePendingInterest(m_outstandingInterestId);
+    m_outstandingInterestId = 0;
+  }
+
+  sendSyncInterest();
+
+  if (static_cast<bool>(m_delayedInterestProcessingId))
+    m_scheduler.cancelEvent(m_delayedInterestProcessingId);
+
+  m_delayedInterestProcessingId =
+    m_scheduler.scheduleEvent(m_cancelResetTimer,
+                              bind(&Logic::cancelReset, this));
+}
+
+void
+Logic::setUserPrefix(const Name& userPrefix)
+{
+  m_userPrefix = userPrefix;
+
+  m_sessionName = m_userPrefix;
+  m_sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
+
+  m_seqNo = 0;
+
+  reset();
+}
+
+void
+Logic::updateSeqNo(const SeqNo& seqNo)
+{
+  _LOG_DEBUG_ID(">> Logic::updateSeqNo");
+  _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << m_seqNo);
+  if (seqNo < m_seqNo || seqNo == 0)
+    return;
+
+  m_seqNo = seqNo;
+
+  _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << m_seqNo);
+
+  if (!m_isInReset) {
+    _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
+    ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
+    {
+      using namespace CryptoPP;
+
+      std::string hash;
+      StringSource(previousRoot->buf(), previousRoot->size(), true,
+                   new HexEncoder(new StringSink(hash), false));
+      _LOG_DEBUG_ID("Hash: " << hash);
+    }
+
+    bool isInserted = false;
+    bool isUpdated = false;
+    SeqNo oldSeq;
+    boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(m_sessionName, seqNo);
+
+    _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
+    _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
+    if (isInserted || isUpdated) {
+      DiffStatePtr commit = make_shared<DiffState>();
+      commit->update(m_sessionName, seqNo);
+      commit->setRootDigest(m_state.getRootDigest());
+      insertToDiffLog(commit, previousRoot);
+
+      satisfyPendingSyncInterests(commit);
+    }
+  }
+}
+
+ConstBufferPtr
+Logic::getRootDigest() const
+{
+  return m_state.getRootDigest();
+}
+
+void
+Logic::printState(std::ostream& os) const
+{
+  BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
+    {
+      os << *leaf << "\n";
+    }
+}
+
+std::set<Name>
+Logic::getSessionNames() const
+{
+  std::set<Name> sessionNames;
+
+  BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
+    {
+      sessionNames.insert(leaf->getSessionName());
+    }
+
+  return sessionNames;
+}
+
+void
+Logic::onSyncInterest(const Name& prefix, const Interest& interest)
+{
+  _LOG_DEBUG_ID(">> Logic::onSyncInterest");
+  Name name = interest.getName();
+
+  _LOG_DEBUG_ID("InterestName: " << name);
+
+  if (RESET_COMPONENT != name.get(-1)) {
+    // normal sync interest
+    processSyncInterest(interest.shared_from_this());
+  }
+  else
+    // reset interest
+    processResetInterest(interest);
+
+  _LOG_DEBUG_ID("<< Logic::onSyncInterest");
+}
+
+void
+Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
+{
+  //Sync prefix registration failed
+  _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
+}
+
+void
+Logic::onSyncData(const Interest& interest, Data& data)
+{
+  _LOG_DEBUG_ID(">> Logic::onSyncData");
+  // Place holder for validator.
+  onSyncDataValidated(data.shared_from_this());
+  _LOG_DEBUG_ID("<< Logic::onSyncData");
+}
+
+void
+Logic::onResetData(const Interest& interest, Data& data)
+{
+  // This should not happened, drop the received data.
+}
+
+void
+Logic::onSyncTimeout(const Interest& interest)
+{
+  // It is OK. Others will handle the time out situation.
+  _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
+  _LOG_DEBUG_ID("Interest: " << interest.getName());
+  _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
+}
+
+void
+Logic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
+{
+  // SyncReply cannot be validated.
+}
+
+void
+Logic::onSyncDataValidated(const shared_ptr<const Data>& data)
+{
+  Name name = data->getName();
+  ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
+
+  processSyncData(name, digest, data->getContent().blockFromValue());
+}
+
+void
+Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
+                           bool isTimedProcessing/*=false*/)
+{
+  _LOG_DEBUG_ID(">> Logic::processSyncInterest");
+
+  const Name& name = interest->getName();
+  ConstBufferPtr digest =
+      make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
+
+  ConstBufferPtr rootDigest = m_state.getRootDigest();
+
+  // If the digest of the incoming interest is the same as root digest
+  // Put the interest into InterestTable
+  if (*rootDigest == *digest) {
+    _LOG_DEBUG_ID("Oh, we are in the same state");
+    m_interestTable.insert(interest, digest, false);
+
+    if (!m_isInReset)
+      return;
+
+    if (!isTimedProcessing) {
+      _LOG_DEBUG_ID("Non timed processing in reset");
+      // Still in reset, our own seq has not been put into state yet
+      // Do not hurry, some others may be also resetting and may send their reply
+      if (static_cast<bool>(m_delayedInterestProcessingId))
+        m_scheduler.cancelEvent(m_delayedInterestProcessingId);
+
+      time::milliseconds after(m_rangeUniformRandom());
+      _LOG_DEBUG_ID("After: " << after);
+      m_delayedInterestProcessingId =
+        m_scheduler.scheduleEvent(after,
+                                  bind(&Logic::processSyncInterest, this, interest, true));
+    }
+    else {
+      _LOG_DEBUG_ID("Timed processing in reset");
+      // Now we can get out of reset state by putting our own stuff into m_state.
+      cancelReset();
+    }
+
+    return;
+  }
+
+  // If the digest of incoming interest is an "empty" digest
+  if (digest == EMPTY_DIGEST) {
+    _LOG_DEBUG_ID("Poor guy, he knows nothing");
+    sendSyncData(name, m_state);
+    return;
+  }
+
+  DiffStateContainer::iterator stateIter = m_log.find(digest);
+  // If the digest of incoming interest can be found from the log
+  if (stateIter != m_log.end()) {
+    _LOG_DEBUG_ID("It is ok, you are so close");
+    sendSyncData(name, *(*stateIter)->diff());
+    return;
+  }
+
+  if (!isTimedProcessing) {
+    _LOG_DEBUG_ID("Let's wait, just wait for a while");
+    // Do not hurry, some incoming SyncReplies may help us to recognize the digest
+    bool doesExist = m_interestTable.insert(interest, digest, true);
+    if (doesExist)
+      // Original comment (not sure): somebody else replied, so restart random-game timer
+      // YY: Get the same SyncInterest again, refresh the timer
+      m_scheduler.cancelEvent(m_delayedInterestProcessingId);
+
+    m_delayedInterestProcessingId =
+      m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
+                                bind(&Logic::processSyncInterest, this, interest, true));
+  }
+  else {
+    // OK, nobody is helping us, just tell the truth.
+    _LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth");
+    m_interestTable.erase(digest);
+    sendSyncData(name, m_state);
+  }
+
+  _LOG_DEBUG_ID("<< Logic::processSyncInterest");
+}
+
+void
+Logic::processResetInterest(const Interest& interest)
+{
+  _LOG_DEBUG_ID(">> Logic::processResetInterest");
+  reset();
+}
+
+void
+Logic::processSyncData(const Name& name,
+                       ndn::ConstBufferPtr digest,
+                       const Block& syncReplyBlock)
+{
+  _LOG_DEBUG_ID(">> Logic::processSyncData");
+
+  DiffStatePtr commit = make_shared<DiffState>();
+  ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
+
+  try {
+    m_interestTable.erase(digest); // Remove satisfied interest from PIT
+
+    State reply;
+    reply.wireDecode(syncReplyBlock);
+
+    std::vector<MissingDataInfo> v;
+    BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
+      {
+        BOOST_ASSERT(leaf != 0);
+
+        const Name& info = leaf->getSessionName();
+        SeqNo seq = leaf->getSeq();
+
+        bool isInserted = false;
+        bool isUpdated = false;
+        SeqNo oldSeq;
+        boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
+
+        if (isInserted || isUpdated) {
+          commit->update(info, seq);
+
+          oldSeq++;
+          MissingDataInfo mdi = {info, oldSeq, seq};
+          v.push_back(mdi);
+        }
+      }
+
+    if (!v.empty()) {
+      m_onUpdate(v);
+
+      commit->setRootDigest(m_state.getRootDigest());
+      insertToDiffLog(commit, previousRoot);
+    }
+    else {
+      _LOG_DEBUG_ID("What? nothing new");
+    }
+  }
+  catch (State::Error&) {
+    _LOG_DEBUG_ID("Something really fishy happened during state decoding");
+    // Something really fishy happened during state decoding;
+    commit.reset();
+    return;
+  }
+
+  if (static_cast<bool>(commit) && !commit->getLeaves().empty()) {
+    // state changed and it is safe to express a new interest
+    time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
+    _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
+    EventId eventId = m_scheduler.scheduleEvent(after,
+                                                bind(&Logic::sendSyncInterest, this));
+
+    m_scheduler.cancelEvent(m_reexpressingInterestId);
+    m_reexpressingInterestId = eventId;
+  }
+}
+
+void
+Logic::satisfyPendingSyncInterests(ConstDiffStatePtr commit)
+{
+  _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
+  try {
+    _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
+    for (InterestTable::const_iterator it = m_interestTable.begin();
+         it != m_interestTable.end(); it++) {
+      ConstUnsatisfiedInterestPtr request = *it;
+
+      if (request->isUnknown)
+        sendSyncData(request->interest->getName(), m_state);
+      else
+        sendSyncData(request->interest->getName(), *commit);
+    }
+    m_interestTable.clear();
+  }
+  catch (InterestTable::Error&) {
+    // ok. not really an error
+  }
+  _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
+}
+
+void
+Logic::insertToDiffLog(DiffStatePtr commit, ndn::ConstBufferPtr previousRoot)
+{
+  _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
+  // Connect to the history
+  if (!m_log.empty())
+    (*m_log.find(previousRoot))->setNext(commit);
+
+  // Insert the commit
+  m_log.erase(commit->getRootDigest());
+  m_log.insert(commit);
+  _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
+}
+
+void
+Logic::sendResetInterest()
+{
+  _LOG_DEBUG_ID(">> Logic::sendResetInterest");
+
+  if (m_needPeriodReset) {
+    _LOG_DEBUG_ID("Need Period Reset");
+    _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
+
+    EventId eventId =
+      m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
+                                bind(&Logic::sendResetInterest, this));
+    m_scheduler.cancelEvent(m_resetInterestId);
+    m_resetInterestId = eventId;
+  }
+
+  Interest interest(m_syncReset);
+  interest.setMustBeFresh(true);
+  interest.setInterestLifetime(m_resetInterestLifetime);
+  m_face.expressInterest(interest,
+                         bind(&Logic::onResetData, this, _1, _2),
+                         bind(&Logic::onSyncTimeout, this, _1));
+
+  _LOG_DEBUG_ID("<< Logic::sendResetInterest");
+}
+
+void
+Logic::sendSyncInterest()
+{
+  _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
+
+  Name interestName;
+  interestName.append(m_syncPrefix)
+    .append(ndn::name::Component(*m_state.getRootDigest()));
+
+  m_outstandingInterestName = interestName;
+
+#ifdef _DEBUG
+  printDigest(m_state.getRootDigest());
+#endif
+
+  EventId eventId =
+    m_scheduler.scheduleEvent(m_syncInterestLifetime +
+                              ndn::time::milliseconds(m_reexpressionJitter()),
+                              bind(&Logic::sendSyncInterest, this));
+  m_scheduler.cancelEvent(m_reexpressingInterestId);
+  m_reexpressingInterestId = eventId;
+
+  Interest interest(interestName);
+  interest.setMustBeFresh(true);
+  interest.setInterestLifetime(m_syncInterestLifetime);
+
+  m_outstandingInterestId = m_face.expressInterest(interest,
+                                                   bind(&Logic::onSyncData, this, _1, _2),
+                                                   bind(&Logic::onSyncTimeout, this, _1));
+
+  _LOG_DEBUG_ID("Send interest: " << interest.getName());
+  _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
+}
+
+void
+Logic::sendSyncData(const Name& name, const State& state)
+{
+  _LOG_DEBUG_ID(">> Logic::sendSyncData");
+  shared_ptr<Data> syncReply = make_shared<Data>(name);
+  syncReply->setContent(state.wireEncode());
+  syncReply->setFreshnessPeriod(m_syncReplyFreshness);
+  m_keyChain.sign(*syncReply);
+
+  m_face.put(*syncReply);
+
+  // checking if our own interest got satisfied
+  if (m_outstandingInterestName == name) {
+    // remove outstanding interest
+    if (m_outstandingInterestId != 0) {
+      m_face.removePendingInterest(m_outstandingInterestId);
+      m_outstandingInterestId = 0;
+    }
+
+    // re-schedule sending Sync interest
+    time::milliseconds after(m_reexpressionJitter());
+    _LOG_DEBUG_ID("Satisfy our own interest");
+    _LOG_DEBUG_ID("Reschedule sync interest after " << after);
+    EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
+    m_scheduler.cancelEvent(m_reexpressingInterestId);
+    m_reexpressingInterestId = eventId;
+  }
+  _LOG_DEBUG_ID("<< Logic::sendSyncData");
+}
+
+void
+Logic::cancelReset()
+{
+  _LOG_DEBUG_ID(">> Logic::cancelReset");
+  if (!m_isInReset)
+    return;
+
+  m_isInReset = false;
+  updateSeqNo(m_seqNo);
+  _LOG_DEBUG_ID("<< Logic::cancelReset");
+}
+
+void
+Logic::printDigest(ndn::ConstBufferPtr digest)
+{
+  using namespace CryptoPP;
+
+  std::string hash;
+  StringSource(digest->buf(), digest->size(), true,
+               new HexEncoder(new StringSink(hash), false));
+  _LOG_DEBUG_ID("Hash: " << hash);
+}
+
+} // namespace chronosync