blob: 5073ab590502d3a1e9d245a266eb6c841ef6d725 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
* Copyright (c) 2014-2024, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
*
* PSync is free software: you can redistribute it and/or modify it under the terms
* of the GNU Lesser General Public License as published by the Free Software Foundation,
* either version 3 of the License, or (at your option) any later version.
*
* PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
* without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
* PURPOSE. See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
*/
#include "PSync/full-producer.hpp"
#include "PSync/detail/state.hpp"
#include "PSync/detail/util.hpp"
#include <ndn-cxx/lp/tags.hpp>
#include <ndn-cxx/security/validator-null.hpp>
#include <ndn-cxx/util/logger.hpp>
#include <cstring>
namespace psync {
NDN_LOG_INIT(psync.FullProducer);
FullProducer::FullProducer(ndn::Face& face,
ndn::KeyChain& keyChain,
const ndn::Name& syncPrefix,
const Options& opts)
: ProducerBase(face, keyChain, opts.ibfCount, syncPrefix, opts.syncDataFreshness,
opts.ibfCompression, opts.contentCompression)
, m_syncInterestLifetime(opts.syncInterestLifetime)
, m_onUpdate(opts.onUpdate)
{
m_registeredPrefix = m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
[this] (auto&&... args) { onSyncInterest(std::forward<decltype(args)>(args)...); },
[] (auto&&... args) { onRegisterFailed(std::forward<decltype(args)>(args)...); });
// Should we do this after setInterestFilter success call back
// (Currently following ChronoSync's way)
sendSyncInterest();
}
FullProducer::FullProducer(ndn::Face& face,
ndn::KeyChain& keyChain,
size_t expectedNumEntries,
const ndn::Name& syncPrefix,
const ndn::Name& userPrefix,
UpdateCallback onUpdateCb,
ndn::time::milliseconds syncInterestLifetime,
ndn::time::milliseconds syncReplyFreshness,
CompressionScheme ibltCompression,
CompressionScheme contentCompression)
: FullProducer(face, keyChain, syncPrefix,
Options{std::move(onUpdateCb), static_cast<uint32_t>(expectedNumEntries), ibltCompression,
syncInterestLifetime, syncReplyFreshness, contentCompression})
{
addUserNode(userPrefix);
}
FullProducer::~FullProducer()
{
if (m_fetcher) {
m_fetcher->stop();
}
}
void
FullProducer::publishName(const ndn::Name& prefix, std::optional<uint64_t> seq)
{
if (m_prefixes.find(prefix) == m_prefixes.end()) {
NDN_LOG_WARN("Prefix not added: " << prefix);
return;
}
uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
updateSeqNo(prefix, newSeq);
m_inNoNewDataWaitOutPeriod = false;
satisfyPendingInterests(ndn::Name(prefix).appendNumber(newSeq));
}
void
FullProducer::sendSyncInterest()
{
if (m_inNoNewDataWaitOutPeriod) {
NDN_LOG_TRACE("Cannot send sync Interest as Data is expected from CS");
return;
}
// If we send two sync interest one after the other
// since there is no new data in the network yet,
// when data is available it may satisfy both of them
if (m_fetcher) {
m_fetcher->stop();
}
// Sync Interest format for full sync: /<sync-prefix>/<ourLatestIBF>
ndn::Name syncInterestName = m_syncPrefix;
// Append our latest IBF
m_iblt.appendToName(syncInterestName);
// Append cumulative updates that has been inserted into this IBF
syncInterestName.appendNumber(m_numOwnElements);
auto currentTime = ndn::time::system_clock::now();
if ((currentTime - m_lastInterestSentTime < ndn::time::milliseconds(MIN_JITTER)) &&
(m_outstandingInterestName == syncInterestName)) {
NDN_LOG_TRACE("Suppressing Interest: " << std::hash<ndn::Name>{}(syncInterestName));
return;
}
m_outstandingInterestName = syncInterestName;
m_scheduledSyncInterestId =
m_scheduler.schedule(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(m_rng)),
[this] { sendSyncInterest(); });
ndn::Interest syncInterest(syncInterestName);
using ndn::SegmentFetcher;
SegmentFetcher::Options options;
options.interestLifetime = m_syncInterestLifetime;
options.maxTimeout = m_syncInterestLifetime;
options.rttOptions.initialRto = m_syncInterestLifetime;
// This log message must be before sending the Interest through SegmentFetcher
// because getNonce generates a Nonce for this Interest.
// SegmentFetcher makes a copy of this Interest, so if we print the Nonce
// after, that Nonce will be different than the one seen in tshark!
NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
", hash: " << std::hash<ndn::Name>{}(syncInterestName));
m_lastInterestSentTime = currentTime;
m_fetcher = SegmentFetcher::start(m_face, syncInterest,
ndn::security::getAcceptAllValidator(), options);
m_fetcher->onComplete.connect([this, syncInterest] (const ndn::ConstBufferPtr& bufferPtr) {
onSyncData(syncInterest, bufferPtr);
});
m_fetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
auto tag = data.getTag<ndn::lp::IncomingFaceIdTag>();
if (tag) {
m_incomingFace = *tag;
}
else {
m_incomingFace = 0;
}
});
m_fetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
NDN_LOG_ERROR("Cannot fetch sync data, error: " << errorCode << ", message: " << msg);
// We would like to recover from errors like NoRoute NACK quicker than sync Interest timeout.
// We don't react to Interest timeout here as we have scheduled the next sync Interest
// to be sent in half the sync Interest lifetime + jitter above. So we would react to
// timeout before it happens.
if (errorCode != SegmentFetcher::ErrorCode::INTEREST_TIMEOUT) {
auto after = ndn::time::milliseconds(m_jitter(m_rng));
NDN_LOG_DEBUG("Schedule sync Interest after: " << after);
m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] { sendSyncInterest(); });
}
});
}
void
FullProducer::processWaitingInterests()
{
NDN_LOG_TRACE("Processing waiting Interest list, size: " << m_waitingForProcessing.size());
if (m_waitingForProcessing.size() == 0) {
return;
}
for (auto it = m_waitingForProcessing.begin(); it != m_waitingForProcessing.end();) {
if (it->second.numTries == std::numeric_limits<uint16_t>::max()) {
NDN_LOG_TRACE("Interest with hash already marked for deletion, removing now: " <<
std::hash<ndn::Name>{}(it->first));
it = m_waitingForProcessing.erase(it);
continue;
}
it->second.numTries += 1;
ndn::Interest interest(it->first);
interest.setNonce(it->second.nonce);
onSyncInterest(m_syncPrefix, interest, true);
if (it->second.numTries == std::numeric_limits<uint16_t>::max()) {
NDN_LOG_TRACE("Removing Interest with hash: " << std::hash<ndn::Name>{}(it->first));
it = m_waitingForProcessing.erase(it);
}
else {
++it;
}
}
NDN_LOG_TRACE("Done processing waiting Interest list, size: " << m_waitingForProcessing.size());
}
void
FullProducer::scheduleProcessWaitingInterests()
{
// If nothing waiting, no need to schedule
if (m_waitingForProcessing.size() == 0) {
return;
}
if (!m_interestDelayTimerId) {
auto after = ndn::time::milliseconds(m_jitter(m_rng));
NDN_LOG_TRACE("Setting a timer to processes waiting Interest(s) in: " << after);
m_interestDelayTimerId = m_scheduler.schedule(after, [=] {
NDN_LOG_TRACE("Timer has expired, trying to process waiting Interest(s)");
processWaitingInterests();
scheduleProcessWaitingInterests();
});
}
}
void
FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest,
bool isTimedProcessing)
{
ndn::Name interestName = interest.getName();
auto interestNameHash = std::hash<ndn::Name>{}(interestName);
NDN_LOG_DEBUG("Full sync Interest received, nonce: " << interest.getNonce() <<
", hash: " << interestNameHash);
if (isTimedProcessing) {
NDN_LOG_TRACE("Delayed Interest being processed now");
}
if (m_segmentPublisher.replyFromStore(interestName)) {
NDN_LOG_DEBUG("Answer from memory");
return;
}
ndn::Name nameWithoutSyncPrefix = interestName.getSubName(prefixName.size());
if (nameWithoutSyncPrefix.size() == 4) {
// /<IBF>/<numCumulativeElements>/<version>/<segment>
NDN_LOG_DEBUG("Segment not found in memory. Other side will have to restart");
// This should have been answered from publisher Cache!
sendApplicationNack(prefixName);
return;
}
if (nameWithoutSyncPrefix.size() != 2) {
NDN_LOG_WARN("Two components required after sync prefix: /<IBF>/<numCumulativeElements>; received: " << interestName);
return;
}
ndn::name::Component ibltName = interestName[-2];
uint64_t numRcvdElements = interestName[-1].toNumber();
detail::IBLT iblt(m_expectedNumEntries, m_ibltCompression);
try {
iblt.initialize(ibltName);
}
catch (const std::exception& e) {
NDN_LOG_WARN(e.what());
return;
}
auto diff = m_iblt - iblt;
NDN_LOG_TRACE("Decode, positive: " << diff.positive.size()
<< " negative: " << diff.negative.size() << " m_threshold: "
<< m_threshold);
auto waitingIt = m_waitingForProcessing.find(interestName);
if (!diff.canDecode) {
NDN_LOG_DEBUG("Cannot decode differences!");
if (numRcvdElements > m_numOwnElements) {
if (!isTimedProcessing && waitingIt == m_waitingForProcessing.end()) {
NDN_LOG_TRACE("Decode failure, adding to waiting Interest list " << interestNameHash);
m_waitingForProcessing.emplace(interestName, WaitingEntryInfo{0, interest.getNonce()});
scheduleProcessWaitingInterests();
}
else if (isTimedProcessing && waitingIt != m_waitingForProcessing.end()) {
if (waitingIt->second.numTries > 1) {
NDN_LOG_TRACE("Decode failure, still behind. Erasing waiting Interest as we have tried twice");
waitingIt->second.numTries = std::numeric_limits<uint16_t>::max(); // markWaitingInterestForDeletion
NDN_LOG_DEBUG("Waiting Interest has been deleted. Sending new sync interest");
sendSyncInterest();
}
else {
NDN_LOG_TRACE("Decode failure, still behind, waiting more till the next timer");
}
}
else {
NDN_LOG_TRACE("Decode failure, still behind");
}
}
else {
if (m_numOwnElements == numRcvdElements && diff.positive.size() == 0 && diff.negative.size() > 0) {
NDN_LOG_TRACE("We have nothing to offer and are actually behind");
#ifdef PSYNC_WITH_TESTS
++nIbfDecodeFailuresBelowThreshold;
#endif // PSYNC_WITH_TESTS
return;
}
detail::State state;
for (const auto& content : m_prefixes) {
if (content.second != 0) {
state.addContent(ndn::Name(content.first).appendNumber(content.second));
}
}
#ifdef PSYNC_WITH_TESTS
++nIbfDecodeFailuresAboveThreshold;
#endif // PSYNC_WITH_TESTS
if (!state.getContent().empty()) {
NDN_LOG_DEBUG("Sending entire state: " << state);
// Want low freshness when potentially sending large content to clear it quickly from the network
sendSyncData(interestName, state.wireEncode(), 10_ms);
// Since we're directly sending the data, we need to clear pending interests here
deletePendingInterests(interestName);
}
// We seem to be ahead, delete the Interest from waiting list
if (waitingIt != m_waitingForProcessing.end()) {
waitingIt->second.numTries = std::numeric_limits<uint16_t>::max();
}
}
return;
}
if (diff.positive.size() == 0 && diff.negative.size() == 0) {
NDN_LOG_TRACE("Saving positive: " << diff.positive.size() << " negative: " << diff.negative.size());
auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{iblt, {}}).first->second;
entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
[this, interest] {
NDN_LOG_TRACE("Erase pending Interest " << interest.getNonce());
m_pendingEntries.erase(interest.getName());
});
// Can't delete directly in this case as it will cause
// memory access errors with the for loop in processWaitingInterests
if (isTimedProcessing) {
if (waitingIt != m_waitingForProcessing.end()) {
waitingIt->second.numTries = std::numeric_limits<uint16_t>::max();
}
}
return;
}
// Only add to waiting list if we don't have anything to send (positive = 0)
if (diff.positive.size() == 0 && diff.negative.size() > 0) {
if (!isTimedProcessing && waitingIt == m_waitingForProcessing.end()) {
NDN_LOG_TRACE("Adding Interest to waiting list: " << interestNameHash);
m_waitingForProcessing.emplace(interestName, WaitingEntryInfo{0, interest.getNonce()});
scheduleProcessWaitingInterests();
}
else if (isTimedProcessing && waitingIt != m_waitingForProcessing.end()) {
if (waitingIt->second.numTries > 1) {
NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash <<
". Erasing waiting Interest as we have tried twice");
waitingIt->second.numTries = std::numeric_limits<uint16_t>::max(); // markWaitingInterestForDeletion
}
else {
NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash <<
". Keep waiting for Interest as number of tries is not exhausted");
}
}
else {
NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash);
}
return;
}
if (diff.positive.size() > 0) {
detail::State state;
for (const auto& hash : diff.positive) {
auto nameIt = m_biMap.left.find(hash);
if (nameIt != m_biMap.left.end()) {
ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1);
// Don't sync up sequence number zero
if (m_prefixes[nameWithoutSeq] != 0 &&
!isFutureHash(nameWithoutSeq.toUri(), diff.negative)) {
state.addContent(nameIt->second);
}
}
}
if (!state.getContent().empty()) {
NDN_LOG_DEBUG("Sending sync content: " << state);
sendSyncData(interestName, state.wireEncode(), m_syncReplyFreshness);
// Timed processing or not - if we are answering it, it should not go in waiting Interests
if (waitingIt != m_waitingForProcessing.end()) {
waitingIt->second.numTries = std::numeric_limits<uint16_t>::max();
}
}
}
}
void
FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block,
ndn::time::milliseconds syncReplyFreshness)
{
bool isSatisfyingOwnInterest = m_outstandingInterestName == name;
if (isSatisfyingOwnInterest && m_fetcher) {
NDN_LOG_DEBUG("Removing our pending Interest from face (stop fetcher)");
m_fetcher->stop();
m_outstandingInterestName.clear();
}
NDN_LOG_DEBUG("Sending sync Data");
auto content = detail::compress(m_contentCompression, block);
m_segmentPublisher.publish(name, name, *content, syncReplyFreshness);
if (isSatisfyingOwnInterest) {
NDN_LOG_DEBUG("Renewing sync interest");
sendSyncInterest();
}
}
void
FullProducer::onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPtr& bufferPtr)
{
deletePendingInterests(interest.getName());
detail::State state;
try {
auto decompressed = detail::decompress(m_contentCompression, *bufferPtr);
state.wireDecode(ndn::Block(std::move(decompressed)));
}
catch (const std::exception& e) {
NDN_LOG_ERROR("Cannot parse received sync Data: " << e.what());
return;
}
NDN_LOG_DEBUG("Sync Data received: " << state);
std::vector<MissingDataInfo> updates;
for (const auto& content : state) {
ndn::Name prefix = content.getPrefix(-1);
uint64_t seq = content.get(content.size() - 1).toNumber();
if (m_prefixes.find(prefix) == m_prefixes.end() || m_prefixes[prefix] < seq) {
updates.push_back({prefix, m_prefixes[prefix] + 1, seq, m_incomingFace});
updateSeqNo(prefix, seq);
// We should not call satisfyPendingSyncInterests here because we just
// got data and deleted pending interest by calling deletePendingFullSyncInterests
// But we might have interests not matching to this interest that might not have deleted
// from pending sync interest
}
}
if (!updates.empty()) {
m_onUpdate(updates);
// Wait a bit to let neighbors get the data too
auto after = ndn::time::milliseconds(m_jitter(m_rng));
m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] {
NDN_LOG_DEBUG("Got updates, renewing sync Interest now");
sendSyncInterest();
});
NDN_LOG_DEBUG("Schedule sync Interest after: " << after);
m_inNoNewDataWaitOutPeriod = false;
processWaitingInterests();
}
else {
NDN_LOG_TRACE("No new update, Interest nonce: " << interest.getNonce() <<
" , hash: " << std::hash<ndn::Name>{}(interest.getName()));
m_inNoNewDataWaitOutPeriod = true;
// Have to wait, otherwise will get same data from CS
auto after = m_syncReplyFreshness + ndn::time::milliseconds(m_jitter(m_rng));
m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] {
NDN_LOG_DEBUG("Sending sync Interest after no new update");
m_inNoNewDataWaitOutPeriod = false;
sendSyncInterest();
});
NDN_LOG_DEBUG("Schedule sync after: " << after);
}
}
void
FullProducer::satisfyPendingInterests(const ndn::Name& updatedPrefixWithSeq)
{
NDN_LOG_DEBUG("Satisfying full sync Interest: " << m_pendingEntries.size());
for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
NDN_LOG_TRACE("Satisfying pending Interest: " << std::hash<ndn::Name>{}(it->first.getPrefix(-1)));
const auto& entry = it->second;
auto diff = m_iblt - entry.iblt;
NDN_LOG_TRACE("Decoded: " << diff.canDecode << " positive: " << diff.positive.size() <<
" negative: " << diff.negative.size());
detail::State state;
bool publishedPrefixInDiff = false;
for (const auto& hash : diff.positive) {
auto nameIt = m_biMap.left.find(hash);
if (nameIt != m_biMap.left.end()) {
if (updatedPrefixWithSeq == nameIt->second) {
publishedPrefixInDiff = true;
}
state.addContent(nameIt->second);
}
}
if (!publishedPrefixInDiff) {
state.addContent(updatedPrefixWithSeq);
}
NDN_LOG_DEBUG("Satisfying sync content: " << state);
sendSyncData(it->first, state.wireEncode(), m_syncReplyFreshness);
it = m_pendingEntries.erase(it);
}
}
bool
FullProducer::isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative)
{
auto nextHash = detail::murmurHash3(detail::N_HASHCHECK,
ndn::Name(prefix).appendNumber(m_prefixes[prefix] + 1));
return negative.find(nextHash) != negative.end();
}
void
FullProducer::deletePendingInterests(const ndn::Name& interestName)
{
auto it = m_pendingEntries.find(interestName);
if (it != m_pendingEntries.end()) {
NDN_LOG_TRACE("Delete pending Interest: " << std::hash<ndn::Name>{}(interestName));
it = m_pendingEntries.erase(it);
}
}
} // namespace psync