partial sync: segment hello and sync data
add segment publisher
refs: #4662
Change-Id: I62e7a2247bac58aeec364cd2a4e4d34259eae4af
diff --git a/src/consumer.cpp b/src/consumer.cpp
index f27f355..04f8b4a 100644
--- a/src/consumer.cpp
+++ b/src/consumer.cpp
@@ -21,6 +21,7 @@
#include "detail/state.hpp"
#include <ndn-cxx/util/logger.hpp>
+#include <ndn-cxx/security/validator-null.hpp>
#include <boost/algorithm/string.hpp>
@@ -41,6 +42,7 @@
, m_syncPrefix(syncPrefix)
, m_helloInterestPrefix(ndn::Name(m_syncPrefix).append("hello"))
, m_syncInterestPrefix(ndn::Name(m_syncPrefix).append("sync"))
+ , m_syncDataContentType(ndn::tlv::ContentType_Blob)
, m_onReceiveHelloData(onReceiveHelloData)
, m_onUpdate(onUpdate)
, m_bloomFilter(count, false_positive)
@@ -64,34 +66,70 @@
}
void
-Consumer::sendHelloInterest()
+Consumer::stop() // Stops and releases the sync and hello segment fetchers, if they exist.
{
- ndn::Interest helloInterest(m_helloInterestPrefix);
- helloInterest.setInterestLifetime(m_helloInterestLifetime);
- helloInterest.setCanBePrefix(true);
- helloInterest.setMustBeFresh(true);
+ if (m_syncFetcher) { // non-null once sendSyncInterest() has started a fetch
+ m_syncFetcher->stop();
+ m_syncFetcher.reset(); // drop our handle so later null checks see "no fetcher"
+ }
- NDN_LOG_DEBUG("Send Hello Interest " << helloInterest);
-
- m_face.expressInterest(helloInterest,
- std::bind(&Consumer::onHelloData, this, _1, _2),
- std::bind(&Consumer::onNackForHello, this, _1, _2),
- std::bind(&Consumer::onHelloTimeout, this, _1));
+ if (m_helloFetcher) { // same teardown for the hello fetcher
+ m_helloFetcher->stop();
+ m_helloFetcher.reset();
+ }
}
void
-Consumer::onHelloData(const ndn::Interest& interest, const ndn::Data& data)
+Consumer::sendHelloInterest() // Starts a segmented fetch of the hello data under m_helloInterestPrefix.
{
- ndn::Name helloDataName = data.getName();
+ ndn::Interest helloInterest(m_helloInterestPrefix);
+ NDN_LOG_DEBUG("Send Hello Interest " << helloInterest);
+
+ if (m_helloFetcher) {
+ m_helloFetcher->stop(); // stop the previous hello fetch before its handle is overwritten below
+ }
+
+ ndn::util::SegmentFetcher::Options options;
+ options.interestLifetime = m_helloInterestLifetime; // both limits use the configured hello lifetime
+ options.maxTimeout = m_helloInterestLifetime;
+
+ m_helloFetcher = ndn::util::SegmentFetcher::start(m_face,
+ helloInterest,
+ ndn::security::v2::getAcceptAllValidator(), // accept-all validator: segments are not checked against a trust policy
+ options);
+
+ m_helloFetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
+ if (data.getFinalBlock()) { // only segments that carry a FinalBlockId update the stored name
+ m_helloDataName = data.getName().getPrefix(-1); // name without its last component (presumably the segment number)
+ }
+ });
+
+ m_helloFetcher->onComplete.connect([this] (ndn::ConstBufferPtr bufferPtr) {
+ onHelloData(bufferPtr); // hand the fetched buffer to the hello-data handler
+ });
+
+ m_helloFetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
+ NDN_LOG_TRACE("Cannot fetch hello data, error: " <<
+ errorCode << " message: " << msg);
+ ndn::time::milliseconds after(m_rangeUniformRandom(m_rng)); // randomized delay before retrying
+ NDN_LOG_TRACE("Scheduling after " << after);
+ m_scheduler.scheduleEvent(after, [this] { sendHelloInterest(); }); // retry by calling this function again
+ });
+}
+
+void
+Consumer::onHelloData(const ndn::ConstBufferPtr& bufferPtr)
+{
NDN_LOG_DEBUG("On Hello Data");
// Extract IBF from name which is the last element in hello data's name
- m_iblt = helloDataName.getSubName(helloDataName.size()-1, 1);
+ m_iblt = m_helloDataName.getSubName(m_helloDataName.size()-1, 1);
NDN_LOG_TRACE("m_iblt: " << std::hash<std::string>{}(m_iblt.toUri()));
- State state(data.getContent());
+ State state(ndn::Block(std::move(bufferPtr)));
+
std::vector<MissingDataInfo> updates;
std::vector<ndn::Name> availableSubscriptions;
@@ -132,40 +170,64 @@
syncInterestName.append(m_iblt);
ndn::Interest syncInterest(syncInterestName);
- syncInterest.setInterestLifetime(m_syncInterestLifetime);
- syncInterest.setCanBePrefix(true);
- syncInterest.setMustBeFresh(true);
NDN_LOG_DEBUG("sendSyncInterest, nonce: " << syncInterest.getNonce() <<
" hash: " << std::hash<std::string>{}(syncInterest.getName().toUri()));
- // Remove last pending interest before sending a new one
- if (m_outstandingInterestId != nullptr) {
- m_face.removePendingInterest(m_outstandingInterestId);
- m_outstandingInterestId = nullptr;
+ ndn::util::SegmentFetcher::Options options; // per-fetch settings handed to SegmentFetcher::start below
+ options.interestLifetime = m_syncInterestLifetime; // both limits use the configured sync lifetime
+ options.maxTimeout = m_syncInterestLifetime;
+
+ if (m_syncFetcher) {
+ m_syncFetcher->stop();
}
- m_outstandingInterestId = m_face.expressInterest(syncInterest,
- std::bind(&Consumer::onSyncData, this, _1, _2),
- std::bind(&Consumer::onNackForSync, this, _1, _2),
- std::bind(&Consumer::onSyncTimeout, this, _1));
+ m_syncFetcher = ndn::util::SegmentFetcher::start(m_face,
+ syncInterest,
+ ndn::security::v2::getAcceptAllValidator(),
+ options);
+
+ m_syncFetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
+ if (data.getFinalBlock()) {
+ m_syncDataName = data.getName().getPrefix(-1);
+ m_syncDataContentType = data.getContentType();
+ }
+
+ if (m_syncDataContentType == ndn::tlv::ContentType_Nack)
+ {
+ NDN_LOG_DEBUG("Received application"
+ << " Nack from producer,"
+ << " sending hello again");
+ sendHelloInterest();
+ }
+ });
+
+ m_syncFetcher->onComplete.connect([this] (ndn::ConstBufferPtr bufferPtr) {
+ if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
+ m_syncDataContentType = ndn::tlv::ContentType_Blob;
+ return;
+ }
+ NDN_LOG_TRACE("Segment fetcher got sync data");
+ onSyncData(bufferPtr);
+ });
+
+ m_syncFetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
+ NDN_LOG_TRACE("Cannot fetch sync data, error: "
+ << errorCode << " message: " << msg);
+ ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
+ NDN_LOG_TRACE("Scheduling after " << after);
+ m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
+ });
}
void
-Consumer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
+Consumer::onSyncData(const ndn::ConstBufferPtr& bufferPtr)
{
- ndn::Name syncDataName = data.getName();
-
// Extract IBF from sync data name which is the last component
- m_iblt = syncDataName.getSubName(syncDataName.size()-1, 1);
+ m_iblt = m_syncDataName.getSubName(m_syncDataName.size()-1, 1);
- if (data.getContentType() == ndn::tlv::ContentType_Nack) {
- NDN_LOG_DEBUG("Received application Nack from producer, send hello again");
- sendHelloInterest();
- return;
- }
+ State state(ndn::Block(std::move(bufferPtr)));
- State state(data.getContent());
std::vector <MissingDataInfo> updates;
for (const auto& content : state.getContent()) {
@@ -190,40 +252,4 @@
sendSyncInterest();
}
-void
-Consumer::onHelloTimeout(const ndn::Interest& interest)
-{
- NDN_LOG_DEBUG("on hello timeout");
- this->sendHelloInterest();
-}
-
-void
-Consumer::onSyncTimeout(const ndn::Interest& interest)
-{
- NDN_LOG_DEBUG("on sync timeout " << interest.getNonce());
-
- ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
- m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
-}
-
-void
-Consumer::onNackForHello(const ndn::Interest& interest, const ndn::lp::Nack& nack)
-{
- NDN_LOG_DEBUG("received Nack with reason " << nack.getReason() <<
- " for interest " << interest << std::endl);
-
- ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
- m_scheduler.scheduleEvent(after, [this] { sendHelloInterest(); });
-}
-
-void
-Consumer::onNackForSync(const ndn::Interest& interest, const ndn::lp::Nack& nack)
-{
- NDN_LOG_DEBUG("received Nack with reason " << nack.getReason() <<
- " for interest " << interest << std::endl);
-
- ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
- m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
-}
-
} // namespace psync