PSync: initial commit

refs: #4641

Change-Id: Iabed3ad7632544d97559e6798547b7972b416784
diff --git a/src/full-producer.cpp b/src/full-producer.cpp
new file mode 100644
index 0000000..dd4d452
--- /dev/null
+++ b/src/full-producer.cpp
@@ -0,0 +1,466 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018,  The University of Memphis
+ *
+ * This file is part of NLSR (Named-data Link State Routing).
+ * See AUTHORS.md for complete list of NLSR authors and contributors.
+ *
+ * NLSR is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NLSR is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NLSR, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "full-producer.hpp"
+
+#include <ndn-cxx/util/logger.hpp>
+#include <ndn-cxx/util/segment-fetcher.hpp>
+#include <ndn-cxx/security/validator-null.hpp>
+
+#include <iostream>
+#include <cstring>
+#include <limits>
+#include <functional>
+
+namespace psync {
+
+NDN_LOG_INIT(psync.FullProducer);
+
+FullProducer::FullProducer(const size_t expectedNumEntries,
+                           ndn::Face& face,
+                           const ndn::Name& syncPrefix,
+                           const ndn::Name& userPrefix,
+                           const UpdateCallback& onUpdateCallBack,
+                           ndn::time::milliseconds syncInterestLifetime,
+                           ndn::time::milliseconds syncReplyFreshness)
+  : ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness)
+  , m_syncInterestLifetime(syncInterestLifetime)
+  , m_onUpdate(onUpdateCallBack)
+  , m_outstandingInterestId(nullptr)
+  , m_scheduledSyncInterestId(m_scheduler)
+{
+  int jitter = m_syncInterestLifetime.count() * .20; // jitter bound: 20% of the lifetime
+  m_jitter = std::uniform_int_distribution<>(-jitter, jitter);
+  // Serve m_syncPrefix: Interests go to onInterest(), failures to onRegisterFailed().
+  m_registerPrefixId =
+    m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
+                             std::bind(&FullProducer::onInterest, this, _1, _2),
+                             std::bind(&FullProducer::onRegisterFailed, this, _1, _2));
+  // The first sync Interest is sent immediately rather than from a registration-success
+  // callback. TODO: decide whether it should wait for setInterestFilter to succeed
+  // (the current behaviour follows ChronoSync).
+  sendSyncInterest();
+}
+
+FullProducer::~FullProducer()
+{
+  if (m_outstandingInterestId != nullptr) { // a sync Interest is still pending on the face
+    m_face.removePendingInterest(m_outstandingInterestId);
+  }
+  // Undo the setInterestFilter() registration made in the constructor.
+  m_face.unsetInterestFilter(m_registerPrefixId);
+}
+
+void
+FullProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
+{
+  // Publishes a new sequence number for `prefix`: `seq` if given, otherwise the stored
+  // one plus 1. A prefix that is not in m_prefixes is rejected with a warning.
+  auto prefixIt = m_prefixes.find(prefix); // single lookup instead of find() + operator[]
+  if (prefixIt == m_prefixes.end()) {
+    NDN_LOG_WARN("Prefix not added: " << prefix);
+    return;
+  }
+  const uint64_t newSeq = seq.value_or(prefixIt->second + 1);
+  NDN_LOG_INFO("Publish: "<< prefix << "/" << newSeq);
+  updateSeqNo(prefix, newSeq);
+  // Sync Interests parked in m_pendingEntries may be answerable now.
+  satisfyPendingInterests();
+}
+
+void
+FullProducer::sendSyncInterest()
+{
+  // Only one sync Interest is kept outstanding: if two were sent back to back while
+  // the network has no new data, the same Data could later satisfy both of them.
+  // So withdraw the previous one (if any) before expressing a new one.
+  if (m_outstandingInterestId != nullptr) {
+    m_face.removePendingInterest(m_outstandingInterestId);
+  }
+
+  // Sync Interest format for full sync: /<sync-prefix>/<ourLatestIBF>
+  ndn::Name syncInterestName = m_syncPrefix;
+  m_iblt.appendToName(syncInterestName); // append our latest IBF
+  // Remembered so that sendSyncData() can detect Data that answers our own Interest.
+  m_outstandingInterestName = syncInterestName;
+
+  // Re-send after half the Interest lifetime plus the random jitter drawn from m_jitter.
+  m_scheduledSyncInterestId =
+    m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(m_rng)),
+                              [this] { sendSyncInterest(); });
+
+  ndn::Interest syncInterest(syncInterestName);
+  syncInterest.setInterestLifetime(m_syncInterestLifetime);
+  // Other side appends hash of IBF to sync data name
+  syncInterest.setCanBePrefix(true);
+  syncInterest.setMustBeFresh(true);
+  // NOTE(review): setNonce(1) + refreshNonce() presumably forces a fresh random nonce;
+  // confirm against the ndn-cxx Interest API.
+  syncInterest.setNonce(1);
+  syncInterest.refreshNonce();
+  // Nack and timeout are only logged; the event scheduled above sends the next Interest.
+  m_outstandingInterestId = m_face.expressInterest(syncInterest,
+                              std::bind(&FullProducer::onSyncData, this, _1, _2),
+                              [] (const ndn::Interest& interest, const ndn::lp::Nack& nack) {
+                                NDN_LOG_TRACE("received Nack with reason " << nack.getReason() <<
+                                              " for Interest with Nonce: " << interest.getNonce());
+                              },
+                              [] (const ndn::Interest& interest) {
+                                NDN_LOG_DEBUG("On full sync timeout " << interest.getNonce());
+                              });
+
+  NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
+                ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
+}
+
+void
+FullProducer::onInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
+{
+  // Dispatch on the components after the sync prefix; any other shape is ignored.
+  const ndn::Name suffix = interest.getName().getSubName(prefixName.size());
+  if (suffix.size() == 1) {
+    onSyncInterest(interest); // /<sync-prefix>/<IBF>
+  }
+  else if (suffix.size() == 2 && suffix.get(1) == RECOVERY_PREFIX.get(0)) {
+    onRecoveryInterest(interest); // /<sync-prefix>/<IBF hash>/<recovery marker>
+  }
+}
+
+void
+FullProducer::onSyncInterest(const ndn::Interest& interest)
+{
+  // Handles /<sync-prefix>/<IBF>: reply with what the sender's IBF lacks, Nack if the
+  // difference is undecodable, else park the Interest in m_pendingEntries until it expires.
+  ndn::Name interestName = interest.getName();
+  ndn::name::Component ibltName = interestName.get(interestName.size()-1);
+  NDN_LOG_DEBUG("Full Sync Interest Received, nonce: " << interest.getNonce() <<
+                ", hash: " << std::hash<ndn::Name>{}(interestName));
+
+  IBLT iblt(m_expectedNumEntries);
+  try {
+    iblt.initialize(ibltName);
+  }
+  catch (const std::exception& e) {
+    NDN_LOG_WARN(e.what());
+    return;
+  }
+
+  IBLT diff = m_iblt - iblt;
+  std::set<uint32_t> positive;
+  std::set<uint32_t> negative;
+  if (!diff.listEntries(positive, negative)) {
+    NDN_LOG_TRACE("Cannot decode differences, positive: " << positive.size() <<
+                  " negative: " << negative.size() << " m_threshold: " << m_threshold);
+    // Nack when the decoded part reaches the threshold or nothing was decoded; else reply below.
+    if (positive.size() + negative.size() >= m_threshold ||
+        (positive.size() == 0 && negative.size() == 0)) {
+      // With nothing to offer we are behind ourselves and must not mislead other nodes.
+      bool haveSomethingToOffer = false;
+      for (const auto& content : m_prefixes) {
+        if (content.second != 0) {
+          haveSomethingToOffer = true;
+          break; // one published prefix is enough
+        }
+      }
+      if (haveSomethingToOffer) {
+        sendApplicationNack(interestName);
+      }
+      return;
+    }
+  }
+
+  State state;
+  for (const auto& hash : positive) {
+    // The hashes are decoded from a peer-supplied IBF (possibly only partially), so use
+    // find(): operator[] would insert a default entry for every hash we do not know.
+    auto hashIt = m_hash2prefix.find(hash);
+    if (hashIt == m_hash2prefix.end()) {
+      continue;
+    }
+    const ndn::Name& prefix = hashIt->second;
+    auto seqIt = m_prefixes.find(prefix);
+    // Don't sync up sequence number zero
+    if (seqIt != m_prefixes.end() && seqIt->second != 0 &&
+        !isFutureHash(prefix.toUri(), negative)) {
+      state.addContent(ndn::Name(prefix).appendNumber(seqIt->second));
+    }
+  }
+
+  if (!state.getContent().empty()) {
+    NDN_LOG_DEBUG("Sending sync content: " << state);
+    sendSyncData(interestName, state.wireEncode());
+    return;
+  }
+
+  ndn::util::scheduler::ScopedEventId scopedEventId(m_scheduler);
+  auto it = m_pendingEntries.emplace(interestName,
+                                     PendingEntryInfoFull{iblt, std::move(scopedEventId)});
+  it.first->second.expirationEvent =
+    m_scheduler.scheduleEvent(interest.getInterestLifetime(), [this, interest] {
+      NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
+      m_pendingEntries.erase(interest.getName());
+    });
+}
+
+void
+FullProducer::onRecoveryInterest(const ndn::Interest& interest)
+{
+  // Replies with our full state: one <prefix>/<seqNo> name per prefix with a non-zero seqNo.
+  NDN_LOG_DEBUG("Recovery interest received");
+  State fullState;
+  for (const auto& entry : m_prefixes) {
+    if (entry.second == 0) {
+      continue; // sequence number zero is never advertised
+    }
+    fullState.addContent(ndn::Name(entry.first).appendNumber(entry.second));
+  }
+  // An empty state is still sent: it lets the other side know that we are behind.
+  sendRecoveryData(interest.getName(), fullState);
+}
+
+void
+FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
+{
+  // Publishes `block` as Data named <name>/<hash of our IBF>. If `name` is also the name
+  // of our own outstanding sync Interest, that Interest is withdrawn before the Data is
+  // put on the face and a new sync Interest is sent afterwards.
+  NDN_LOG_DEBUG("Checking if data will satisfy our own pending interest");
+
+  // Append hash of our IBF so that data name maybe different for each node answering
+  ndn::Name ibltOnlyName;
+  m_iblt.appendToName(ibltOnlyName);
+  ndn::Name dataName(name);
+  dataName.appendNumber(std::hash<ndn::Name>{}(ibltOnlyName));
+  ndn::Data data(dataName);
+  data.setFreshnessPeriod(m_syncReplyFreshness);
+  data.setContent(block);
+  m_keyChain.sign(data);
+
+  // Evaluate this before m_outstandingInterestName is cleared below.
+  const bool answersOwnInterest = (m_outstandingInterestName == name);
+  if (answersOwnInterest) {
+    NDN_LOG_DEBUG("Satisfied our own pending interest");
+    if (m_outstandingInterestId != nullptr) {
+      NDN_LOG_DEBUG("Removing our pending interest from face");
+      m_face.removePendingInterest(m_outstandingInterestId);
+      m_outstandingInterestId = nullptr;
+      m_outstandingInterestName = ndn::Name("");
+    }
+  }
+
+  NDN_LOG_DEBUG("Sending Sync Data");
+  m_face.put(data);
+
+  if (answersOwnInterest) {
+    NDN_LOG_TRACE("Renewing sync interest");
+    sendSyncInterest();
+  }
+}
+
+void
+FullProducer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
+{
+  // Handles the reply to a sync or recovery Interest: records every <prefix>/<seqNo>
+  // newer than ours, reports the changes via m_onUpdate and renews the sync Interest.
+  ndn::Name interestName = interest.getName();
+  deletePendingInterests(interest.getName());
+  if (data.getContentType() == ndn::tlv::ContentType_Nack) {
+    NDN_LOG_DEBUG("Got application nack, sending recovery interest");
+    sendRecoveryInterest(interest);
+    return;
+  }
+
+  State state(data.getContent());
+  std::vector<MissingDataInfo> updates;
+  if (interestName.get(interestName.size()-1) == RECOVERY_PREFIX.get(0) &&
+      state.getContent().empty()) {
+    NDN_LOG_TRACE("Recovery data is empty, other side is behind");
+    return;
+  }
+
+  NDN_LOG_DEBUG("Sync Data Received:  " << state);
+
+  for (const auto& content : state.getContent()) {
+    ndn::Name prefix = content.getPrefix(-1);
+    uint64_t seq = content.get(content.size()-1).toNumber();
+    // NOTE(review): operator[] presumably inserts 0 for an unknown prefix - confirm.
+    if (m_prefixes.find(prefix) == m_prefixes.end() || m_prefixes[prefix] < seq) {
+      updates.push_back(MissingDataInfo{prefix, m_prefixes[prefix] + 1, seq});
+      updateSeqNo(prefix, seq);
+      // satisfyPendingInterests() is deliberately not called here: the entry for this
+      // Interest name was already removed by deletePendingInterests() above.
+      // NOTE(review): pending entries stored under other names are left untouched and
+      // are not re-evaluated against the updated state - confirm this is intended.
+    }
+  }
+
+  // Only when something new was learned: notify the application, then send a new sync Interest
+  if (!updates.empty()) {
+    m_onUpdate(updates);
+    NDN_LOG_TRACE("Renewing sync interest");
+    sendSyncInterest();
+  }
+  else {
+    NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce()  <<
+                  " , hash: " << std::hash<ndn::Name>{}(interestName));
+  }
+}
+
+void
+FullProducer::satisfyPendingInterests()
+{
+  // Re-evaluates every parked sync Interest against our current IBF: undecodable entries
+  // are dropped, entries we can now answer are answered and removed, the rest stay parked.
+  NDN_LOG_DEBUG("Satisfying full sync interest: " << m_pendingEntries.size());
+  for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
+    IBLT diff = m_iblt - it->second.iblt;
+    std::set<uint32_t> positive;
+    std::set<uint32_t> negative;
+    if (!diff.listEntries(positive, negative)) {
+      NDN_LOG_TRACE("Decode failed for pending interest");
+      if (positive.size() + negative.size() >= m_threshold ||
+          (positive.size() == 0 && negative.size() == 0)) {
+        NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest");
+        it = m_pendingEntries.erase(it);
+        continue;
+      }
+    }
+
+    State state;
+    for (const auto& hash : positive) {
+      // find(), not operator[]: it must not insert an entry for a hash we do not know.
+      auto hashIt = m_hash2prefix.find(hash);
+      if (hashIt != m_hash2prefix.end()) {
+        auto seqIt = m_prefixes.find(hashIt->second);
+        if (seqIt != m_prefixes.end() && seqIt->second != 0) {
+          state.addContent(ndn::Name(hashIt->second).appendNumber(seqIt->second));
+        }
+      }
+    }
+    if (state.getContent().empty()) {
+      ++it;
+      continue;
+    }
+    NDN_LOG_DEBUG("Satisfying sync content: " << state);
+    sendSyncData(it->first, state.wireEncode());
+    it = m_pendingEntries.erase(it);
+  }
+}
+
+bool
+FullProducer::isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative)
+{
+  // Returns true if `negative` contains the hash of <prefix>/<seqNo + 1>, where seqNo is
+  // the sequence number currently stored for `prefix` in m_prefixes. onSyncInterest()
+  // uses this to skip a prefix (presumably: the sender is already ahead of us for it).
+  // `negative` is the second set filled in by IBLT::listEntries().
+  uint32_t nextHash = murmurHash3(N_HASHCHECK,
+                                  ndn::Name(prefix).appendNumber(m_prefixes[prefix] + 1).toUri());
+
+  // `negative` is a std::set, so use its own lookup instead of scanning it linearly.
+  return negative.count(nextHash) != 0;
+}
+
+void
+FullProducer::deletePendingInterests(const ndn::Name& interestName)
+{
+  // Removes the parked sync Interest stored under `interestName`, if there is one.
+  // m_pendingEntries holds at most one entry per name (onSyncInterest() relies on
+  // emplace().first and erase(name)), so a keyed lookup replaces a scan over all entries.
+  auto entryIt = m_pendingEntries.find(interestName);
+  if (entryIt != m_pendingEntries.end()) {
+    NDN_LOG_TRACE("Delete pending interest: " << interestName);
+    m_pendingEntries.erase(entryIt);
+  }
+}
+
+void
+FullProducer::sendRecoveryData(const ndn::Name& prefix, const State& state)
+{
+  // Publishes the wire encoding of `state` as segmented Data named <prefix>/<segment>.
+  // At least one segment is always sent; the last one carries the FinalBlockId.
+  ndn::EncodingBuffer encoded;
+  encoded.prependBlock(state.wireEncode());
+
+  const size_t maxSegmentSize = ndn::MAX_NDN_PACKET_SIZE >> 1; // half the packet size limit
+  const uint8_t* cursor = encoded.buf();
+  const uint8_t* const bufferEnd = cursor + encoded.size();
+
+  for (uint64_t segmentNo = 0; ; ++segmentNo) {
+    // Take as much as fits into one segment; the final segment takes whatever is left.
+    const size_t remaining = bufferEnd - cursor;
+    const size_t segmentSize = remaining < maxSegmentSize ? remaining : maxSegmentSize;
+    const bool isLastSegment = (segmentSize == remaining);
+
+    ndn::Name segmentName(prefix);
+    segmentName.appendSegment(segmentNo);
+
+    ndn::Data segment(segmentName);
+    segment.setContent(cursor, segmentSize);
+    segment.setFreshnessPeriod(m_syncReplyFreshness);
+    if (isLastSegment) {
+      segment.setFinalBlock(segmentName[-1]);
+    }
+    m_keyChain.sign(segment);
+    m_face.put(segment);
+
+    NDN_LOG_DEBUG("Sending recovery data, seq: " << segmentNo);
+    if (isLastSegment) {
+      break;
+    }
+    cursor += segmentSize;
+  }
+}
+
+void
+FullProducer::sendRecoveryInterest(const ndn::Interest& interest)
+{
+  // Fetches recovery Data for /<sync-prefix>/<hash of our IBF name>/<RECOVERY_PREFIX> and
+  // feeds the result to onSyncData(). The `interest` argument is not used.
+  if (m_outstandingInterestId != nullptr) {
+    m_face.removePendingInterest(m_outstandingInterestId);
+    m_outstandingInterestId = nullptr;
+  }
+  ndn::Name ibltName;
+  m_iblt.appendToName(ibltName);
+  ndn::Name recoveryInterestName(m_syncPrefix);
+  recoveryInterestName.appendNumber(std::hash<ndn::Name>{}(ibltName));
+  recoveryInterestName.append(RECOVERY_PREFIX);
+
+  ndn::Interest recoveryInterest(recoveryInterestName);
+  recoveryInterest.setInterestLifetime(m_syncInterestLifetime);
+  // The reply may span several segments (see sendRecoveryData()), hence SegmentFetcher.
+  auto fetcher = ndn::util::SegmentFetcher::start(m_face,
+                                                  recoveryInterest,
+                                                  ndn::security::v2::getAcceptAllValidator());
+  // NOTE(review): the callback captures `this`; confirm it cannot run after *this is destroyed.
+  fetcher->onComplete.connect([this, recoveryInterest] (ndn::ConstBufferPtr bufferPtr) {
+                                NDN_LOG_TRACE("Segment fetcher got data");
+                                ndn::Data data;
+                                data.setContent(std::move(bufferPtr));
+                                onSyncData(recoveryInterest, data);
+                              });
+
+  fetcher->onError.connect([] (uint32_t errorCode, const std::string& msg) {
+                             NDN_LOG_ERROR("Cannot recover, error: " << errorCode <<
+                                           " message: " << msg);
+                           });
+}
+
+} // namespace psync
\ No newline at end of file