blob: dd4d452857ddc93fa21c575e5810933504805605 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2014-2018, The University of Memphis
*
* This file is part of NLSR (Named-data Link State Routing).
* See AUTHORS.md for complete list of NLSR authors and contributors.
*
* NLSR is free software: you can redistribute it and/or modify it under the terms
* of the GNU General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* NLSR is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* NLSR, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
**/
#include "full-producer.hpp"
#include <ndn-cxx/util/logger.hpp>
#include <ndn-cxx/util/segment-fetcher.hpp>
#include <ndn-cxx/security/validator-null.hpp>
#include <iostream>
#include <cstring>
#include <limits>
#include <functional>
namespace psync {
NDN_LOG_INIT(psync.FullProducer);
FullProducer::FullProducer(const size_t expectedNumEntries,
ndn::Face& face,
const ndn::Name& syncPrefix,
const ndn::Name& userPrefix,
const UpdateCallback& onUpdateCallBack,
ndn::time::milliseconds syncInterestLifetime,
ndn::time::milliseconds syncReplyFreshness)
: ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness)
, m_syncInterestLifetime(syncInterestLifetime)
, m_onUpdate(onUpdateCallBack)
, m_outstandingInterestId(nullptr)
, m_scheduledSyncInterestId(m_scheduler)
{
int jitter = m_syncInterestLifetime.count() * .20;
m_jitter = std::uniform_int_distribution<>(-jitter, jitter);
m_registerPrefixId =
m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
std::bind(&FullProducer::onInterest, this, _1, _2),
std::bind(&FullProducer::onRegisterFailed, this, _1, _2));
// Should we do this after setInterestFilter success call back
// (Currently following ChronoSync's way)
sendSyncInterest();
}
FullProducer::~FullProducer()
{
if (m_outstandingInterestId != nullptr) {
m_face.removePendingInterest(m_outstandingInterestId);
}
m_face.unsetInterestFilter(m_registerPrefixId);
}
void
FullProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
{
if (m_prefixes.find(prefix) == m_prefixes.end()) {
NDN_LOG_WARN("Prefix not added: " << prefix);
return;
}
uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
NDN_LOG_INFO("Publish: "<< prefix << "/" << newSeq);
updateSeqNo(prefix, newSeq);
satisfyPendingInterests();
}
void
FullProducer::sendSyncInterest()
{
// If we send two sync interest one after the other
// since there is no new data in the network yet,
// when data is available it may satisfy both of them
if (m_outstandingInterestId != nullptr) {
m_face.removePendingInterest(m_outstandingInterestId);
}
// Sync Interest format for full sync: /<sync-prefix>/<ourLatestIBF>
ndn::Name syncInterestName = m_syncPrefix;
// Append our latest IBF
m_iblt.appendToName(syncInterestName);
m_outstandingInterestName = syncInterestName;
m_scheduledSyncInterestId =
m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(m_rng)),
[this] { sendSyncInterest(); });
ndn::Interest syncInterest(syncInterestName);
syncInterest.setInterestLifetime(m_syncInterestLifetime);
// Other side appends hash of IBF to sync data name
syncInterest.setCanBePrefix(true);
syncInterest.setMustBeFresh(true);
syncInterest.setNonce(1);
syncInterest.refreshNonce();
m_outstandingInterestId = m_face.expressInterest(syncInterest,
std::bind(&FullProducer::onSyncData, this, _1, _2),
[] (const ndn::Interest& interest, const ndn::lp::Nack& nack) {
NDN_LOG_TRACE("received Nack with reason " << nack.getReason() <<
" for Interest with Nonce: " << interest.getNonce());
},
[] (const ndn::Interest& interest) {
NDN_LOG_DEBUG("On full sync timeout " << interest.getNonce());
});
NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
", hash: " << std::hash<ndn::Name>{}(syncInterestName));
}
void
FullProducer::onInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
{
ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
if (nameWithoutSyncPrefix.size() == 2 &&
nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 1) == RECOVERY_PREFIX.get(0)) {
onRecoveryInterest(interest);
}
else if (nameWithoutSyncPrefix.size() == 1) {
onSyncInterest(interest);
}
}
void
FullProducer::onSyncInterest(const ndn::Interest& interest)
{
ndn::Name interestName = interest.getName();
ndn::name::Component ibltName = interestName.get(interestName.size()-1);
NDN_LOG_DEBUG("Full Sync Interest Received, nonce: " << interest.getNonce() <<
", hash: " << std::hash<ndn::Name>{}(interestName));
IBLT iblt(m_expectedNumEntries);
try {
iblt.initialize(ibltName);
}
catch (const std::exception& e) {
NDN_LOG_WARN(e.what());
return;
}
IBLT diff = m_iblt - iblt;
std::set<uint32_t> positive;
std::set<uint32_t> negative;
if (!diff.listEntries(positive, negative)) {
NDN_LOG_TRACE("Cannot decode differences, positive: " << positive.size()
<< " negative: " << negative.size() << " m_threshold: "
<< m_threshold);
// Send nack if greater then threshold, else send positive below as usual
// Or send if we can't get neither positive nor negative differences
if (positive.size() + negative.size() >= m_threshold ||
(positive.size() == 0 && negative.size() == 0)) {
// If we don't have anything to offer means that
// we are behind and should not mislead other nodes.
bool haveSomethingToOffer = false;
for (const auto& content : m_prefixes) {
if (content.second != 0) {
haveSomethingToOffer = true;
}
}
if (haveSomethingToOffer) {
sendApplicationNack(interestName);
}
return;
}
}
State state;
for (const auto& hash : positive) {
ndn::Name prefix = m_hash2prefix[hash];
// Don't sync up sequence number zero
if (m_prefixes[prefix] != 0 && !isFutureHash(prefix.toUri(), negative)) {
state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
}
}
if (!state.getContent().empty()) {
NDN_LOG_DEBUG("Sending sync content: " << state);
sendSyncData(interestName, state.wireEncode());
return;
}
ndn::util::scheduler::ScopedEventId scopedEventId(m_scheduler);
auto it = m_pendingEntries.emplace(interestName,
PendingEntryInfoFull{iblt, std::move(scopedEventId)});
it.first->second.expirationEvent =
m_scheduler.scheduleEvent(interest.getInterestLifetime(),
[this, interest] {
NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
m_pendingEntries.erase(interest.getName());
});
}
void
FullProducer::onRecoveryInterest(const ndn::Interest& interest)
{
NDN_LOG_DEBUG("Recovery interest received");
State state;
for (const auto& content : m_prefixes) {
if (content.second != 0) {
state.addContent(ndn::Name(content.first).appendNumber(content.second));
}
}
// Send even if state is empty to let other side know that we are behind
sendRecoveryData(interest.getName(), state);
}
void
FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
{
NDN_LOG_DEBUG("Checking if data will satisfy our own pending interest");
ndn::Name nameWithIblt;
m_iblt.appendToName(nameWithIblt);
// Append hash of our IBF so that data name maybe different for each node answering
ndn::Data data(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
data.setFreshnessPeriod(m_syncReplyFreshness);
data.setContent(block);
m_keyChain.sign(data);
// checking if our own interest got satisfied
if (m_outstandingInterestName == name) {
NDN_LOG_DEBUG("Satisfied our own pending interest");
// remove outstanding interest
if (m_outstandingInterestId != nullptr) {
NDN_LOG_DEBUG("Removing our pending interest from face");
m_face.removePendingInterest(m_outstandingInterestId);
m_outstandingInterestId = nullptr;
m_outstandingInterestName = ndn::Name("");
}
NDN_LOG_DEBUG("Sending Sync Data");
// Send data after removing pending sync interest on face
m_face.put(data);
NDN_LOG_TRACE("Renewing sync interest");
sendSyncInterest();
}
else {
NDN_LOG_DEBUG("Sending Sync Data");
m_face.put(data);
}
}
void
FullProducer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
{
ndn::Name interestName = interest.getName();
deletePendingInterests(interest.getName());
if (data.getContentType() == ndn::tlv::ContentType_Nack) {
NDN_LOG_DEBUG("Got application nack, sending recovery interest");
sendRecoveryInterest(interest);
return;
}
State state(data.getContent());
std::vector<MissingDataInfo> updates;
if (interestName.get(interestName.size()-1) == RECOVERY_PREFIX.get(0) &&
state.getContent().empty()) {
NDN_LOG_TRACE("Recovery data is empty, other side is behind");
return;
}
NDN_LOG_DEBUG("Sync Data Received: " << state);
for (const auto& content : state.getContent()) {
ndn::Name prefix = content.getPrefix(-1);
uint64_t seq = content.get(content.size()-1).toNumber();
if (m_prefixes.find(prefix) == m_prefixes.end() || m_prefixes[prefix] < seq) {
updates.push_back(MissingDataInfo{prefix, m_prefixes[prefix] + 1, seq});
updateSeqNo(prefix, seq);
// We should not call satisfyPendingSyncInterests here because we just
// got data and deleted pending interest by calling deletePendingFullSyncInterests
// But we might have interests not matching to this interest that might not have deleted
// from pending sync interest
}
}
// We just got the data, so send a new sync interest
if (!updates.empty()) {
m_onUpdate(updates);
NDN_LOG_TRACE("Renewing sync interest");
sendSyncInterest();
}
else {
NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() <<
" , hash: " << std::hash<ndn::Name>{}(interestName));
}
}
void
FullProducer::satisfyPendingInterests()
{
NDN_LOG_DEBUG("Satisfying full sync interest: " << m_pendingEntries.size());
for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
const PendingEntryInfoFull& entry = it->second;
IBLT diff = m_iblt - entry.iblt;
std::set<uint32_t> positive;
std::set<uint32_t> negative;
if (!diff.listEntries(positive, negative)) {
NDN_LOG_TRACE("Decode failed for pending interest");
if (positive.size() + negative.size() >= m_threshold ||
(positive.size() == 0 && negative.size() == 0)) {
NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest");
m_pendingEntries.erase(it++);
continue;
}
}
State state;
for (const auto& hash : positive) {
ndn::Name prefix = m_hash2prefix[hash];
if (m_prefixes[prefix] != 0) {
state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
}
}
if (!state.getContent().empty()) {
NDN_LOG_DEBUG("Satisfying sync content: " << state);
sendSyncData(it->first, state.wireEncode());
m_pendingEntries.erase(it++);
}
else {
++it;
}
}
}
bool
FullProducer::isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative)
{
uint32_t nextHash = murmurHash3(N_HASHCHECK,
ndn::Name(prefix).appendNumber(m_prefixes[prefix] + 1).toUri());
for (const auto& nHash : negative) {
if (nHash == nextHash) {
return true;
break;
}
}
return false;
}
void
FullProducer::deletePendingInterests(const ndn::Name& interestName) {
for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
if (it->first == interestName) {
NDN_LOG_TRACE("Delete pending interest: " << interestName);
m_pendingEntries.erase(it++);
}
else {
++it;
}
}
}
void
FullProducer::sendRecoveryData(const ndn::Name& prefix, const State& state)
{
ndn::EncodingBuffer buffer;
buffer.prependBlock(state.wireEncode());
const uint8_t* rawBuffer = buffer.buf();
const uint8_t* segmentBegin = rawBuffer;
const uint8_t* end = rawBuffer + buffer.size();
uint64_t segmentNo = 0;
do {
const uint8_t* segmentEnd = segmentBegin + (ndn::MAX_NDN_PACKET_SIZE >> 1);
if (segmentEnd > end) {
segmentEnd = end;
}
ndn::Name segmentName(prefix);
segmentName.appendSegment(segmentNo);
std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
data->setContent(segmentBegin, segmentEnd - segmentBegin);
data->setFreshnessPeriod(m_syncReplyFreshness);
segmentBegin = segmentEnd;
if (segmentBegin >= end) {
data->setFinalBlock(segmentName[-1]);
}
m_keyChain.sign(*data);
m_face.put(*data);
NDN_LOG_DEBUG("Sending recovery data, seq: " << segmentNo);
++segmentNo;
} while (segmentBegin < end);
}
void
FullProducer::sendRecoveryInterest(const ndn::Interest& interest)
{
if (m_outstandingInterestId != nullptr) {
m_face.removePendingInterest(m_outstandingInterestId);
m_outstandingInterestId = nullptr;
}
ndn::Name ibltName;
m_iblt.appendToName(ibltName);
ndn::Name recoveryInterestName(m_syncPrefix);
recoveryInterestName.appendNumber(std::hash<ndn::Name>{}(ibltName));
recoveryInterestName.append(RECOVERY_PREFIX);
ndn::Interest recoveryInterest(recoveryInterestName);
recoveryInterest.setInterestLifetime(m_syncInterestLifetime);
auto fetcher = ndn::util::SegmentFetcher::start(m_face,
recoveryInterest,
ndn::security::v2::getAcceptAllValidator());
fetcher->onComplete.connect([this, recoveryInterest] (ndn::ConstBufferPtr bufferPtr) {
NDN_LOG_TRACE("Segment fetcher got data");
ndn::Data data;
data.setContent(std::move(bufferPtr));
onSyncData(recoveryInterest, data);
});
fetcher->onError.connect([] (uint32_t errorCode, const std::string& msg) {
NDN_LOG_ERROR("Cannot recover, error: " << errorCode <<
" message: " << msg);
});
}
} // namespace psync