Replace FullSync algorithm and fix bugs to lower delay and overhead
1) During high update frequency PSync nodes can send data that contain
nothing new. This blocks the receiver nodes from syncing leading to
high delays. To fix this, Sync interests now include the cumulative number
of elements inserted into the IBF. This is used as an heuristic to guide
PSync on decode failures to minimize the chance of sending of such data
packets. Sync data freshness for entire dataset is also reduced.
If a no new update still happens, introduce some safeguards in the code.
2) Fast reaction upon negative set being detected is introduced. Timed
processing for interests is also introduced.
3) Fix satisfyPendingInterests to send the new update on publish even if
pending IBF decode fails, since we have something to send.
4) For interests which request segments, only respond with segments already contained
in in-memory storage.
refs: [solution II] in Thesis:
"Improvements to PSync: Distributed Full Dataset Synchronization
in Named-Data Networking"
https://digitalcommons.memphis.edu/cgi/viewcontent.cgi?article=3162&context=etd
Change-Id: Ie235b4fb56fcb7de21068511205e407006292b23
diff --git a/AUTHORS.md b/AUTHORS.md
index 8391947..6e0b086 100644
--- a/AUTHORS.md
+++ b/AUTHORS.md
@@ -7,6 +7,8 @@
* Alexander Afanasyev <https://users.cs.fiu.edu/~afanasyev>
* ***(Maintainer)*** Saurab Dulal <https://dulalsaurab.github.io>
* ***(Former Maintainer)*** Ashlesh Gawande <https://www.linkedin.com/in/agawande>
+* Dylan Hensley
+* Alexander Lane <https://github.com/awlane>
* Eric Newberry <https://ericnewberry.com>
* Davide Pesavento <https://github.com/Pesa>
* Junxiao Shi <https://cs.arizona.edu/~shijunxiao>
diff --git a/PSync/common.hpp b/PSync/common.hpp
index 2bea73d..83fa1b8 100644
--- a/PSync/common.hpp
+++ b/PSync/common.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2022, The University of Memphis
+ * Copyright (c) 2014-2024, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
diff --git a/PSync/consumer.hpp b/PSync/consumer.hpp
index 7d7995d..1cbd204 100644
--- a/PSync/consumer.hpp
+++ b/PSync/consumer.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2023, The University of Memphis
+ * Copyright (c) 2014-2024, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -65,7 +65,7 @@
/// Callback to give sync data back to application.
UpdateCallback onUpdate = [] (const auto&) {};
/// Number of expected elements (subscriptions) in Bloom filter.
- uint32_t bfCount = 80;
+ uint32_t bfCount = 6;
/// Bloom filter false positive probability.
double bfFalsePositive = 0.001;
/// Lifetime of hello Interest.
diff --git a/PSync/full-producer.cpp b/PSync/full-producer.cpp
index 4a3401d..5073ab5 100644
--- a/PSync/full-producer.cpp
+++ b/PSync/full-producer.cpp
@@ -84,12 +84,20 @@
uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
updateSeqNo(prefix, newSeq);
- satisfyPendingInterests();
+
+ m_inNoNewDataWaitOutPeriod = false;
+
+ satisfyPendingInterests(ndn::Name(prefix).appendNumber(newSeq));
}
void
FullProducer::sendSyncInterest()
{
+ if (m_inNoNewDataWaitOutPeriod) {
+ NDN_LOG_TRACE("Cannot send sync Interest as Data is expected from CS");
+ return;
+ }
+
// If we send two sync interest one after the other
// since there is no new data in the network yet,
// when data is available it may satisfy both of them
@@ -102,6 +110,15 @@
// Append our latest IBF
m_iblt.appendToName(syncInterestName);
+ // Append cumulative updates that has been inserted into this IBF
+ syncInterestName.appendNumber(m_numOwnElements);
+
+ auto currentTime = ndn::time::system_clock::now();
+ if ((currentTime - m_lastInterestSentTime < ndn::time::milliseconds(MIN_JITTER)) &&
+ (m_outstandingInterestName == syncInterestName)) {
+ NDN_LOG_TRACE("Suppressing Interest: " << std::hash<ndn::Name>{}(syncInterestName));
+ return;
+ }
m_outstandingInterestName = syncInterestName;
@@ -117,6 +134,14 @@
options.maxTimeout = m_syncInterestLifetime;
options.rttOptions.initialRto = m_syncInterestLifetime;
+ // This log message must be before sending the Interest through SegmentFetcher
+ // because getNonce generates a Nonce for this Interest.
+ // SegmentFetcher makes a copy of this Interest, so if we print the Nonce
+ // after, that Nonce will be different than the one seen in tshark!
+ NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
+ ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
+
+ m_lastInterestSentTime = currentTime;
m_fetcher = SegmentFetcher::start(m_face, syncInterest,
ndn::security::getAcceptAllValidator(), options);
@@ -142,42 +167,98 @@
// timeout before it happens.
if (errorCode != SegmentFetcher::ErrorCode::INTEREST_TIMEOUT) {
auto after = ndn::time::milliseconds(m_jitter(m_rng));
- NDN_LOG_DEBUG("Schedule sync interest after: " << after);
+ NDN_LOG_DEBUG("Schedule sync Interest after: " << after);
m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] { sendSyncInterest(); });
}
});
-
- NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
- ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
}
void
-FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
+FullProducer::processWaitingInterests()
{
- // TODO: answer only segments from store.
- if (m_segmentPublisher.replyFromStore(interest.getName())) {
+ NDN_LOG_TRACE("Processing waiting Interest list, size: " << m_waitingForProcessing.size());
+ if (m_waitingForProcessing.size() == 0) {
return;
}
- ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
- ndn::Name interestName;
+ for (auto it = m_waitingForProcessing.begin(); it != m_waitingForProcessing.end();) {
+ if (it->second.numTries == std::numeric_limits<uint16_t>::max()) {
+ NDN_LOG_TRACE("Interest with hash already marked for deletion, removing now: " <<
+ std::hash<ndn::Name>{}(it->first));
+ it = m_waitingForProcessing.erase(it);
+ continue;
+ }
- if (nameWithoutSyncPrefix.size() == 1) {
- // Get /<prefix>/IBF from /<prefix>/IBF
- interestName = interest.getName();
+ it->second.numTries += 1;
+ ndn::Interest interest(it->first);
+ interest.setNonce(it->second.nonce);
+ onSyncInterest(m_syncPrefix, interest, true);
+ if (it->second.numTries == std::numeric_limits<uint16_t>::max()) {
+ NDN_LOG_TRACE("Removing Interest with hash: " << std::hash<ndn::Name>{}(it->first));
+ it = m_waitingForProcessing.erase(it);
+ }
+ else {
+ ++it;
+ }
}
- else if (nameWithoutSyncPrefix.size() == 3) {
- // Get /<prefix>/IBF from /<prefix>/IBF/<version>/<segment-no>
- interestName = interest.getName().getPrefix(-2);
- }
- else {
+ NDN_LOG_TRACE("Done processing waiting Interest list, size: " << m_waitingForProcessing.size());
+}
+
+void
+FullProducer::scheduleProcessWaitingInterests()
+{
+ // If nothing waiting, no need to schedule
+ if (m_waitingForProcessing.size() == 0) {
return;
}
- ndn::name::Component ibltName = interestName.get(interestName.size() - 1);
+ if (!m_interestDelayTimerId) {
+ auto after = ndn::time::milliseconds(m_jitter(m_rng));
+ NDN_LOG_TRACE("Setting a timer to processes waiting Interest(s) in: " << after);
+ m_interestDelayTimerId = m_scheduler.schedule(after, [=] {
+ NDN_LOG_TRACE("Timer has expired, trying to process waiting Interest(s)");
+ processWaitingInterests();
+ scheduleProcessWaitingInterests();
+ });
+ }
+}
+
+void
+FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest,
+ bool isTimedProcessing)
+{
+ ndn::Name interestName = interest.getName();
+ auto interestNameHash = std::hash<ndn::Name>{}(interestName);
NDN_LOG_DEBUG("Full sync Interest received, nonce: " << interest.getNonce() <<
- ", hash: " << std::hash<ndn::Name>{}(interestName));
+ ", hash: " << interestNameHash);
+
+ if (isTimedProcessing) {
+ NDN_LOG_TRACE("Delayed Interest being processed now");
+ }
+
+ if (m_segmentPublisher.replyFromStore(interestName)) {
+ NDN_LOG_DEBUG("Answer from memory");
+ return;
+ }
+
+ ndn::Name nameWithoutSyncPrefix = interestName.getSubName(prefixName.size());
+
+ if (nameWithoutSyncPrefix.size() == 4) {
+ // /<IBF>/<numCumulativeElements>/<version>/<segment>
+ NDN_LOG_DEBUG("Segment not found in memory. Other side will have to restart");
+ // This should have been answered from publisher Cache!
+ sendApplicationNack(prefixName);
+ return;
+ }
+
+ if (nameWithoutSyncPrefix.size() != 2) {
+ NDN_LOG_WARN("Two components required after sync prefix: /<IBF>/<numCumulativeElements>; received: " << interestName);
+ return;
+ }
+
+ ndn::name::Component ibltName = interestName[-2];
+ uint64_t numRcvdElements = interestName[-1].toNumber();
detail::IBLT iblt(m_expectedNumEntries, m_ibltCompression);
try {
@@ -189,80 +270,157 @@
}
auto diff = m_iblt - iblt;
- if (!diff.canDecode) {
- NDN_LOG_TRACE("Cannot decode differences, positive: " << diff.positive.size()
- << " negative: " << diff.negative.size() << " m_threshold: "
- << m_threshold);
- // Send all data if greater then threshold, else send positive below as usual
- // Or send if we can't get neither positive nor negative differences
- if (diff.positive.size() + diff.negative.size() >= m_threshold ||
- (diff.positive.empty() && diff.negative.empty())) {
+ NDN_LOG_TRACE("Decode, positive: " << diff.positive.size()
+ << " negative: " << diff.negative.size() << " m_threshold: "
+ << m_threshold);
+
+ auto waitingIt = m_waitingForProcessing.find(interestName);
+
+ if (!diff.canDecode) {
+ NDN_LOG_DEBUG("Cannot decode differences!");
+
+ if (numRcvdElements > m_numOwnElements) {
+ if (!isTimedProcessing && waitingIt == m_waitingForProcessing.end()) {
+ NDN_LOG_TRACE("Decode failure, adding to waiting Interest list " << interestNameHash);
+ m_waitingForProcessing.emplace(interestName, WaitingEntryInfo{0, interest.getNonce()});
+ scheduleProcessWaitingInterests();
+ }
+ else if (isTimedProcessing && waitingIt != m_waitingForProcessing.end()) {
+ if (waitingIt->second.numTries > 1) {
+ NDN_LOG_TRACE("Decode failure, still behind. Erasing waiting Interest as we have tried twice");
+ waitingIt->second.numTries = std::numeric_limits<uint16_t>::max(); // markWaitingInterestForDeletion
+ NDN_LOG_DEBUG("Waiting Interest has been deleted. Sending new sync interest");
+ sendSyncInterest();
+ }
+ else {
+ NDN_LOG_TRACE("Decode failure, still behind, waiting more till the next timer");
+ }
+ }
+ else {
+ NDN_LOG_TRACE("Decode failure, still behind");
+ }
+ }
+ else {
+ if (m_numOwnElements == numRcvdElements && diff.positive.size() == 0 && diff.negative.size() > 0) {
+ NDN_LOG_TRACE("We have nothing to offer and are actually behind");
+#ifdef PSYNC_WITH_TESTS
+ ++nIbfDecodeFailuresBelowThreshold;
+#endif // PSYNC_WITH_TESTS
+ return;
+ }
+
detail::State state;
for (const auto& content : m_prefixes) {
if (content.second != 0) {
state.addContent(ndn::Name(content.first).appendNumber(content.second));
}
}
+#ifdef PSYNC_WITH_TESTS
+ ++nIbfDecodeFailuresAboveThreshold;
+#endif // PSYNC_WITH_TESTS
if (!state.getContent().empty()) {
- sendSyncData(interest.getName(), state.wireEncode());
+ NDN_LOG_DEBUG("Sending entire state: " << state);
+ // Want low freshness when potentially sending large content to clear it quickly from the network
+ sendSyncData(interestName, state.wireEncode(), 10_ms);
+ // Since we're directly sending the data, we need to clear pending interests here
+ deletePendingInterests(interestName);
}
-
-#ifdef PSYNC_WITH_TESTS
- ++nIbfDecodeFailuresAboveThreshold;
-#endif // PSYNC_WITH_TESTS
- return;
- }
-
-#ifdef PSYNC_WITH_TESTS
- ++nIbfDecodeFailuresBelowThreshold;
-#endif // PSYNC_WITH_TESTS
- }
-
- detail::State state;
- for (const auto& hash : diff.positive) {
- auto nameIt = m_biMap.left.find(hash);
- if (nameIt != m_biMap.left.end()) {
- ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1);
- // Don't sync up sequence number zero
- if (m_prefixes[nameWithoutSeq] != 0 &&
- !isFutureHash(nameWithoutSeq, diff.negative)) {
- state.addContent(nameIt->second);
+ // We seem to be ahead, delete the Interest from waiting list
+ if (waitingIt != m_waitingForProcessing.end()) {
+ waitingIt->second.numTries = std::numeric_limits<uint16_t>::max();
}
}
- }
-
- if (!state.getContent().empty()) {
- NDN_LOG_DEBUG("Sending sync content: " << state);
- sendSyncData(interestName, state.wireEncode());
return;
}
- auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{iblt, {}}).first->second;
- entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
- [this, interest] {
- NDN_LOG_TRACE("Erase pending Interest " << interest.getNonce());
- m_pendingEntries.erase(interest.getName());
- });
+ if (diff.positive.size() == 0 && diff.negative.size() == 0) {
+ NDN_LOG_TRACE("Saving positive: " << diff.positive.size() << " negative: " << diff.negative.size());
+
+ auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{iblt, {}}).first->second;
+ entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
+ [this, interest] {
+ NDN_LOG_TRACE("Erase pending Interest " << interest.getNonce());
+ m_pendingEntries.erase(interest.getName());
+ });
+
+ // Can't delete directly in this case as it will cause
+ // memory access errors with the for loop in processWaitingInterests
+ if (isTimedProcessing) {
+ if (waitingIt != m_waitingForProcessing.end()) {
+ waitingIt->second.numTries = std::numeric_limits<uint16_t>::max();
+ }
+ }
+ return;
+ }
+
+ // Only add to waiting list if we don't have anything to send (positive = 0)
+ if (diff.positive.size() == 0 && diff.negative.size() > 0) {
+ if (!isTimedProcessing && waitingIt == m_waitingForProcessing.end()) {
+ NDN_LOG_TRACE("Adding Interest to waiting list: " << interestNameHash);
+ m_waitingForProcessing.emplace(interestName, WaitingEntryInfo{0, interest.getNonce()});
+ scheduleProcessWaitingInterests();
+ }
+ else if (isTimedProcessing && waitingIt != m_waitingForProcessing.end()) {
+ if (waitingIt->second.numTries > 1) {
+ NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash <<
+ ". Erasing waiting Interest as we have tried twice");
+ waitingIt->second.numTries = std::numeric_limits<uint16_t>::max(); // markWaitingInterestForDeletion
+ }
+ else {
+ NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash <<
+ ". Keep waiting for Interest as number of tries is not exhausted");
+ }
+ }
+ else {
+ NDN_LOG_TRACE("Still behind after waiting for Interest " << interestNameHash);
+ }
+ return;
+ }
+
+ if (diff.positive.size() > 0) {
+ detail::State state;
+ for (const auto& hash : diff.positive) {
+ auto nameIt = m_biMap.left.find(hash);
+ if (nameIt != m_biMap.left.end()) {
+ ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1);
+ // Don't sync up sequence number zero
+ if (m_prefixes[nameWithoutSeq] != 0 &&
+ !isFutureHash(nameWithoutSeq.toUri(), diff.negative)) {
+ state.addContent(nameIt->second);
+ }
+ }
+ }
+
+ if (!state.getContent().empty()) {
+ NDN_LOG_DEBUG("Sending sync content: " << state);
+ sendSyncData(interestName, state.wireEncode(), m_syncReplyFreshness);
+
+ // Timed processing or not - if we are answering it, it should not go in waiting Interests
+ if (waitingIt != m_waitingForProcessing.end()) {
+ waitingIt->second.numTries = std::numeric_limits<uint16_t>::max();
+ }
+ }
+ }
}
void
-FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
+FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block,
+ ndn::time::milliseconds syncReplyFreshness)
{
bool isSatisfyingOwnInterest = m_outstandingInterestName == name;
if (isSatisfyingOwnInterest && m_fetcher) {
- NDN_LOG_DEBUG("Removing our pending interest from face (stop fetcher)");
+ NDN_LOG_DEBUG("Removing our pending Interest from face (stop fetcher)");
m_fetcher->stop();
m_outstandingInterestName.clear();
}
NDN_LOG_DEBUG("Sending sync Data");
auto content = detail::compress(m_contentCompression, block);
- m_segmentPublisher.publish(name, name, *content, m_syncReplyFreshness);
-
+ m_segmentPublisher.publish(name, name, *content, syncReplyFreshness);
if (isSatisfyingOwnInterest) {
- NDN_LOG_TRACE("Renewing sync interest");
+ NDN_LOG_DEBUG("Renewing sync interest");
sendSyncInterest();
}
}
@@ -299,54 +457,67 @@
}
}
- // We just got the data, so send a new sync interest
if (!updates.empty()) {
m_onUpdate(updates);
- NDN_LOG_TRACE("Renewing sync interest");
- sendSyncInterest();
+ // Wait a bit to let neighbors get the data too
+ auto after = ndn::time::milliseconds(m_jitter(m_rng));
+ m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] {
+ NDN_LOG_DEBUG("Got updates, renewing sync Interest now");
+ sendSyncInterest();
+ });
+ NDN_LOG_DEBUG("Schedule sync Interest after: " << after);
+ m_inNoNewDataWaitOutPeriod = false;
+
+ processWaitingInterests();
}
else {
- NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() <<
+ NDN_LOG_TRACE("No new update, Interest nonce: " << interest.getNonce() <<
" , hash: " << std::hash<ndn::Name>{}(interest.getName()));
+ m_inNoNewDataWaitOutPeriod = true;
+
+ // Have to wait, otherwise will get same data from CS
+ auto after = m_syncReplyFreshness + ndn::time::milliseconds(m_jitter(m_rng));
+ m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] {
+ NDN_LOG_DEBUG("Sending sync Interest after no new update");
+ m_inNoNewDataWaitOutPeriod = false;
+ sendSyncInterest();
+ });
+ NDN_LOG_DEBUG("Schedule sync after: " << after);
}
+
}
void
-FullProducer::satisfyPendingInterests()
+FullProducer::satisfyPendingInterests(const ndn::Name& updatedPrefixWithSeq)
{
- NDN_LOG_DEBUG("Satisfying full sync interest: " << m_pendingEntries.size());
+ NDN_LOG_DEBUG("Satisfying full sync Interest: " << m_pendingEntries.size());
for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
+ NDN_LOG_TRACE("Satisfying pending Interest: " << std::hash<ndn::Name>{}(it->first.getPrefix(-1)));
const auto& entry = it->second;
auto diff = m_iblt - entry.iblt;
- if (!diff.canDecode) {
- NDN_LOG_TRACE("Decode failed for pending interest");
- if (diff.positive.size() + diff.negative.size() >= m_threshold ||
- (diff.positive.empty() && diff.negative.empty())) {
- NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest");
- it = m_pendingEntries.erase(it);
- continue;
- }
- }
+ NDN_LOG_TRACE("Decoded: " << diff.canDecode << " positive: " << diff.positive.size() <<
+ " negative: " << diff.negative.size());
detail::State state;
+ bool publishedPrefixInDiff = false;
for (const auto& hash : diff.positive) {
auto nameIt = m_biMap.left.find(hash);
if (nameIt != m_biMap.left.end()) {
- if (m_prefixes[nameIt->second.getPrefix(-1)] != 0) {
- state.addContent(nameIt->second);
+ if (updatedPrefixWithSeq == nameIt->second) {
+ publishedPrefixInDiff = true;
}
+ state.addContent(nameIt->second);
}
}
- if (!state.getContent().empty()) {
- NDN_LOG_DEBUG("Satisfying sync content: " << state);
- sendSyncData(it->first, state.wireEncode());
- it = m_pendingEntries.erase(it);
+ if (!publishedPrefixInDiff) {
+ state.addContent(updatedPrefixWithSeq);
}
- else {
- ++it;
- }
+
+ NDN_LOG_DEBUG("Satisfying sync content: " << state);
+ sendSyncData(it->first, state.wireEncode(), m_syncReplyFreshness);
+ it = m_pendingEntries.erase(it);
}
}
@@ -363,7 +534,7 @@
{
auto it = m_pendingEntries.find(interestName);
if (it != m_pendingEntries.end()) {
- NDN_LOG_TRACE("Delete pending interest: " << interestName);
+ NDN_LOG_TRACE("Delete pending Interest: " << std::hash<ndn::Name>{}(interestName));
it = m_pendingEntries.erase(it);
}
}
diff --git a/PSync/full-producer.hpp b/PSync/full-producer.hpp
index 4644727..2e331c0 100644
--- a/PSync/full-producer.hpp
+++ b/PSync/full-producer.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2023, The University of Memphis
+ * Copyright (c) 2014-2024, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -15,7 +15,7 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#ifndef PSYNC_FULL_PRODUCER_HPP
#define PSYNC_FULL_PRODUCER_HPP
@@ -112,6 +112,12 @@
void
sendSyncInterest();
+ void
+ processWaitingInterests();
+
+ void
+ scheduleProcessWaitingInterests();
+
/**
* @brief Process sync interest from other parties
*
@@ -125,9 +131,11 @@
*
* @param prefixName prefix for sync group which we registered
* @param interest the interest we got
+ * @param isTimedProcessing is this interest from the waiting interests list
*/
void
- onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest);
+ onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest,
+ bool isTimedProcessing = false);
/**
* @brief Send sync data
@@ -138,9 +146,11 @@
*
* @param name name to be set as data name
* @param block the content of the data
+ * @param syncReplyFreshness the freshness to use for the sync data; defaults to @p SYNC_REPLY_FRESHNESS
*/
void
- sendSyncData(const ndn::Name& name, const ndn::Block& block);
+ sendSyncData(const ndn::Name& name, const ndn::Block& block,
+ ndn::time::milliseconds syncReplyFreshness);
/**
* @brief Process sync data
@@ -164,12 +174,14 @@
/**
* @brief Satisfy pending sync interests
*
- * For pending sync interests SI, if IBF of SI has any difference from our own IBF:
- * send data back.
- * If we can't decode differences from the stored IBF, then delete it.
+ * For pending sync interests do a difference with current IBF to find out missing prefixes.
+ * Send [Missing Prefixes] union @p updatedPrefixWithSeq
+ *
+ * This is because it is called from publish, so the @p updatedPrefixWithSeq must be missing
+ * from other nodes regardless of IBF difference failure.
*/
void
- satisfyPendingInterests();
+ satisfyPendingInterests(const ndn::Name& updatedPrefixWithSeq);
/**
* @brief Delete pending sync interests that match given name
@@ -177,7 +189,7 @@
void
deletePendingInterests(const ndn::Name& interestName);
- /**
+ /**
* @brief Check if hash(prefix + 1) is in negative
*
* Sometimes what happens is that interest from other side
@@ -199,15 +211,29 @@
ndn::scheduler::ScopedEventId expirationEvent;
};
- std::map<ndn::Name, PendingEntryInfo> m_pendingEntries;
+ struct WaitingEntryInfo
+ {
+ uint16_t numTries = 0;
+ ndn::Interest::Nonce nonce;
+ };
+
ndn::time::milliseconds m_syncInterestLifetime;
UpdateCallback m_onUpdate;
ndn::scheduler::ScopedEventId m_scheduledSyncInterestId;
- std::uniform_int_distribution<> m_jitter{100, 500};
+ static constexpr int MIN_JITTER = 100;
+ static constexpr int MAX_JITTER = 500;
+ std::uniform_int_distribution<> m_jitter{MIN_JITTER, MAX_JITTER};
+ ndn::time::system_clock::time_point m_lastInterestSentTime;
ndn::Name m_outstandingInterestName;
ndn::ScopedRegisteredPrefixHandle m_registeredPrefix;
std::shared_ptr<ndn::SegmentFetcher> m_fetcher;
uint64_t m_incomingFace = 0;
+ std::map<ndn::Name, WaitingEntryInfo> m_waitingForProcessing;
+ bool m_inNoNewDataWaitOutPeriod = false;
+ ndn::scheduler::ScopedEventId m_interestDelayTimerId;
+
+PSYNC_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ std::map<ndn::Name, PendingEntryInfo> m_pendingEntries;
};
} // namespace psync
diff --git a/PSync/producer-base.cpp b/PSync/producer-base.cpp
index faa3ea6..a43afb5 100644
--- a/PSync/producer-base.cpp
+++ b/PSync/producer-base.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2023, The University of Memphis
+ * Copyright (c) 2014-2024, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -15,7 +15,7 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#include "PSync/producer-base.hpp"
#include "PSync/detail/util.hpp"
@@ -115,6 +115,8 @@
auto newHash = detail::murmurHash3(detail::N_HASHCHECK, prefixWithSeq);
m_biMap.insert({newHash, prefixWithSeq});
m_iblt.insert(newHash);
+
+ m_numOwnElements += (seq - oldSeq);
}
void
diff --git a/PSync/producer-base.hpp b/PSync/producer-base.hpp
index faaa926..e857d5a 100644
--- a/PSync/producer-base.hpp
+++ b/PSync/producer-base.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2023, The University of Memphis
+ * Copyright (c) 2014-2024, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -15,7 +15,7 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#ifndef PSYNC_PRODUCER_BASE_HPP
#define PSYNC_PRODUCER_BASE_HPP
@@ -176,6 +176,7 @@
const ndn::time::milliseconds m_syncReplyFreshness;
const CompressionScheme m_ibltCompression;
const CompressionScheme m_contentCompression;
+ uint64_t m_numOwnElements = 0;
};
} // namespace psync
diff --git a/examples/full-sync.cpp b/examples/full-sync.cpp
index 6248336..e202a83 100644
--- a/examples/full-sync.cpp
+++ b/examples/full-sync.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2023, The University of Memphis
+ * Copyright (c) 2014-2024, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -15,7 +15,7 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#include <PSync/full-producer.hpp>
@@ -37,7 +37,7 @@
/**
* @brief Initialize producer and schedule updates.
*
- * Set IBF size as 80 expecting 80 updates to IBF in a sync cycle.
+ * Use default IBF size of 6 as we're expecting 6 updates to IBF in a sync cycle.
* Set syncInterestLifetime and syncDataFreshness to 1.6 seconds.
* userPrefix is the prefix string of user node prefixes.
*/
@@ -46,7 +46,6 @@
: m_producer(m_face, m_keyChain, syncPrefix, [this] {
psync::FullProducer::Options opts;
opts.onUpdate = std::bind(&Producer::processSyncUpdate, this, _1);
- opts.ibfCount = 80;
opts.syncInterestLifetime = 1600_ms;
opts.syncDataFreshness = 1600_ms;
return opts;
diff --git a/tests/test-full-producer.cpp b/tests/test-full-producer.cpp
index c833181..bcb753a 100644
--- a/tests/test-full-producer.cpp
+++ b/tests/test-full-producer.cpp
@@ -15,11 +15,12 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#include "PSync/full-producer.hpp"
#include "PSync/detail/util.hpp"
+
#include "tests/boost-test.hpp"
#include "tests/io-fixture.hpp"
#include "tests/key-chain-fixture.hpp"
@@ -87,6 +88,54 @@
BOOST_CHECK_NO_THROW(node.onSyncData(syncInterest, goodCompressBadBlock));
}
+BOOST_AUTO_TEST_CASE(SatisfyPendingInterestsBehavior)
+{
+ Name syncPrefix("/psync");
+ FullProducer::Options opts;
+ opts.ibfCount = 6;
+ FullProducer node(m_face, m_keyChain, syncPrefix, opts);
+
+ Name syncInterestName(syncPrefix);
+ node.m_iblt.appendToName(syncInterestName);
+ syncInterestName.appendNumber(1);
+ Interest syncInterest(syncInterestName);
+
+ node.addUserNode(syncPrefix);
+
+ node.onSyncInterest(syncPrefix, syncInterest);
+
+ BOOST_CHECK_EQUAL(node.m_pendingEntries.size(), 1);
+
+ // Test whether data is still sent if IBF diff is greater than default threshhold.
+ auto prefix1 = Name("/test/alice").appendNumber(1);
+ uint32_t newHash1 = psync::detail::murmurHash3(42, prefix1);
+ node.m_iblt.insert(newHash1);
+
+ auto prefix2 = Name("/test/bob").appendNumber(1);
+ uint32_t newHash2 = psync::detail::murmurHash3(42, prefix2);
+ node.m_iblt.insert(newHash2);
+
+ auto prefix3 = Name("/test/carol").appendNumber(1);
+ uint32_t newHash3 = psync::detail::murmurHash3(42, prefix3);
+ node.m_iblt.insert(newHash3);
+
+ auto prefix4 = Name("/test/david").appendNumber(1);
+ uint32_t newHash4 = psync::detail::murmurHash3(42, prefix4);
+ node.m_iblt.insert(newHash4);
+
+ auto prefix5 = Name("/test/erin").appendNumber(1);
+ uint32_t newHash5 = psync::detail::murmurHash3(42, prefix5);
+ node.m_iblt.insert(newHash5);
+
+ node.publishName(syncPrefix);
+
+ advanceClocks(10_ms);
+
+ BOOST_CHECK_EQUAL(m_face.sentData.size(), 1);
+
+ BOOST_CHECK_EQUAL(node.m_pendingEntries.empty(), true);
+}
+
BOOST_AUTO_TEST_SUITE_END()
} // namespace psync::tests
diff --git a/tests/test-full-sync.cpp b/tests/test-full-sync.cpp
index c483d89..ff25728 100644
--- a/tests/test-full-sync.cpp
+++ b/tests/test-full-sync.cpp
@@ -297,7 +297,7 @@
nodes[i]->publishName(userPrefixes[i]);
}
- advanceClocks(10_ms, 100);
+ advanceClocks(100_ms, 100);
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
BOOST_CHECK_EQUAL(nodes[i]->getSeqNo(userPrefixes[j]).value_or(NOT_EXIST), 1);
@@ -308,7 +308,7 @@
nodes[i]->publishName(userPrefixes[i], 4);
}
- advanceClocks(10_ms, 100);
+ advanceClocks(100_ms, 100);
for (int i = 0; i < 4; i++) {
for (int j = 0; j < 4; j++) {
BOOST_CHECK_EQUAL(nodes[i]->getSeqNo(userPrefixes[j]).value_or(NOT_EXIST), 4);
@@ -464,55 +464,6 @@
});
}
-BOOST_AUTO_TEST_CASE(DelayedSecondSegment)
-{
- addNode(0);
-
- int i = 0;
- detail::State state;
- std::shared_ptr<ndn::Buffer> compressed;
- do {
- auto prefixToPublish = makeSubPrefix(0, i++);
- nodes[0]->addUserNode(prefixToPublish);
- nodes[0]->publishName(prefixToPublish);
-
- state.addContent(Name(prefixToPublish).appendNumber(nodes[0]->m_prefixes[prefixToPublish]));
-
- auto block = state.wireEncode();
- compressed = detail::compress(nodes[0]->m_contentCompression, block);
- } while (compressed->size() < (ndn::MAX_NDN_PACKET_SIZE >> 1));
-
- advanceClocks(10_ms, 100);
-
- Name syncInterestName(syncPrefix);
- detail::IBLT iblt(40, nodes[0]->m_ibltCompression);
- iblt.appendToName(syncInterestName);
-
- nodes[0]->onSyncInterest(syncPrefix, Interest(syncInterestName));
-
- advanceClocks(10_ms);
-
- BOOST_CHECK_EQUAL(nodes[0]->m_segmentPublisher.m_ims.size(), 2);
- // Expire contents from segmentPublisher
- advanceClocks(10_ms, 100);
- BOOST_CHECK_EQUAL(nodes[0]->m_segmentPublisher.m_ims.size(), 0);
-
- // Get data name from face and increase segment number to form next interest
- BOOST_REQUIRE(!faces[0]->sentData.empty());
- Name dataName = faces[0]->sentData.front().getName();
- Name interestName = dataName.getPrefix(-1).appendSegment(1);
- faces[0]->sentData.clear();
-
- nodes[0]->onSyncInterest(syncPrefix, Interest(interestName));
- advanceClocks(10_ms);
-
- // Should have repopulated SegmentPublisher
- BOOST_CHECK_EQUAL(nodes[0]->m_segmentPublisher.m_ims.size(), 2);
- // Should have received the second data segment this time
- BOOST_REQUIRE(!faces[0]->sentData.empty());
- BOOST_CHECK_EQUAL(faces[0]->sentData.front().getName().at(-1).toSegment(), 1);
-}
-
BOOST_AUTO_TEST_SUITE_END()
} // namespace psync::tests