Follow ndn::Scheduler API changes
And some code cleanups
Refs: #4883
Change-Id: Ifca73d336579211cf137fd6afe905fb562291800
diff --git a/PSync/consumer.cpp b/PSync/consumer.cpp
index d27866b..24483a7 100644
--- a/PSync/consumer.cpp
+++ b/PSync/consumer.cpp
@@ -83,39 +83,36 @@
Consumer::sendHelloInterest()
{
ndn::Interest helloInterest(m_helloInterestPrefix);
-
NDN_LOG_DEBUG("Send Hello Interest " << helloInterest);
if (m_helloFetcher) {
m_helloFetcher->stop();
}
- ndn::util::SegmentFetcher::Options options;
+ using ndn::util::SegmentFetcher;
+ SegmentFetcher::Options options;
options.interestLifetime = m_helloInterestLifetime;
options.maxTimeout = m_helloInterestLifetime;
- m_helloFetcher = ndn::util::SegmentFetcher::start(m_face,
- helloInterest,
- ndn::security::v2::getAcceptAllValidator(),
- options);
+ m_helloFetcher = SegmentFetcher::start(m_face, helloInterest,
+ ndn::security::v2::getAcceptAllValidator(), options);
m_helloFetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
- if (data.getFinalBlock()) {
- m_helloDataName = data.getName().getPrefix(-2);
- }
- });
+ if (data.getFinalBlock()) {
+ m_helloDataName = data.getName().getPrefix(-2);
+ }
+ });
- m_helloFetcher->onComplete.connect([this] (ndn::ConstBufferPtr bufferPtr) {
- onHelloData(bufferPtr);
- });
+ m_helloFetcher->onComplete.connect([this] (const ndn::ConstBufferPtr& bufferPtr) {
+ onHelloData(bufferPtr);
+ });
m_helloFetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
- NDN_LOG_TRACE("Cannot fetch hello data, error: " <<
- errorCode << " message: " << msg);
- ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
- NDN_LOG_TRACE("Scheduling after " << after);
- m_scheduler.scheduleEvent(after, [this] { sendHelloInterest(); });
- });
+ NDN_LOG_TRACE("Cannot fetch hello data, error: " << errorCode << " message: " << msg);
+ ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
+ NDN_LOG_TRACE("Scheduling after " << after);
+ m_scheduler.schedule(after, [this] { sendHelloInterest(); });
+ });
}
void
@@ -128,7 +125,7 @@
NDN_LOG_TRACE("m_iblt: " << std::hash<std::string>{}(m_iblt.toUri()));
- State state(ndn::Block(std::move(bufferPtr)));
+ State state{ndn::Block{bufferPtr}};
std::vector<MissingDataInfo> updates;
std::vector<ndn::Name> availableSubscriptions;
@@ -136,7 +133,7 @@
NDN_LOG_DEBUG("Hello Data: " << state);
for (const auto& content : state.getContent()) {
- ndn::Name prefix = content.getPrefix(-1);
+ const ndn::Name& prefix = content.getPrefix(-1);
uint64_t seq = content.get(content.size()-1).toNumber();
// If consumer is subscribed then prefix must already be present in
// m_prefixes (see addSubscription). So [] operator is safe to use.
@@ -174,66 +171,61 @@
NDN_LOG_DEBUG("sendSyncInterest, nonce: " << syncInterest.getNonce() <<
" hash: " << std::hash<std::string>{}(syncInterest.getName().toUri()));
- ndn::util::SegmentFetcher::Options options;
- options.interestLifetime = m_syncInterestLifetime;
- options.maxTimeout = m_syncInterestLifetime;;
-
if (m_syncFetcher) {
m_syncFetcher->stop();
}
- m_syncFetcher = ndn::util::SegmentFetcher::start(m_face,
- syncInterest,
- ndn::security::v2::getAcceptAllValidator(),
- options);
+ using ndn::util::SegmentFetcher;
+ SegmentFetcher::Options options;
+ options.interestLifetime = m_syncInterestLifetime;
+ options.maxTimeout = m_syncInterestLifetime;;
+
+ m_syncFetcher = SegmentFetcher::start(m_face, syncInterest,
+ ndn::security::v2::getAcceptAllValidator(), options);
m_syncFetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
- if (data.getFinalBlock()) {
- m_syncDataName = data.getName().getPrefix(-2);
- m_syncDataContentType = data.getContentType();
- }
+ if (data.getFinalBlock()) {
+ m_syncDataName = data.getName().getPrefix(-2);
+ m_syncDataContentType = data.getContentType();
+ }
- if (m_syncDataContentType == ndn::tlv::ContentType_Nack)
- {
- NDN_LOG_DEBUG("Received application"
- << " Nack from producer,"
- << " sending hello again");
- sendHelloInterest();
- }
- });
+ if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
+ NDN_LOG_DEBUG("Received application Nack from producer, sending hello again");
+ sendHelloInterest();
+ }
+ });
- m_syncFetcher->onComplete.connect([this] (ndn::ConstBufferPtr bufferPtr) {
- if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
- m_syncDataContentType = ndn::tlv::ContentType_Blob;
- return;
- }
- NDN_LOG_TRACE("Segment fetcher got sync data");
- onSyncData(bufferPtr);
- });
+ m_syncFetcher->onComplete.connect([this] (const ndn::ConstBufferPtr& bufferPtr) {
+ if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
+ m_syncDataContentType = ndn::tlv::ContentType_Blob;
+ return;
+ }
+ NDN_LOG_TRACE("Segment fetcher got sync data");
+ onSyncData(bufferPtr);
+ });
m_syncFetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
- NDN_LOG_TRACE("Cannot fetch sync data, error: "
- << errorCode << " message: " << msg);
- ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
- NDN_LOG_TRACE("Scheduling after " << after);
- m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
- });
+ NDN_LOG_TRACE("Cannot fetch sync data, error: " << errorCode << " message: " << msg);
+ ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
+ NDN_LOG_TRACE("Scheduling after " << after);
+ m_scheduler.schedule(after, [this] { sendSyncInterest(); });
+ });
}
void
Consumer::onSyncData(const ndn::ConstBufferPtr& bufferPtr)
{
// Extract IBF from sync data name which is the last component
- m_iblt = m_syncDataName.getSubName(m_syncDataName.size()-1, 1);
+ m_iblt = m_syncDataName.getSubName(m_syncDataName.size() - 1, 1);
- State state(ndn::Block(std::move(bufferPtr)));
+ State state{ndn::Block{bufferPtr}};
- std::vector <MissingDataInfo> updates;
+ std::vector<MissingDataInfo> updates;
for (const auto& content : state.getContent()) {
NDN_LOG_DEBUG(content);
- ndn::Name prefix = content.getPrefix(-1);
- uint64_t seq = content.get(content.size()-1).toNumber();
+ const ndn::Name& prefix = content.getPrefix(-1);
+ uint64_t seq = content.get(content.size() - 1).toNumber();
if (m_prefixes.find(prefix) == m_prefixes.end() || seq > m_prefixes[prefix]) {
// If this is just the next seq number then we had already informed the consumer about
// the previous sequence number and hence seq low and seq high should be equal to current seq
@@ -243,7 +235,7 @@
// Else updates will be empty and consumer will not be notified.
}
- NDN_LOG_DEBUG("Sync Data: " << state);
+ NDN_LOG_DEBUG("Sync Data: " << state);
if (!updates.empty()) {
m_onUpdate(updates);
@@ -252,4 +244,4 @@
sendSyncInterest();
}
-} // namespace psync
\ No newline at end of file
+} // namespace psync
diff --git a/PSync/full-producer.cpp b/PSync/full-producer.cpp
index 4390a26..a7a0b24 100644
--- a/PSync/full-producer.cpp
+++ b/PSync/full-producer.cpp
@@ -98,28 +98,26 @@
m_outstandingInterestName = syncInterestName;
m_scheduledSyncInterestId =
- m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(m_rng)),
- [this] { sendSyncInterest(); });
+ m_scheduler.schedule(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(m_rng)),
+ [this] { sendSyncInterest(); });
ndn::Interest syncInterest(syncInterestName);
- ndn::util::SegmentFetcher::Options options;
+ using ndn::util::SegmentFetcher;
+ SegmentFetcher::Options options;
options.interestLifetime = m_syncInterestLifetime;
options.maxTimeout = m_syncInterestLifetime;
- m_fetcher = ndn::util::SegmentFetcher::start(m_face,
- syncInterest,
- ndn::security::v2::getAcceptAllValidator(),
- options);
+ m_fetcher = SegmentFetcher::start(m_face, syncInterest,
+ ndn::security::v2::getAcceptAllValidator(), options);
- m_fetcher->onComplete.connect([this, syncInterest] (ndn::ConstBufferPtr bufferPtr) {
- onSyncData(syncInterest, bufferPtr);
- });
+ m_fetcher->onComplete.connect([this, syncInterest] (const ndn::ConstBufferPtr& bufferPtr) {
+ onSyncData(syncInterest, bufferPtr);
+ });
m_fetcher->onError.connect([] (uint32_t errorCode, const std::string& msg) {
- NDN_LOG_ERROR("Cannot fetch sync data, error: " <<
- errorCode << " message: " << msg);
- });
+ NDN_LOG_ERROR("Cannot fetch sync data, error: " << errorCode << " message: " << msg);
+ });
NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
", hash: " << std::hash<ndn::Name>{}(syncInterestName));
@@ -168,8 +166,8 @@
if (!diff.listEntries(positive, negative)) {
NDN_LOG_TRACE("Cannot decode differences, positive: " << positive.size()
- << " negative: " << negative.size() << " m_threshold: "
- << m_threshold);
+ << " negative: " << negative.size() << " m_threshold: "
+ << m_threshold);
// Send all data if greater then threshold, else send positive below as usual
// Or send if we can't get neither positive nor negative differences
@@ -193,7 +191,7 @@
State state;
for (const auto& hash : positive) {
- ndn::Name prefix = m_hash2prefix[hash];
+ const ndn::Name& prefix = m_hash2prefix[hash];
// Don't sync up sequence number zero
if (m_prefixes[prefix] != 0 && !isFutureHash(prefix.toUri(), negative)) {
state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
@@ -207,7 +205,7 @@
}
auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfoFull{iblt, {}}).first->second;
- entry.expirationEvent = m_scheduler.scheduleEvent(interest.getInterestLifetime(),
+ entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
[this, interest] {
NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
m_pendingEntries.erase(interest.getName());
@@ -254,14 +252,14 @@
{
deletePendingInterests(interest.getName());
- State state(ndn::Block(std::move(bufferPtr)));
+ State state{ndn::Block{bufferPtr}};
std::vector<MissingDataInfo> updates;
- NDN_LOG_DEBUG("Sync Data Received: " << state);
+ NDN_LOG_DEBUG("Sync Data Received: " << state);
for (const auto& content : state.getContent()) {
- ndn::Name prefix = content.getPrefix(-1);
- uint64_t seq = content.get(content.size()-1).toNumber();
+ const ndn::Name& prefix = content.getPrefix(-1);
+ uint64_t seq = content.get(content.size() - 1).toNumber();
if (m_prefixes.find(prefix) == m_prefixes.end() || m_prefixes[prefix] < seq) {
updates.push_back(MissingDataInfo{prefix, m_prefixes[prefix] + 1, seq});
@@ -301,7 +299,7 @@
if (positive.size() + negative.size() >= m_threshold ||
(positive.size() == 0 && negative.size() == 0)) {
NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest");
- m_pendingEntries.erase(it++);
+ it = m_pendingEntries.erase(it);
continue;
}
}
@@ -318,7 +316,7 @@
if (!state.getContent().empty()) {
NDN_LOG_DEBUG("Satisfying sync content: " << state);
sendSyncData(it->first, state.wireEncode());
- m_pendingEntries.erase(it++);
+ it = m_pendingEntries.erase(it);
}
else {
++it;
@@ -341,11 +339,12 @@
}
void
-FullProducer::deletePendingInterests(const ndn::Name& interestName) {
+FullProducer::deletePendingInterests(const ndn::Name& interestName)
+{
for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
if (it->first == interestName) {
NDN_LOG_TRACE("Delete pending interest: " << interestName);
- m_pendingEntries.erase(it++);
+ it = m_pendingEntries.erase(it);
}
else {
++it;
diff --git a/PSync/full-producer.hpp b/PSync/full-producer.hpp
index b5eb55f..ace7813 100644
--- a/PSync/full-producer.hpp
+++ b/PSync/full-producer.hpp
@@ -24,8 +24,8 @@
#include "PSync/detail/state.hpp"
#include <map>
-#include <unordered_set>
#include <random>
+#include <set>
#include <ndn-cxx/face.hpp>
#include <ndn-cxx/security/key-chain.hpp>
@@ -41,7 +41,7 @@
struct PendingEntryInfoFull
{
IBLT iblt;
- ndn::util::scheduler::ScopedEventId expirationEvent;
+ ndn::scheduler::ScopedEventId expirationEvent;
};
typedef std::function<void(const std::vector<MissingDataInfo>&)> UpdateCallback;
@@ -187,7 +187,7 @@
std::map<ndn::Name, PendingEntryInfoFull> m_pendingEntries;
ndn::time::milliseconds m_syncInterestLifetime;
UpdateCallback m_onUpdate;
- ndn::util::scheduler::ScopedEventId m_scheduledSyncInterestId;
+ ndn::scheduler::ScopedEventId m_scheduledSyncInterestId;
std::uniform_int_distribution<> m_jitter;
ndn::Name m_outstandingInterestName;
ndn::ScopedRegisteredPrefixHandle m_registeredPrefix;
diff --git a/PSync/partial-producer.cpp b/PSync/partial-producer.cpp
index 87f648d..e75bc38 100644
--- a/PSync/partial-producer.cpp
+++ b/PSync/partial-producer.cpp
@@ -193,7 +193,7 @@
}
auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{bf, iblt, {}}).first->second;
- entry.expirationEvent = m_scheduler.scheduleEvent(interest.getInterestLifetime(),
+ entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
[this, interest] {
NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
m_pendingEntries.erase(interest.getName());
diff --git a/PSync/partial-producer.hpp b/PSync/partial-producer.hpp
index 94f5672..2559af8 100644
--- a/PSync/partial-producer.hpp
+++ b/PSync/partial-producer.hpp
@@ -24,7 +24,6 @@
#include "PSync/producer-base.hpp"
#include <map>
-#include <unordered_set>
#include <ndn-cxx/face.hpp>
#include <ndn-cxx/security/key-chain.hpp>
@@ -37,7 +36,7 @@
{
BloomFilter bf;
IBLT iblt;
- ndn::util::scheduler::ScopedEventId expirationEvent;
+ ndn::scheduler::ScopedEventId expirationEvent;
};
/**
diff --git a/PSync/segment-publisher.cpp b/PSync/segment-publisher.cpp
index f3c5489..6229949 100644
--- a/PSync/segment-publisher.cpp
+++ b/PSync/segment-publisher.cpp
@@ -23,8 +23,7 @@
namespace psync {
-SegmentPublisher::SegmentPublisher(ndn::Face& face, ndn::KeyChain& keyChain,
- size_t imsLimit)
+SegmentPublisher::SegmentPublisher(ndn::Face& face, ndn::KeyChain& keyChain, size_t imsLimit)
: m_face(face)
, m_scheduler(m_face.getIoService())
, m_keyChain(keyChain)
@@ -43,7 +42,7 @@
}
ndn::EncodingBuffer buffer;
- buffer.prependBlock(std::move(block));
+ buffer.prependBlock(block);
const uint8_t* rawBuffer = buffer.buf();
const uint8_t* segmentBegin = rawBuffer;
@@ -67,7 +66,7 @@
segmentName.appendSegment(segmentNo);
// We get a std::exception: bad_weak_ptr from m_ims if we don't use shared_ptr for data
- std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
+ auto data = std::make_shared<ndn::Data>(segmentName);
data->setContent(segmentBegin, segmentEnd - segmentBegin);
data->setFreshnessPeriod(freshness);
data->setFinalBlock(ndn::name::Component::fromSegment(totalSegments));
@@ -83,10 +82,7 @@
}
m_ims.insert(*data, freshness);
- m_scheduler.scheduleEvent(freshness,
- [this, segmentName] {
- m_ims.erase(segmentName);
- });
+ m_scheduler.schedule(freshness, [this, segmentName] { m_ims.erase(segmentName); });
++segmentNo;
} while (segmentBegin < end);
diff --git a/PSync/segment-publisher.hpp b/PSync/segment-publisher.hpp
index f00acff..35354cf 100644
--- a/PSync/segment-publisher.hpp
+++ b/PSync/segment-publisher.hpp
@@ -68,7 +68,7 @@
private:
ndn::Face& m_face;
- ndn::util::Scheduler m_scheduler;
+ ndn::Scheduler m_scheduler;
ndn::KeyChain& m_keyChain;
PUBLIC_WITH_TESTS_ELSE_PRIVATE:
diff --git a/examples/full-sync.cpp b/examples/full-sync.cpp
index 09f7b5b..f92149d 100644
--- a/examples/full-sync.cpp
+++ b/examples/full-sync.cpp
@@ -19,8 +19,10 @@
#include <PSync/full-producer.hpp>
+#include <ndn-cxx/face.hpp>
#include <ndn-cxx/util/logger.hpp>
#include <ndn-cxx/util/random.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
#include <iostream>
@@ -52,13 +54,9 @@
// Add user prefixes and schedule updates for them in specified interval
for (int i = 0; i < m_numDataStreams; i++) {
ndn::Name prefix(userPrefix + "-" + ndn::to_string(i));
-
m_fullProducer.addUserNode(prefix);
-
- m_scheduler.scheduleEvent(ndn::time::milliseconds(m_rangeUniformRandom(m_rng)),
- [this, prefix] {
- doUpdate(prefix);
- });
+ m_scheduler.schedule(ndn::time::milliseconds(m_rangeUniformRandom(m_rng)),
+ [this, prefix] { doUpdate(prefix); });
}
}
@@ -78,10 +76,8 @@
NDN_LOG_INFO("Publish: " << prefix << "/" << seqNo);
if (seqNo < m_maxNumPublish) {
- m_scheduler.scheduleEvent(ndn::time::milliseconds(m_rangeUniformRandom(m_rng)),
- [this, prefix] {
- doUpdate(prefix);
- });
+ m_scheduler.schedule(ndn::time::milliseconds(m_rangeUniformRandom(m_rng)),
+ [this, prefix] { doUpdate(prefix); });
}
}
@@ -97,7 +93,7 @@
private:
ndn::Face m_face;
- ndn::util::Scheduler m_scheduler;
+ ndn::Scheduler m_scheduler;
psync::FullProducer m_fullProducer;
diff --git a/examples/producer.cpp b/examples/producer.cpp
index ef0ff2d..6fcbb25 100644
--- a/examples/producer.cpp
+++ b/examples/producer.cpp
@@ -19,8 +19,10 @@
#include <PSync/partial-producer.hpp>
+#include <ndn-cxx/face.hpp>
#include <ndn-cxx/util/logger.hpp>
#include <ndn-cxx/util/random.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
#include <iostream>
@@ -52,10 +54,8 @@
m_producer.addUserNode(updateName);
// Each user prefix is updated at random interval between 0 and 60 second
- m_scheduler.scheduleEvent(ndn::time::milliseconds(m_rangeUniformRandom(m_rng)),
- [this, updateName] {
- doUpdate(updateName);
- });
+ m_scheduler.schedule(ndn::time::milliseconds(m_rangeUniformRandom(m_rng)),
+ [this, updateName] { doUpdate(updateName); });
}
}
@@ -77,16 +77,14 @@
if (seqNo < m_maxNumPublish) {
// Schedule the next update for this user prefix b/w 0 and 60 seconds
- m_scheduler.scheduleEvent(ndn::time::milliseconds(m_rangeUniformRandom(m_rng)),
- [this, updateName] {
- doUpdate(updateName);
- });
+ m_scheduler.schedule(ndn::time::milliseconds(m_rangeUniformRandom(m_rng)),
+ [this, updateName] { doUpdate(updateName); });
}
}
private:
ndn::Face m_face;
- ndn::util::Scheduler m_scheduler;
+ ndn::Scheduler m_scheduler;
psync::PartialProducer m_producer;
@@ -114,4 +112,4 @@
catch (const std::exception& e) {
NDN_LOG_ERROR(e.what());
}
-}
\ No newline at end of file
+}