full-producer: segment sync data
refs: #4716
Change-Id: I6776bcfd2816c1c45548c3799c1ad3e52cb3cf18
diff --git a/src/detail/state.cpp b/src/detail/state.cpp
index d245c0b..09ee55e 100644
--- a/src/detail/state.cpp
+++ b/src/detail/state.cpp
@@ -72,9 +72,8 @@
State::wireDecode(const ndn::Block& wire)
{
auto blockType = wire.type();
-
- if (blockType != tlv::PSyncContent && blockType != ndn::tlv::Content) {
- BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Content/PSyncContent Block, but Block is of a different type: #" +
+ if (blockType != tlv::PSyncContent) {
+ BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected PSyncContent Block, but Block is of type: #" +
ndn::to_string(blockType)));
return;
}
@@ -83,34 +82,13 @@
m_wire = wire;
- auto it = m_wire.elements_begin();
-
- if (it == m_wire.elements_end()) {
- return;
- }
-
- if (blockType == tlv::PSyncContent) {
- while(it != m_wire.elements_end()) {
- if (it->type() == ndn::tlv::Name) {
- m_content.emplace_back(*it);
- }
- else {
- BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of a different type: #" +
- ndn::to_string(it->type())));
- }
- ++it;
+ for (auto it = m_wire.elements_begin(); it != m_wire.elements_end(); ++it) {
+ if (it->type() == ndn::tlv::Name) {
+ m_content.emplace_back(*it);
}
- }
- else if (blockType == ndn::tlv::Content) {
- it->parse();
- for (auto val = it->elements_begin(); val != it->elements_end(); ++val) {
- if (val->type() == ndn::tlv::Name) {
- m_content.emplace_back(*val);
- }
- else {
- BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of a different type: #" +
- ndn::to_string(val->type())));
- }
+ else {
+ BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of type: #" +
+ ndn::to_string(it->type())));
}
}
}
diff --git a/src/full-producer.cpp b/src/full-producer.cpp
index c00bd0a..0953f93 100644
--- a/src/full-producer.cpp
+++ b/src/full-producer.cpp
@@ -41,7 +41,6 @@
: ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness)
, m_syncInterestLifetime(syncInterestLifetime)
, m_onUpdate(onUpdateCallBack)
- , m_outstandingInterestId(nullptr)
, m_scheduledSyncInterestId(m_scheduler)
{
int jitter = m_syncInterestLifetime.count() * .20;
@@ -49,7 +48,7 @@
m_registerPrefixId =
m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
- std::bind(&FullProducer::onInterest, this, _1, _2),
+ std::bind(&FullProducer::onSyncInterest, this, _1, _2),
std::bind(&FullProducer::onRegisterFailed, this, _1, _2));
// Should we do this after setInterestFilter success call back
@@ -59,8 +58,8 @@
FullProducer::~FullProducer()
{
- if (m_outstandingInterestId != nullptr) {
- m_face.removePendingInterest(m_outstandingInterestId);
+ if (m_fetcher) {
+ m_fetcher->stop();
}
m_face.unsetInterestFilter(m_registerPrefixId);
@@ -89,8 +88,8 @@
// If we send two sync interest one after the other
// since there is no new data in the network yet,
// when data is available it may satisfy both of them
- if (m_outstandingInterestId != nullptr) {
- m_face.removePendingInterest(m_outstandingInterestId);
+ if (m_fetcher) {
+ m_fetcher->stop();
}
// Sync Interest format for full sync: /<sync-prefix>/<ourLatestIBF>
@@ -106,50 +105,53 @@
[this] { sendSyncInterest(); });
ndn::Interest syncInterest(syncInterestName);
- syncInterest.setInterestLifetime(m_syncInterestLifetime);
- // Other side appends hash of IBF to sync data name
- syncInterest.setCanBePrefix(true);
- syncInterest.setMustBeFresh(true);
- syncInterest.setNonce(1);
- syncInterest.refreshNonce();
+ ndn::util::SegmentFetcher::Options options;
+ options.interestLifetime = m_syncInterestLifetime;
+ options.maxTimeout = m_syncInterestLifetime;
- m_outstandingInterestId = m_face.expressInterest(syncInterest,
- std::bind(&FullProducer::onSyncData, this, _1, _2),
- [] (const ndn::Interest& interest, const ndn::lp::Nack& nack) {
- NDN_LOG_TRACE("received Nack with reason " << nack.getReason() <<
- " for Interest with Nonce: " << interest.getNonce());
- },
- [] (const ndn::Interest& interest) {
- NDN_LOG_DEBUG("On full sync timeout " << interest.getNonce());
- });
+ m_fetcher = ndn::util::SegmentFetcher::start(m_face,
+ syncInterest,
+ ndn::security::v2::getAcceptAllValidator(),
+ options);
+
+ m_fetcher->onComplete.connect([this, syncInterest] (ndn::ConstBufferPtr bufferPtr) {
+ onSyncData(syncInterest, bufferPtr);
+ });
+
+ m_fetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
+ NDN_LOG_ERROR("Cannot fetch sync data, error: " <<
+ errorCode << " message: " << msg);
+ });
NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
", hash: " << std::hash<ndn::Name>{}(syncInterestName));
}
void
-FullProducer::onInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
+FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
{
- ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
- if (nameWithoutSyncPrefix.size() == 2 &&
- nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 1) == RECOVERY_PREFIX.get(0)) {
- onRecoveryInterest(interest);
+ if (m_segmentPublisher.replyFromStore(interest.getName())) {
+ return;
}
- // interest for recovery segment
- else if (nameWithoutSyncPrefix.size() == 3 &&
- nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 2) == RECOVERY_PREFIX.get(0)) {
- onRecoveryInterest(interest);
- }
- else if (nameWithoutSyncPrefix.size() == 1) {
- onSyncInterest(interest);
- }
-}
-void
-FullProducer::onSyncInterest(const ndn::Interest& interest)
-{
- ndn::Name interestName = interest.getName();
+ ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
+ ndn::Name interestName;
+ uint64_t interestSeq = 0;
+
+ if (nameWithoutSyncPrefix.size() == 1) {
+ // Get /IBF from /IBF
+ interestName = interest.getName();
+ }
+ else if (nameWithoutSyncPrefix.size() == 2) {
+ // Get /IBF from /IBF/<seq-no>
+ interestName = interest.getName().getPrefix(-1);
+ interestSeq = interest.getName().get(-1).toSegment();
+ }
+ else {
+ return;
+ }
+
ndn::name::Component ibltName = interestName.get(interestName.size()-1);
NDN_LOG_DEBUG("Full Sync Interest Received, nonce: " << interest.getNonce() <<
@@ -174,23 +176,22 @@
<< " negative: " << negative.size() << " m_threshold: "
<< m_threshold);
- // Send nack if greater then threshold, else send positive below as usual
+ // Send all data if greater than threshold, else send positive below as usual
// Or send if we can't get neither positive nor negative differences
if (positive.size() + negative.size() >= m_threshold ||
(positive.size() == 0 && negative.size() == 0)) {
-
- // If we don't have anything to offer means that
- // we are behind and should not mislead other nodes.
- bool haveSomethingToOffer = false;
+ State state;
for (const auto& content : m_prefixes) {
if (content.second != 0) {
- haveSomethingToOffer = true;
+ state.addContent(ndn::Name(content.first).appendNumber(content.second));
}
}
- if (haveSomethingToOffer) {
- sendApplicationNack(interestName);
+ if (!state.getContent().empty()) {
+ m_segmentPublisher.publish(interest.getName(), interest.getName(),
+ state.wireEncode(), m_syncReplyFreshness);
}
+
return;
}
}
@@ -206,6 +207,9 @@
if (!state.getContent().empty()) {
NDN_LOG_DEBUG("Sending sync content: " << state);
+ if (interestSeq != 0) {
+ interestName.appendSegment(interestSeq);
+ }
sendSyncData(interestName, state.wireEncode());
return;
}
@@ -223,26 +227,6 @@
}
void
-FullProducer::onRecoveryInterest(const ndn::Interest& interest)
-{
- NDN_LOG_DEBUG("Recovery interest received");
-
- if (m_segmentPublisher.replyFromStore(interest.getName())) {
- return;
- }
-
- State state;
- for (const auto& content : m_prefixes) {
- if (content.second != 0) {
- state.addContent(ndn::Name(content.first).appendNumber(content.second));
- }
- }
-
- m_segmentPublisher.publish(interest.getName(), interest.getName(),
- state.wireEncode(), m_syncReplyFreshness);
-}
-
-void
FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
{
NDN_LOG_DEBUG("Checking if data will satisfy our own pending interest");
@@ -251,57 +235,40 @@
m_iblt.appendToName(nameWithIblt);
// Append hash of our IBF so that data name maybe different for each node answering
- ndn::Data data(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
- data.setFreshnessPeriod(m_syncReplyFreshness);
- data.setContent(block);
- m_keyChain.sign(data);
+ ndn::Name dataName(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
// checking if our own interest got satisfied
if (m_outstandingInterestName == name) {
NDN_LOG_DEBUG("Satisfied our own pending interest");
// remove outstanding interest
- if (m_outstandingInterestId != nullptr) {
- NDN_LOG_DEBUG("Removing our pending interest from face");
- m_face.removePendingInterest(m_outstandingInterestId);
- m_outstandingInterestId = nullptr;
+ if (m_fetcher) {
+ NDN_LOG_DEBUG("Removing our pending interest from face (stop fetcher)");
+ m_fetcher->stop();
m_outstandingInterestName = ndn::Name("");
}
NDN_LOG_DEBUG("Sending Sync Data");
// Send data after removing pending sync interest on face
- m_face.put(data);
+ m_segmentPublisher.publish(name, dataName, block, m_syncReplyFreshness);
NDN_LOG_TRACE("Renewing sync interest");
sendSyncInterest();
}
else {
NDN_LOG_DEBUG("Sending Sync Data");
- m_face.put(data);
+ m_segmentPublisher.publish(name, dataName, block, m_syncReplyFreshness);
}
}
void
-FullProducer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
+FullProducer::onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPtr& bufferPtr)
{
- ndn::Name interestName = interest.getName();
deletePendingInterests(interest.getName());
- if (data.getContentType() == ndn::tlv::ContentType_Nack) {
- NDN_LOG_DEBUG("Got application nack, sending recovery interest");
- sendRecoveryInterest(interest);
- return;
- }
-
- State state(data.getContent());
+ State state(ndn::Block(std::move(bufferPtr)));
std::vector<MissingDataInfo> updates;
- if (interestName.get(interestName.size()-1) == RECOVERY_PREFIX.get(0) &&
- state.getContent().empty()) {
- NDN_LOG_TRACE("Recovery data is empty, other side is behind");
- return;
- }
-
NDN_LOG_DEBUG("Sync Data Received: " << state);
for (const auto& content : state.getContent()) {
@@ -325,8 +292,8 @@
sendSyncInterest();
}
else {
- NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() <<
- " , hash: " << std::hash<ndn::Name>{}(interestName));
+ NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() <<
+ " , hash: " << std::hash<ndn::Name>{}(interest.getName()));
}
}
@@ -398,39 +365,4 @@
}
}
-void
-FullProducer::sendRecoveryInterest(const ndn::Interest& interest)
-{
- if (m_outstandingInterestId != nullptr) {
- m_face.removePendingInterest(m_outstandingInterestId);
- m_outstandingInterestId = nullptr;
- }
-
- ndn::Name ibltName;
- m_iblt.appendToName(ibltName);
-
- ndn::Name recoveryInterestName(m_syncPrefix);
- recoveryInterestName.appendNumber(std::hash<ndn::Name>{}(ibltName));
- recoveryInterestName.append(RECOVERY_PREFIX);
-
- ndn::Interest recoveryInterest(recoveryInterestName);
- recoveryInterest.setInterestLifetime(m_syncInterestLifetime);
-
- auto fetcher = ndn::util::SegmentFetcher::start(m_face,
- recoveryInterest,
- ndn::security::v2::getAcceptAllValidator());
-
- fetcher->onComplete.connect([this, recoveryInterest] (ndn::ConstBufferPtr bufferPtr) {
- NDN_LOG_TRACE("Segment fetcher got data");
- ndn::Data data;
- data.setContent(std::move(bufferPtr));
- onSyncData(recoveryInterest, data);
- });
-
- fetcher->onError.connect([] (uint32_t errorCode, const std::string& msg) {
- NDN_LOG_ERROR("Cannot recover, error: " << errorCode <<
- " message: " << msg);
- });
-}
-
} // namespace psync
\ No newline at end of file
diff --git a/src/full-producer.hpp b/src/full-producer.hpp
index bcf699e..21756d0 100644
--- a/src/full-producer.hpp
+++ b/src/full-producer.hpp
@@ -32,6 +32,7 @@
#include <ndn-cxx/util/scheduler-scoped-event-id.hpp>
#include <ndn-cxx/util/time.hpp>
#include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/util/segment-fetcher.hpp>
namespace psync {
@@ -47,7 +48,6 @@
typedef std::function<void(const std::vector<MissingDataInfo>&)> UpdateCallback;
const ndn::time::milliseconds SYNC_INTEREST_LIFTIME = 1_s;
-const ndn::Name RECOVERY_PREFIX("recovery");
/**
* @brief Full sync logic to synchronize with other nodes
@@ -111,19 +111,6 @@
PUBLIC_WITH_TESTS_ELSE_PRIVATE:
/**
- * @brief Process interest from other parties
- *
- * Determine whether this is a sync interest or recovery interest
- * and dispatch to onSyncInterest or onRecoveryInterest respectively.
- *
- * @param prefixName prefix for sync group which we registered
- * @param interest the interest we got
- */
- void
- onInterest(const ndn::Name& prefixName, const ndn::Interest& interest);
-
-private:
- /**
* @brief Process sync interest from other parties
*
* Get differences b/w our IBF and IBF in the sync interest.
@@ -134,19 +121,13 @@
* Otherwise add the sync interest into a map with interest name as key and PendingEntryInfoFull
* as value.
*
- * @param interest the sync interest we got
+ * @param prefixName prefix for sync group which we registered
+ * @param interest the interest we got
*/
void
- onSyncInterest(const ndn::Interest& interest);
+ onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest);
- /**
- * @brief Publish our entire state so that requester can catch up.
- *
- * @param interest the recovery interest we got
- */
- void
- onRecoveryInterest(const ndn::Interest& interest);
-
+private:
/**
* @brief Send sync data
*
@@ -173,10 +154,10 @@
* sendSyncInterest because the last one was satisfied by the incoming data
*
* @param interest interest for which we got the data
- * @param data the data packet we got
+ * @param bufferPtr sync data content
*/
void
- onSyncData(const ndn::Interest& interest, const ndn::Data& data);
+ onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPtr& bufferPtr);
/**
* @brief Satisfy pending sync interests
@@ -204,15 +185,6 @@
bool
isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative);
- /**
- * @brief Send recovery interest using segment fetcher
- *
- * Recovery data is expected go over max packet size
- * Appends the RECOVERY_PREFIX to the given interest
- */
- void
- sendRecoveryInterest(const ndn::Interest& interest);
-
private:
std::map <ndn::Name, PendingEntryInfoFull> m_pendingEntries;
@@ -220,8 +192,6 @@
UpdateCallback m_onUpdate;
- const ndn::PendingInterestId* m_outstandingInterestId;
-
ndn::util::scheduler::ScopedEventId m_scheduledSyncInterestId;
std::uniform_int_distribution<> m_jitter;
@@ -229,6 +199,8 @@
ndn::Name m_outstandingInterestName;
const ndn::RegisteredPrefixId* m_registerPrefixId;
+
+ std::shared_ptr<ndn::util::SegmentFetcher> m_fetcher;
};
} // namespace psync