**breaking** Consumer: change hello data callback to include sequence number
refs: #5122
Change-Id: Ief5cf9e9c542739613b54790e66209c66f4b6172
diff --git a/PSync/consumer.cpp b/PSync/consumer.cpp
index 0a7ad6f..f94c6cf 100644
--- a/PSync/consumer.cpp
+++ b/PSync/consumer.cpp
@@ -54,14 +54,20 @@
}
bool
-Consumer::addSubscription(const ndn::Name& prefix)
+Consumer::addSubscription(const ndn::Name& prefix, uint64_t seqNo, bool callSyncDataCb)
{
- auto it = m_prefixes.insert(std::pair<ndn::Name, uint64_t>(prefix, 0));
+ auto it = m_prefixes.emplace(prefix, seqNo);
if (!it.second) {
return false;
}
- m_subscriptionList.insert(prefix);
+
+ m_subscriptionList.emplace(prefix);
m_bloomFilter.insert(prefix.toUri());
+
+ if (callSyncDataCb && seqNo != 0) {
+ m_onUpdate({MissingDataInfo(prefix, seqNo, seqNo)});
+ }
+
return true;
}
@@ -124,28 +130,28 @@
NDN_LOG_DEBUG("On Hello Data");
// Extract IBF from name which is the last element in hello data's name
- m_iblt = m_helloDataName.getSubName(m_helloDataName.size()-1, 1);
+ m_iblt = m_helloDataName.getSubName(m_helloDataName.size() - 1, 1);
NDN_LOG_TRACE("m_iblt: " << std::hash<std::string>{}(m_iblt.toUri()));
State state{ndn::Block{bufferPtr}};
std::vector<MissingDataInfo> updates;
- std::vector<ndn::Name> availableSubscriptions;
+ std::map<ndn::Name, uint64_t> availableSubscriptions;
- NDN_LOG_DEBUG("Hello Data: " << state);
+ NDN_LOG_DEBUG("Hello Data: " << state);
- for (const auto& content : state.getContent()) {
+ for (const auto& content : state) {
const ndn::Name& prefix = content.getPrefix(-1);
- uint64_t seq = content.get(content.size()-1).toNumber();
+ uint64_t seq = content.get(content.size() - 1).toNumber();
// If consumer is subscribed then prefix must already be present in
// m_prefixes (see addSubscription). So [] operator is safe to use.
if (isSubscribed(prefix) && seq > m_prefixes[prefix]) {
// In case we are behind on this prefix and consumer is subscribed to it
- updates.push_back(MissingDataInfo{prefix, m_prefixes[prefix] + 1, seq});
+ updates.emplace_back(prefix, m_prefixes[prefix] + 1, seq);
m_prefixes[prefix] = seq;
}
- availableSubscriptions.push_back(prefix);
+ availableSubscriptions.emplace(prefix, seq);
}
m_onReceiveHelloData(availableSubscriptions);
@@ -226,14 +232,14 @@
std::vector<MissingDataInfo> updates;
- for (const auto& content : state.getContent()) {
+ for (const auto& content : state) {
NDN_LOG_DEBUG(content);
const ndn::Name& prefix = content.getPrefix(-1);
uint64_t seq = content.get(content.size() - 1).toNumber();
if (m_prefixes.find(prefix) == m_prefixes.end() || seq > m_prefixes[prefix]) {
// If this is just the next seq number then we had already informed the consumer about
// the previous sequence number and hence seq low and seq high should be equal to current seq
- updates.push_back(MissingDataInfo{prefix, m_prefixes[prefix] + 1, seq});
+ updates.emplace_back(prefix, m_prefixes[prefix] + 1, seq);
m_prefixes[prefix] = seq;
}
// Else updates will be empty and consumer will not be notified.
diff --git a/PSync/consumer.hpp b/PSync/consumer.hpp
index 9dfec9f..5f80d8e 100644
--- a/PSync/consumer.hpp
+++ b/PSync/consumer.hpp
@@ -15,7 +15,7 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#ifndef PSYNC_CONSUMER_HPP
#define PSYNC_CONSUMER_HPP
@@ -37,7 +37,7 @@
using namespace ndn::literals::time_literals;
-typedef std::function<void(const std::vector<ndn::Name>&)> ReceiveHelloCallback;
+typedef std::function<void(const std::map<ndn::Name, uint64_t>&)> ReceiveHelloCallback;
typedef std::function<void(const std::vector<MissingDataInfo>&)> UpdateCallback;
const ndn::time::milliseconds HELLO_INTEREST_LIFETIME = 1_s;
@@ -104,10 +104,16 @@
* @brief Add prefix to subscription list
*
* @param prefix prefix to be added to the list
+ * @param seqNo the latest sequence number for the prefix received in HelloData callback
+ * @param callSyncDataCb true by default to let app know that a new sequence number is available.
+ * Usually sequence number is zero in hello data, but when it is not Consumer can
+ * notify the app. Since the app is aware of the latest sequence number by
+ * ReceiveHelloCallback, app may choose to not let Consumer call UpdateCallback
+ * by setting this to false.
* @return true if prefix is added, false if it is already present
*/
bool
- addSubscription(const ndn::Name& prefix);
+ addSubscription(const ndn::Name& prefix, uint64_t seqNo, bool callSyncDataCb = true);
std::set<ndn::Name>
getSubscriptionList() const
diff --git a/PSync/detail/state.cpp b/PSync/detail/state.cpp
index 0a5242c..66173bd 100644
--- a/PSync/detail/state.cpp
+++ b/PSync/detail/state.cpp
@@ -15,7 +15,7 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#include "PSync/detail/state.hpp"
@@ -58,8 +58,7 @@
{
size_t totalLength = 0;
- for (std::vector<ndn::Name>::const_reverse_iterator it = m_content.rbegin();
- it != m_content.rend(); ++it) {
+ for (auto it = m_content.rbegin(); it != m_content.rend(); ++it) {
totalLength += it->wireEncode(block);
}
diff --git a/PSync/detail/state.hpp b/PSync/detail/state.hpp
index 14b1eb0..8a40331 100644
--- a/PSync/detail/state.hpp
+++ b/PSync/detail/state.hpp
@@ -15,7 +15,7 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#ifndef PSYNC_STATE_HPP
#define PSYNC_STATE_HPP
@@ -58,6 +58,18 @@
void
wireDecode(const ndn::Block& wire);
+ std::vector<ndn::Name>::const_iterator
+ begin() const
+ {
+ return m_content.cbegin();
+ }
+
+ std::vector<ndn::Name>::const_iterator
+ end() const
+ {
+ return m_content.cend();
+ }
+
private:
std::vector<ndn::Name> m_content;
mutable ndn::Block m_wire;
diff --git a/PSync/detail/util.hpp b/PSync/detail/util.hpp
index b160cd3..2c10567 100644
--- a/PSync/detail/util.hpp
+++ b/PSync/detail/util.hpp
@@ -19,7 +19,7 @@
* murmurHash3 was written by Austin Appleby, and is placed in the public
* domain. The author hereby disclaims copyright to this source code.
* https://github.com/aappleby/smhasher/blob/master/src/murmurHash3.cpp
- **/
+ */
#ifndef PSYNC_UTIL_HPP
#define PSYNC_UTIL_HPP
@@ -45,6 +45,12 @@
struct MissingDataInfo
{
+ MissingDataInfo(const ndn::Name& prefix, uint64_t lowSeq, uint64_t highSeq)
+ : prefix(prefix)
+ , lowSeq(lowSeq)
+ , highSeq(highSeq)
+ {}
+
ndn::Name prefix;
uint64_t lowSeq;
uint64_t highSeq;
diff --git a/PSync/full-producer.cpp b/PSync/full-producer.cpp
index d535695..3057f1b 100644
--- a/PSync/full-producer.cpp
+++ b/PSync/full-producer.cpp
@@ -15,7 +15,7 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#include "PSync/full-producer.hpp"
@@ -279,7 +279,7 @@
NDN_LOG_DEBUG("Sync Data Received: " << state);
- for (const auto& content : state.getContent()) {
+ for (const auto& content : state) {
ndn::Name prefix = content.getPrefix(-1);
uint64_t seq = content.get(content.size() - 1).toNumber();
diff --git a/PSync/segment-publisher.hpp b/PSync/segment-publisher.hpp
index f076a02..7ef84c5 100644
--- a/PSync/segment-publisher.hpp
+++ b/PSync/segment-publisher.hpp
@@ -15,7 +15,7 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#ifndef PSYNC_SEGMENT_PUBLISHER_HPP
#define PSYNC_SEGMENT_PUBLISHER_HPP
diff --git a/examples/consumer.cpp b/examples/consumer.cpp
index 7551060..b63bc83 100644
--- a/examples/consumer.cpp
+++ b/examples/consumer.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -15,7 +15,7 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#include <PSync/consumer.hpp>
@@ -59,16 +59,22 @@
private:
void
- afterReceiveHelloData(const std::vector<ndn::Name>& availSubs)
+ afterReceiveHelloData(const std::map<ndn::Name, uint64_t>& availSubs)
{
- // Randomly subscribe to m_nSub prefixes
- std::vector<ndn::Name> sensors = availSubs;
+ std::vector<ndn::Name> sensors;
+ sensors.reserve(availSubs.size());
+ for (const auto& it : availSubs) {
+ sensors.insert(sensors.end(), it.first);
+ }
std::shuffle(sensors.begin(), sensors.end(), m_rng);
+ // Randomly subscribe to m_nSub prefixes
for (int i = 0; i < m_nSub; i++) {
- NDN_LOG_INFO("Subscribing to: " << sensors[i]);
- m_consumer.addSubscription(sensors[i]);
+ ndn::Name prefix = sensors[i];
+ NDN_LOG_INFO("Subscribing to: " << prefix);
+ auto it = availSubs.find(prefix);
+ m_consumer.addSubscription(prefix, it->second);
}
// After setting the subscription list, send the sync interest
diff --git a/tests/test-consumer.cpp b/tests/test-consumer.cpp
index e5560cd..4811ebe 100644
--- a/tests/test-consumer.cpp
+++ b/tests/test-consumer.cpp
@@ -15,7 +15,7 @@
*
* You should have received a copy of the GNU Lesser General Public License along with
* PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#include "PSync/consumer.hpp"
@@ -35,8 +35,8 @@
{
util::DummyClientFace face({true, true});
BOOST_REQUIRE_NO_THROW(Consumer(Name("/psync"), face,
- [] (const std::vector<Name>&) {},
- [] (const std::vector<MissingDataInfo>&) {},
+ [] (const auto&) {},
+ [] (const auto&) {},
40, 0.001));
}
@@ -51,8 +51,8 @@
Name subscription("test");
BOOST_CHECK(!consumer.isSubscribed(subscription));
- BOOST_CHECK(consumer.addSubscription(subscription));
- BOOST_CHECK(!consumer.addSubscription(subscription));
+ BOOST_CHECK(consumer.addSubscription(subscription, 0));
+ BOOST_CHECK(!consumer.addSubscription(subscription, 0));
}
BOOST_FIXTURE_TEST_CASE(ConstantTimeoutForFirstSegment, tests::UnitTestTimeFixture)
diff --git a/tests/test-partial-sync.cpp b/tests/test-partial-sync.cpp
index 31058ff..2d5835a 100644
--- a/tests/test-partial-sync.cpp
+++ b/tests/test-partial-sync.cpp
@@ -63,7 +63,7 @@
}
consumers[id] = std::make_shared<Consumer>(syncPrefix, *consumerFaces[id],
- [&, id] (const std::vector<Name>& availableSubs)
+ [&, id] (const auto& availableSubs)
{
numHelloDataRcvd++;
BOOST_CHECK(checkSubList(availableSubs));
@@ -71,11 +71,12 @@
checkIBFUpdated(id);
for (const auto& sub : subscribeTo) {
- consumers[id]->addSubscription(sub);
+ auto it = availableSubs.find(sub);
+ consumers[id]->addSubscription(sub, it->second);
}
consumers[id]->sendSyncInterest();
},
- [&, id] (const std::vector<MissingDataInfo>& updates) {
+ [&, id] (const auto& updates) {
numSyncDataRcvd++;
checkIBFUpdated(id);
@@ -100,10 +101,11 @@
}
bool
- checkSubList(const std::vector<Name>& availableSubs) const
+ checkSubList(const std::map<Name, uint64_t>& availableSubs) const
{
for (const auto& prefix : producer->m_prefixes) {
- if (std::find(availableSubs.begin(), availableSubs.end(), prefix.first) == availableSubs.end()) {
+ auto it = availableSubs.find(prefix.first);
+ if (it == availableSubs.end()) {
return false;
}
}
@@ -199,7 +201,7 @@
publishUpdateFor("testUser-2");
BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
- consumers[0]->addSubscription("testUser-3");
+ consumers[0]->addSubscription("testUser-3", 0);
consumers[0]->sendSyncInterest();
publishUpdateFor("testUser-3");
BOOST_CHECK_EQUAL(numSyncDataRcvd, 2);
@@ -431,6 +433,23 @@
BOOST_CHECK_EQUAL(face.sentData.front().getName().at(-1).toSegment(), 1);
}
+BOOST_AUTO_TEST_CASE(DelayedSubscription) // #5122
+{
+ publishUpdateFor("testUser-2");
+ std::vector<std::string> subscribeTo{"testUser-2", "testUser-4"};
+ addConsumer(0, subscribeTo);
+
+ consumers[0]->sendHelloInterest();
+ advanceClocks(ndn::time::milliseconds(10));
+ BOOST_CHECK_EQUAL(numHelloDataRcvd, 1);
+
+ // Application came up late and subscribed to testUser-2
+ // after Producer had already published the first update.
+ // So by default Consumer will let the application know that
+ // the prefix it subscribed to has already some updates
+ BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
+}
+
BOOST_AUTO_TEST_SUITE_END()
} // namespace psync