partial sync: segment hello and sync data
add segment publisher
refs: #4662
Change-Id: I62e7a2247bac58aeec364cd2a4e4d34259eae4af
diff --git a/src/consumer.cpp b/src/consumer.cpp
index f27f355..04f8b4a 100644
--- a/src/consumer.cpp
+++ b/src/consumer.cpp
@@ -21,6 +21,7 @@
#include "detail/state.hpp"
#include <ndn-cxx/util/logger.hpp>
+#include <ndn-cxx/security/validator-null.hpp>
#include <boost/algorithm/string.hpp>
@@ -41,6 +42,7 @@
, m_syncPrefix(syncPrefix)
, m_helloInterestPrefix(ndn::Name(m_syncPrefix).append("hello"))
, m_syncInterestPrefix(ndn::Name(m_syncPrefix).append("sync"))
+ , m_syncDataContentType(ndn::tlv::ContentType_Blob)
, m_onReceiveHelloData(onReceiveHelloData)
, m_onUpdate(onUpdate)
, m_bloomFilter(count, false_positive)
@@ -64,34 +66,70 @@
}
void
-Consumer::sendHelloInterest()
+Consumer::stop()
{
- ndn::Interest helloInterest(m_helloInterestPrefix);
- helloInterest.setInterestLifetime(m_helloInterestLifetime);
- helloInterest.setCanBePrefix(true);
- helloInterest.setMustBeFresh(true);
+ if (m_syncFetcher) {
+ m_syncFetcher->stop();
+ m_syncFetcher.reset();
+ }
- NDN_LOG_DEBUG("Send Hello Interest " << helloInterest);
-
- m_face.expressInterest(helloInterest,
- std::bind(&Consumer::onHelloData, this, _1, _2),
- std::bind(&Consumer::onNackForHello, this, _1, _2),
- std::bind(&Consumer::onHelloTimeout, this, _1));
+ if (m_helloFetcher) {
+ m_helloFetcher->stop();
+ m_helloFetcher.reset();
+ }
}
void
-Consumer::onHelloData(const ndn::Interest& interest, const ndn::Data& data)
+Consumer::sendHelloInterest()
{
- ndn::Name helloDataName = data.getName();
+ ndn::Interest helloInterest(m_helloInterestPrefix);
+ NDN_LOG_DEBUG("Send Hello Interest " << helloInterest);
+
+ if (m_helloFetcher) {
+ m_helloFetcher->stop();
+ }
+
+ ndn::util::SegmentFetcher::Options options;
+ options.interestLifetime = m_helloInterestLifetime;
+ options.maxTimeout = m_helloInterestLifetime;
+
+ m_helloFetcher = ndn::util::SegmentFetcher::start(m_face,
+ helloInterest,
+ ndn::security::v2::getAcceptAllValidator(),
+ options);
+
+ m_helloFetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
+ if (data.getFinalBlock()) {
+ m_helloDataName = data.getName().getPrefix(-1);
+ }
+ });
+
+ m_helloFetcher->onComplete.connect([this] (ndn::ConstBufferPtr bufferPtr) {
+ onHelloData(bufferPtr);
+ });
+
+ m_helloFetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
+ NDN_LOG_TRACE("Cannot fetch hello data, error: " <<
+ errorCode << " message: " << msg);
+ ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
+ NDN_LOG_TRACE("Scheduling after " << after);
+ m_scheduler.scheduleEvent(after, [this] { sendHelloInterest(); });
+ });
+}
+
+void
+Consumer::onHelloData(const ndn::ConstBufferPtr& bufferPtr)
+{
NDN_LOG_DEBUG("On Hello Data");
// Extract IBF from name which is the last element in hello data's name
- m_iblt = helloDataName.getSubName(helloDataName.size()-1, 1);
+ m_iblt = m_helloDataName.getSubName(m_helloDataName.size()-1, 1);
NDN_LOG_TRACE("m_iblt: " << std::hash<std::string>{}(m_iblt.toUri()));
- State state(data.getContent());
+ State state(ndn::Block(std::move(bufferPtr)));
+
std::vector<MissingDataInfo> updates;
std::vector<ndn::Name> availableSubscriptions;
@@ -132,40 +170,64 @@
syncInterestName.append(m_iblt);
ndn::Interest syncInterest(syncInterestName);
- syncInterest.setInterestLifetime(m_syncInterestLifetime);
- syncInterest.setCanBePrefix(true);
- syncInterest.setMustBeFresh(true);
NDN_LOG_DEBUG("sendSyncInterest, nonce: " << syncInterest.getNonce() <<
" hash: " << std::hash<std::string>{}(syncInterest.getName().toUri()));
- // Remove last pending interest before sending a new one
- if (m_outstandingInterestId != nullptr) {
- m_face.removePendingInterest(m_outstandingInterestId);
- m_outstandingInterestId = nullptr;
+ ndn::util::SegmentFetcher::Options options;
+ options.interestLifetime = m_syncInterestLifetime;
+ options.maxTimeout = m_syncInterestLifetime;;
+
+ if (m_syncFetcher) {
+ m_syncFetcher->stop();
}
- m_outstandingInterestId = m_face.expressInterest(syncInterest,
- std::bind(&Consumer::onSyncData, this, _1, _2),
- std::bind(&Consumer::onNackForSync, this, _1, _2),
- std::bind(&Consumer::onSyncTimeout, this, _1));
+ m_syncFetcher = ndn::util::SegmentFetcher::start(m_face,
+ syncInterest,
+ ndn::security::v2::getAcceptAllValidator(),
+ options);
+
+ m_syncFetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
+ if (data.getFinalBlock()) {
+ m_syncDataName = data.getName().getPrefix(-1);
+ m_syncDataContentType = data.getContentType();
+ }
+
+ if (m_syncDataContentType == ndn::tlv::ContentType_Nack)
+ {
+ NDN_LOG_DEBUG("Received application"
+ << " Nack from producer,"
+ << " sending hello again");
+ sendHelloInterest();
+ }
+ });
+
+ m_syncFetcher->onComplete.connect([this] (ndn::ConstBufferPtr bufferPtr) {
+ if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
+ m_syncDataContentType = ndn::tlv::ContentType_Blob;
+ return;
+ }
+ NDN_LOG_TRACE("Segment fetcher got sync data");
+ onSyncData(bufferPtr);
+ });
+
+ m_syncFetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
+ NDN_LOG_TRACE("Cannot fetch sync data, error: "
+ << errorCode << " message: " << msg);
+ ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
+ NDN_LOG_TRACE("Scheduling after " << after);
+ m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
+ });
}
void
-Consumer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
+Consumer::onSyncData(const ndn::ConstBufferPtr& bufferPtr)
{
- ndn::Name syncDataName = data.getName();
-
// Extract IBF from sync data name which is the last component
- m_iblt = syncDataName.getSubName(syncDataName.size()-1, 1);
+ m_iblt = m_syncDataName.getSubName(m_syncDataName.size()-1, 1);
- if (data.getContentType() == ndn::tlv::ContentType_Nack) {
- NDN_LOG_DEBUG("Received application Nack from producer, send hello again");
- sendHelloInterest();
- return;
- }
+ State state(ndn::Block(std::move(bufferPtr)));
- State state(data.getContent());
std::vector <MissingDataInfo> updates;
for (const auto& content : state.getContent()) {
@@ -190,40 +252,4 @@
sendSyncInterest();
}
-void
-Consumer::onHelloTimeout(const ndn::Interest& interest)
-{
- NDN_LOG_DEBUG("on hello timeout");
- this->sendHelloInterest();
-}
-
-void
-Consumer::onSyncTimeout(const ndn::Interest& interest)
-{
- NDN_LOG_DEBUG("on sync timeout " << interest.getNonce());
-
- ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
- m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
-}
-
-void
-Consumer::onNackForHello(const ndn::Interest& interest, const ndn::lp::Nack& nack)
-{
- NDN_LOG_DEBUG("received Nack with reason " << nack.getReason() <<
- " for interest " << interest << std::endl);
-
- ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
- m_scheduler.scheduleEvent(after, [this] { sendHelloInterest(); });
-}
-
-void
-Consumer::onNackForSync(const ndn::Interest& interest, const ndn::lp::Nack& nack)
-{
- NDN_LOG_DEBUG("received Nack with reason " << nack.getReason() <<
- " for interest " << interest << std::endl);
-
- ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
- m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
-}
-
-} // namespace psync
+} // namespace psync
\ No newline at end of file
diff --git a/src/consumer.hpp b/src/consumer.hpp
index 9b30d8d..6460bc5 100644
--- a/src/consumer.hpp
+++ b/src/consumer.hpp
@@ -26,6 +26,7 @@
#include <ndn-cxx/face.hpp>
#include <ndn-cxx/util/scheduler.hpp>
+#include <ndn-cxx/util/segment-fetcher.hpp>
#include <ndn-cxx/util/time.hpp>
#include <random>
@@ -130,6 +131,12 @@
return it->second;
}
+ /**
+ * @brief Stop segment fetcher to stop the sync and free resources
+ */
+ void
+ stop();
+
private:
/**
* @brief Get hello data from the producer
@@ -141,11 +148,10 @@
* m_onReceiveHelloData is called to let the application know
* so that it can set the subscription list using addSubscription
*
- * @param interest hello interest
- * @param data hello data
+ * @param bufferPtr hello data content
*/
void
- onHelloData(const ndn::Interest& interest, const ndn::Data& data);
+ onHelloData(const ndn::ConstBufferPtr& bufferPtr);
/**
* @brief Get hello data from the producer
@@ -155,23 +161,10 @@
* have the latest update for. We update our copy of producer's IBF with the latest one.
* Then we send another sync interest after a random jitter.
*
- * @param interest sync interest
- * @param data sync data
+ * @param bufferPtr sync data content
*/
void
- onSyncData(const ndn::Interest& interest, const ndn::Data& data);
-
- void
- onHelloTimeout(const ndn::Interest& interest);
-
- void
- onSyncTimeout(const ndn::Interest& interest);
-
- void
- onNackForHello(const ndn::Interest& interest, const ndn::lp::Nack& nack);
-
- void
- onNackForSync(const ndn::Interest& interest, const ndn::lp::Nack& nack);
+ onSyncData(const ndn::ConstBufferPtr& bufferPtr);
PUBLIC_WITH_TESTS_ELSE_PRIVATE:
ndn::Face& m_face;
@@ -181,6 +174,9 @@
ndn::Name m_helloInterestPrefix;
ndn::Name m_syncInterestPrefix;
ndn::Name m_iblt;
+ ndn::Name m_helloDataName;
+ ndn::Name m_syncDataName;
+ uint32_t m_syncDataContentType;
ReceiveHelloCallback m_onReceiveHelloData;
@@ -197,10 +193,10 @@
std::map<ndn::Name, uint64_t> m_prefixes;
std::set<ndn::Name> m_subscriptionList;
- const ndn::PendingInterestId* m_outstandingInterestId;
-
std::mt19937 m_rng;
std::uniform_int_distribution<> m_rangeUniformRandom;
+ std::shared_ptr<ndn::util::SegmentFetcher> m_helloFetcher;
+ std::shared_ptr<ndn::util::SegmentFetcher> m_syncFetcher;
};
} // namespace psync
diff --git a/src/detail/state.cpp b/src/detail/state.cpp
index e5c6c47..d245c0b 100644
--- a/src/detail/state.cpp
+++ b/src/detail/state.cpp
@@ -71,29 +71,46 @@
void
State::wireDecode(const ndn::Block& wire)
{
- if (!wire.hasWire()) {
- BOOST_THROW_EXCEPTION(ndn::tlv::Error("The supplied block does not contain wire format"));
+ auto blockType = wire.type();
+
+ if (blockType != tlv::PSyncContent && blockType != ndn::tlv::Content) {
+ BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Content/PSyncContent Block, but Block is of a different type: #" +
+ ndn::to_string(blockType)));
+ return;
}
wire.parse();
+
m_wire = wire;
auto it = m_wire.elements_begin();
- if (it->type() != tlv::PSyncContent) {
- BOOST_THROW_EXCEPTION(ndn::tlv::Error("Unexpected TLV type when decoding Content: " +
- ndn::to_string(wire.type())));
+ if (it == m_wire.elements_end()) {
+ return;
}
- it->parse();
-
- for (auto val = it->elements_begin(); val != it->elements_end(); ++val) {
- if (val->type() == ndn::tlv::Name) {
- m_content.emplace_back(*val);
+ if (blockType == tlv::PSyncContent) {
+ while(it != m_wire.elements_end()) {
+ if (it->type() == ndn::tlv::Name) {
+ m_content.emplace_back(*it);
+ }
+ else {
+ BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of a different type: #" +
+ ndn::to_string(it->type())));
+ }
+ ++it;
}
- else {
- BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of a different type: #" +
- ndn::to_string(m_wire.type())));
+ }
+ else if (blockType == ndn::tlv::Content) {
+ it->parse();
+ for (auto val = it->elements_begin(); val != it->elements_end(); ++val) {
+ if (val->type() == ndn::tlv::Name) {
+ m_content.emplace_back(*val);
+ }
+ else {
+ BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of a different type: #" +
+ ndn::to_string(val->type())));
+ }
}
}
}
diff --git a/src/full-producer.cpp b/src/full-producer.cpp
index dd4d452..c00bd0a 100644
--- a/src/full-producer.cpp
+++ b/src/full-producer.cpp
@@ -23,7 +23,6 @@
#include <ndn-cxx/util/segment-fetcher.hpp>
#include <ndn-cxx/security/validator-null.hpp>
-#include <iostream>
#include <cstring>
#include <limits>
#include <functional>
@@ -137,6 +136,11 @@
nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 1) == RECOVERY_PREFIX.get(0)) {
onRecoveryInterest(interest);
}
+ // interest for recovery segment
+ else if (nameWithoutSyncPrefix.size() == 3 &&
+ nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 2) == RECOVERY_PREFIX.get(0)) {
+ onRecoveryInterest(interest);
+ }
else if (nameWithoutSyncPrefix.size() == 1) {
onSyncInterest(interest);
}
@@ -223,6 +227,10 @@
{
NDN_LOG_DEBUG("Recovery interest received");
+ if (m_segmentPublisher.replyFromStore(interest.getName())) {
+ return;
+ }
+
State state;
for (const auto& content : m_prefixes) {
if (content.second != 0) {
@@ -230,8 +238,8 @@
}
}
- // Send even if state is empty to let other side know that we are behind
- sendRecoveryData(interest.getName(), state);
+ m_segmentPublisher.publish(interest.getName(), interest.getName(),
+ state.wireEncode(), m_syncReplyFreshness);
}
void
@@ -391,44 +399,6 @@
}
void
-FullProducer::sendRecoveryData(const ndn::Name& prefix, const State& state)
-{
- ndn::EncodingBuffer buffer;
- buffer.prependBlock(state.wireEncode());
-
- const uint8_t* rawBuffer = buffer.buf();
- const uint8_t* segmentBegin = rawBuffer;
- const uint8_t* end = rawBuffer + buffer.size();
-
- uint64_t segmentNo = 0;
- do {
- const uint8_t* segmentEnd = segmentBegin + (ndn::MAX_NDN_PACKET_SIZE >> 1);
- if (segmentEnd > end) {
- segmentEnd = end;
- }
-
- ndn::Name segmentName(prefix);
- segmentName.appendSegment(segmentNo);
-
- std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
- data->setContent(segmentBegin, segmentEnd - segmentBegin);
- data->setFreshnessPeriod(m_syncReplyFreshness);
-
- segmentBegin = segmentEnd;
- if (segmentBegin >= end) {
- data->setFinalBlock(segmentName[-1]);
- }
-
- m_keyChain.sign(*data);
- m_face.put(*data);
-
- NDN_LOG_DEBUG("Sending recovery data, seq: " << segmentNo);
-
- ++segmentNo;
- } while (segmentBegin < end);
-}
-
-void
FullProducer::sendRecoveryInterest(const ndn::Interest& interest)
{
if (m_outstandingInterestId != nullptr) {
diff --git a/src/full-producer.hpp b/src/full-producer.hpp
index 2f97522..bcf699e 100644
--- a/src/full-producer.hpp
+++ b/src/full-producer.hpp
@@ -205,13 +205,6 @@
isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative);
/**
- * @brief Segment and send state with the given data name
- *
- */
- void
- sendRecoveryData(const ndn::Name& prefix, const State& state);
-
- /**
* @brief Send recovery interest using segment fetcher
*
* Recovery data is expected go over max packet size
diff --git a/src/partial-producer.cpp b/src/partial-producer.cpp
index d8d98a5..b0d29f1 100644
--- a/src/partial-producer.cpp
+++ b/src/partial-producer.cpp
@@ -22,7 +22,6 @@
#include <ndn-cxx/util/logger.hpp>
-#include <iostream>
#include <cstring>
#include <limits>
@@ -75,7 +74,18 @@
void
PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest)
{
- NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest.getNonce());
+ // Last component or third last component (in case of interest with IBF and segment)
+ // needs to be hello
+ if (interest.getName().get(interest.getName().size()-1).toUri() != "hello" &&
+ interest.getName().get(interest.getName().size()-3).toUri() != "hello") {
+ return;
+ }
+
+ if (m_segmentPublisher.replyFromStore(interest.getName())) {
+ return;
+ }
+
+ NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest);
State state;
@@ -87,13 +97,8 @@
ndn::Name helloDataName = prefix;
m_iblt.appendToName(helloDataName);
- ndn::Data data;
- data.setName(helloDataName);
- data.setFreshnessPeriod(m_helloReplyFreshness);
- data.setContent(state.wireEncode());
-
- m_keyChain.sign(data);
- m_face.put(data);
+ m_segmentPublisher.publish(interest.getName(), helloDataName,
+ state.wireEncode(), m_helloReplyFreshness);
}
void
@@ -104,6 +109,15 @@
ndn::Name interestName = interest.getName();
+ if (interestName.get(interestName.size() - 5).toUri() != "sync" &&
+ interestName.get(interestName.size() - 7).toUri() != "sync") {
+ return;
+ }
+
+ if (m_segmentPublisher.replyFromStore(interest.getName())) {
+ return;
+ }
+
ndn::name::Component bfName, ibltName;
unsigned int projectedCount;
double falsePositiveProb;
@@ -171,15 +185,9 @@
// send back data
ndn::Name syncDataName = interestName;
m_iblt.appendToName(syncDataName);
- ndn::Data data;
- data.setName(syncDataName);
- data.setFreshnessPeriod(m_syncReplyFreshness);
- data.setContent(state.wireEncode());
- m_keyChain.sign(data);
- NDN_LOG_DEBUG("Sending sync data");
- m_face.put(data);
-
+ m_segmentPublisher.publish(interest.getName(), syncDataName,
+ state.wireEncode(), m_syncReplyFreshness);
return;
}
@@ -222,8 +230,8 @@
State state;
if (entry.bf.contains(prefix.toUri()) || positive.size() + negative.size() >= m_threshold) {
if (entry.bf.contains(prefix.toUri())) {
- state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
- NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
+ state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
+ NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
}
else {
NDN_LOG_DEBUG("Sending with empty content to send latest IBF to consumer");
@@ -232,14 +240,9 @@
// generate sync data and cancel the event
ndn::Name syncDataName = it->first;
m_iblt.appendToName(syncDataName);
- ndn::Data data;
- data.setName(syncDataName);
- data.setFreshnessPeriod(m_syncReplyFreshness);
- data.setContent(state.wireEncode());
- m_keyChain.sign(data);
- NDN_LOG_DEBUG("Sending sync data");
- m_face.put(data);
+ m_segmentPublisher.publish(it->first, syncDataName,
+ state.wireEncode(), m_syncReplyFreshness);
m_pendingEntries.erase(it++);
}
diff --git a/src/partial-producer.hpp b/src/partial-producer.hpp
index a2db7ac..cb014fc 100644
--- a/src/partial-producer.hpp
+++ b/src/partial-producer.hpp
@@ -112,7 +112,8 @@
* Either respond with sync data if consumer is behind or
* store sync interest in m_pendingEntries
*
- * Sync data's name format is: /\<sync-prefix\>/sync/\<old-IBF\>/\<current-IBF\>
+ * Sync data's name format is: /\<syncPrefix\>/sync/\<BF\>/\<old-IBF\>/\<current-IBF\>
+ * (BF has 3 components).
*/
void
onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest);
diff --git a/src/producer-base.cpp b/src/producer-base.cpp
index e24fb5b..e731168 100644
--- a/src/producer-base.cpp
+++ b/src/producer-base.cpp
@@ -45,6 +45,7 @@
, m_userPrefix(userPrefix)
, m_syncReplyFreshness(syncReplyFreshness)
, m_helloReplyFreshness(helloReplyFreshness)
+ , m_segmentPublisher(m_face, m_keyChain)
{
addUserNode(userPrefix);
}
@@ -129,9 +130,11 @@
ndn::Name dataName(name);
m_iblt.appendToName(dataName);
+ dataName.appendSegment(0);
ndn::Data data(dataName);
data.setFreshnessPeriod(m_syncReplyFreshness);
data.setContentType(ndn::tlv::ContentType_Nack);
+ data.setFinalBlock(dataName[-1]);
m_keyChain.sign(data);
m_face.put(data);
}
diff --git a/src/producer-base.hpp b/src/producer-base.hpp
index 079b803..4aa0ef7 100644
--- a/src/producer-base.hpp
+++ b/src/producer-base.hpp
@@ -24,6 +24,7 @@
#include "detail/bloom-filter.hpp"
#include "detail/util.hpp"
#include "detail/test-access-control.hpp"
+#include "segment-publisher.hpp"
#include <ndn-cxx/face.hpp>
#include <ndn-cxx/util/scheduler.hpp>
@@ -180,6 +181,8 @@
ndn::time::milliseconds m_syncReplyFreshness;
ndn::time::milliseconds m_helloReplyFreshness;
+ SegmentPublisher m_segmentPublisher;
+
std::mt19937 m_rng;
};
diff --git a/src/segment-publisher.cpp b/src/segment-publisher.cpp
new file mode 100644
index 0000000..d78d47f
--- /dev/null
+++ b/src/segment-publisher.cpp
@@ -0,0 +1,103 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "segment-publisher.hpp"
+
+#include <ndn-cxx/name-component.hpp>
+
+namespace psync {
+
+SegmentPublisher::SegmentPublisher(ndn::Face& face, ndn::KeyChain& keyChain,
+ size_t imsLimit)
+ : m_face(face)
+ , m_scheduler(m_face.getIoService())
+ , m_keyChain(keyChain)
+ , m_ims(imsLimit)
+{
+}
+
+void
+SegmentPublisher::publish(const ndn::Name& interestName, const ndn::Name& dataName,
+ const ndn::Block& block, ndn::time::milliseconds freshness)
+{
+ uint64_t interestSegment = 0;
+ if (interestName[-1].isSegment()) {
+ interestSegment = interestName[-1].toSegment();
+ }
+
+ ndn::EncodingBuffer buffer;
+ buffer.prependBlock(std::move(block));
+
+ const uint8_t* rawBuffer = buffer.buf();
+ const uint8_t* segmentBegin = rawBuffer;
+ const uint8_t* end = rawBuffer + buffer.size();
+
+ size_t maxPacketSize = (ndn::MAX_NDN_PACKET_SIZE >> 1);
+
+ uint64_t totalSegments = buffer.size() / maxPacketSize;
+
+ uint64_t segmentNo = 0;
+ do {
+ const uint8_t* segmentEnd = segmentBegin + maxPacketSize;
+ if (segmentEnd > end) {
+ segmentEnd = end;
+ }
+
+ ndn::Name segmentName(dataName);
+ segmentName.appendSegment(segmentNo);
+
+ // We get a std::exception: bad_weak_ptr from m_ims if we don't use shared_ptr for data
+ std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
+ data->setContent(segmentBegin, segmentEnd - segmentBegin);
+ data->setFreshnessPeriod(freshness);
+ data->setFinalBlock(ndn::name::Component::fromSegment(totalSegments));
+
+ segmentBegin = segmentEnd;
+
+ m_keyChain.sign(*data);
+
+ // Put on face only the segment which has a pending interest
+ // otherwise the segment is unsolicited
+ if (interestSegment == segmentNo) {
+ m_face.put(*data);
+ }
+
+ m_ims.insert(*data, freshness);
+ m_scheduler.scheduleEvent(freshness,
+ [this, segmentName] {
+ m_ims.erase(segmentName);
+ });
+
+ ++segmentNo;
+ } while (segmentBegin < end);
+}
+
+bool
+SegmentPublisher::replyFromStore(const ndn::Name& interestName)
+{
+ auto it = m_ims.find(interestName);
+
+ if (it != nullptr) {
+ m_face.put(*it);
+ return true;
+ }
+ return false;
+}
+
+} // namespace psync
diff --git a/src/segment-publisher.hpp b/src/segment-publisher.hpp
new file mode 100644
index 0000000..b23ac52
--- /dev/null
+++ b/src/segment-publisher.hpp
@@ -0,0 +1,78 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#ifndef PSYNC_SEGMENT_PUBLISHER_HPP
+#define PSYNC_SEGMENT_PUBLISHER_HPP
+
+#include "detail/test-access-control.hpp"
+
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/util/scheduler-scoped-event-id.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
+#include <ndn-cxx/util/time.hpp>
+#include <ndn-cxx/ims/in-memory-storage-fifo.hpp>
+
+namespace psync {
+
+const int MAX_SEGMENTS_STORED = 100;
+
+/**
+ * @brief Segment Publisher to publish segmented data
+ *
+ */
+class SegmentPublisher
+{
+public:
+ SegmentPublisher(ndn::Face& face, ndn::KeyChain& keyChain,
+ size_t imsLimit = MAX_SEGMENTS_STORED);
+
+ /**
+ * @brief Put all the segments in memory.
+ *
+ * @param interestName the interest name, to determine the sequence to be answered immediately
+ * @param dataName the data name, has components after interest name
+ * @param block the content of the data
+ * @param freshness freshness of the segments
+ */
+ void
+ publish(const ndn::Name& interestName, const ndn::Name& dataName,
+ const ndn::Block& block, ndn::time::milliseconds freshness);
+
+ /**
+ * @brief Try to reply from memory, return false if we cannot find the segment.
+ *
+ * The caller is then expected to use publish if this returns false.
+ */
+ bool
+ replyFromStore(const ndn::Name& interestName);
+
+private:
+ ndn::Face& m_face;
+ ndn::util::Scheduler m_scheduler;
+ ndn::KeyChain& m_keyChain;
+
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ ndn::InMemoryStorageFifo m_ims;
+};
+
+} // namespace psync
+
+#endif // PSYNC_SEGMENT_PUBLISHER_HPP
\ No newline at end of file
diff --git a/tests/test-consumer.cpp b/tests/test-consumer.cpp
index 97adde3..9a745e6 100644
--- a/tests/test-consumer.cpp
+++ b/tests/test-consumer.cpp
@@ -23,8 +23,6 @@
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/util/dummy-client-face.hpp>
-#include <iostream>
-
namespace psync {
using namespace ndn;
diff --git a/tests/test-full-producer.cpp b/tests/test-full-producer.cpp
index 23d2ab5..ee6764e 100644
--- a/tests/test-full-producer.cpp
+++ b/tests/test-full-producer.cpp
@@ -24,8 +24,6 @@
#include <ndn-cxx/util/dummy-client-face.hpp>
#include <ndn-cxx/mgmt/nfd/control-parameters.hpp>
-#include <iostream>
-
namespace psync {
using namespace ndn;
diff --git a/tests/test-full-sync.cpp b/tests/test-full-sync.cpp
index 3553c86..01a5375 100644
--- a/tests/test-full-sync.cpp
+++ b/tests/test-full-sync.cpp
@@ -26,8 +26,6 @@
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/util/dummy-client-face.hpp>
-#include <iostream>
-
namespace psync {
using namespace ndn;
@@ -332,35 +330,7 @@
{
addNode(0);
addNode(1);
-
- // Simple content store
- faces[0]->onSendInterest.connect([this] (const Interest& interest) {
- for (const auto& data : faces[1]->sentData) {
- if (data.getName() == interest.getName()) {
- faces[0]->receive(data);
- return;
- }
- }
- faces[1]->receive(interest);
- });
-
- faces[0]->onSendData.connect([this] (const Data& data) {
- faces[1]->receive(data);
- });
-
- faces[1]->onSendInterest.connect([this] (const Interest& interest) {
- for (const auto& data : faces[0]->sentData) {
- if (data.getName() == interest.getName()) {
- faces[1]->receive(data);
- return;
- }
- }
- faces[0]->receive(interest);
- });
-
- faces[1]->onSendData.connect([this] (const Data& data) {
- faces[0]->receive(data);
- });
+ faces[0]->linkTo(*faces[1]);
advanceClocks(ndn::time::milliseconds(10));
@@ -416,7 +386,6 @@
nodes[0]->publishName(Name("userNode0-" + to_string(totalUpdates)));
advanceClocks(ndn::time::milliseconds(10), 100);
- // No mechanism to recover yet
for (int i = 0; i <= totalUpdates; i++) {
Name userPrefix("userNode0-" + to_string(i));
for (int j = 0; j < 4; j++) {
diff --git a/tests/test-partial-producer.cpp b/tests/test-partial-producer.cpp
index 3bed1ac..2ff1a5d 100644
--- a/tests/test-partial-producer.cpp
+++ b/tests/test-partial-producer.cpp
@@ -24,8 +24,6 @@
#include <ndn-cxx/util/dummy-client-face.hpp>
#include <ndn-cxx/mgmt/nfd/control-parameters.hpp>
-#include <iostream>
-
namespace psync {
using namespace ndn;
diff --git a/tests/test-partial-sync.cpp b/tests/test-partial-sync.cpp
index 8061194..6d07644 100644
--- a/tests/test-partial-sync.cpp
+++ b/tests/test-partial-sync.cpp
@@ -25,8 +25,6 @@
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/util/dummy-client-face.hpp>
-#include <iostream>
-
namespace psync {
using namespace ndn;
@@ -46,12 +44,23 @@
addUserNodes("testUser", 10);
}
+ ~PartialSyncFixture()
+ {
+ for (auto consumer : consumers) {
+ if (consumer) {
+ consumer->stop();
+ }
+ }
+ }
+
void
- addConsumer(int id, const vector<string>& subscribeTo)
+ addConsumer(int id, const vector<string>& subscribeTo, bool linkToProducer = true)
{
consumerFaces[id] = make_shared<util::DummyClientFace>(io, util::DummyClientFace::Options{true, true});
- face.linkTo(*consumerFaces[id]);
+ if (linkToProducer) {
+ face.linkTo(*consumerFaces[id]);
+ }
consumers[id] = make_shared<Consumer>(syncPrefix, *consumerFaces[id],
[&, id] (const vector<Name>& availableSubs)
@@ -174,7 +183,7 @@
BOOST_CHECK_EQUAL(numSyncDataRcvd, 0);
// Next sync interest will bring back the sync data
- advanceClocks(ndn::time::milliseconds(1000));
+ advanceClocks(ndn::time::milliseconds(1500));
BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
}
@@ -343,6 +352,48 @@
BOOST_CHECK_EQUAL(numSyncDataRcvd, 3);
}
+BOOST_AUTO_TEST_CASE(SegmentedHello)
+{
+ vector<string> subscribeTo{"testUser-2", "testUser-4", "testUser-6"};
+ addConsumer(0, subscribeTo);
+
+ addUserNodes("testUser", 400);
+
+ consumers[0]->sendHelloInterest();
+ advanceClocks(ndn::time::milliseconds(10));
+ BOOST_CHECK_EQUAL(numHelloDataRcvd, 1);
+}
+
+BOOST_AUTO_TEST_CASE(SegmentedSync)
+{
+ ndn::Name longNameToExceedDataSize;
+ for (int i = 0; i < 100; i++) {
+ longNameToExceedDataSize.append("test-" + std::to_string(i));
+ }
+ addUserNodes(longNameToExceedDataSize.toUri(), 10);
+
+ vector<string> subscribeTo;
+ for (int i = 1; i < 10; i++) {
+ subscribeTo.push_back(longNameToExceedDataSize.toUri() + "-" + to_string(i));
+ }
+ addConsumer(0, subscribeTo);
+
+ consumers[0]->sendHelloInterest();
+ advanceClocks(ndn::time::milliseconds(10));
+ BOOST_CHECK_EQUAL(numHelloDataRcvd, 1);
+
+ oldSeqMap = producer->m_prefixes;
+ for (int i = 1; i < 10; i++) {
+ producer->updateSeqNo(longNameToExceedDataSize.toUri() + "-" + to_string(i), 1);
+ }
+
+ advanceClocks(ndn::time::milliseconds(1000));
+ BOOST_CHECK_EQUAL(numSyncDataRcvd, 0);
+
+ advanceClocks(ndn::time::milliseconds(1500));
+ BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
+}
+
BOOST_AUTO_TEST_SUITE_END()
} // namespace psync
\ No newline at end of file
diff --git a/tests/test-segment-publisher.cpp b/tests/test-segment-publisher.cpp
new file mode 100644
index 0000000..39881d8
--- /dev/null
+++ b/tests/test-segment-publisher.cpp
@@ -0,0 +1,162 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "segment-publisher.hpp"
+#include "detail/state.hpp"
+#include "unit-test-time-fixture.hpp"
+
+#include <boost/test/unit_test.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/data.hpp>
+#include <ndn-cxx/interest.hpp>
+#include <ndn-cxx/util/dummy-client-face.hpp>
+#include <ndn-cxx/util/segment-fetcher.hpp>
+#include <ndn-cxx/security/validator-null.hpp>
+
+namespace psync {
+
+using namespace ndn;
+
+class SegmentPublisherFixture : public tests::UnitTestTimeFixture
+{
+public:
+ SegmentPublisherFixture()
+ : face(io, util::DummyClientFace::Options{true, true})
+ , publisher(face, keyChain)
+ , freshness(1000)
+ , numComplete(0)
+ , numRepliesFromStore(0)
+ {
+ face.setInterestFilter(InterestFilter("/hello/world"),
+ bind(&SegmentPublisherFixture::onInterest, this, _1, _2),
+ [] (const ndn::Name& prefix, const std::string& msg) {
+ BOOST_CHECK(false);
+ });
+ advanceClocks(ndn::time::milliseconds(10));
+
+ for (int i = 0; i < 1000; ++i) {
+ state.addContent(Name("/test").appendNumber(i));
+ }
+ }
+
+ ~SegmentPublisherFixture() {
+ fetcher->stop();
+ }
+
+ void
+ expressInterest(const Interest& interest) {
+ fetcher = util::SegmentFetcher::start(face, interest, ndn::security::v2::getAcceptAllValidator());
+ fetcher->onComplete.connect([this] (ConstBufferPtr data) {
+ numComplete++;
+ });
+ fetcher->onError.connect([] (uint32_t errorCode, const std::string& msg) {
+ BOOST_CHECK(false);
+ });
+
+ advanceClocks(ndn::time::milliseconds(10));
+ }
+
+ void
+ onInterest(const Name& prefix, const Interest& interest) {
+ if (publisher.replyFromStore(interest.getName())) {
+ numRepliesFromStore++;
+ return;
+ }
+
+ // If dataName is same as interest name
+ if (dataName.empty()) {
+ publisher.publish(interest.getName(), interest.getName(), state.wireEncode(), freshness);
+ }
+ else {
+ publisher.publish(interest.getName(), dataName, state.wireEncode(), freshness);
+ }
+ }
+
+ util::DummyClientFace face;
+ KeyChain keyChain;
+ SegmentPublisher publisher;
+ shared_ptr<util::SegmentFetcher> fetcher;
+ Name dataName;
+ time::milliseconds freshness;
+ State state;
+
+ int numComplete;
+ int numRepliesFromStore;
+};
+
+BOOST_FIXTURE_TEST_SUITE(TestSegmentPublisher, SegmentPublisherFixture)
+
+BOOST_AUTO_TEST_CASE(Basic)
+{
+ BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);
+ expressInterest(Interest("/hello/world"));
+ BOOST_CHECK_EQUAL(numComplete, 1);
+ // First segment is answered directly in publish,
+ // Rest two are satisfied by the store
+ BOOST_CHECK_EQUAL(numRepliesFromStore, 2);
+ BOOST_CHECK_EQUAL(publisher.m_ims.size(), 3);
+
+ numRepliesFromStore = 0;
+ expressInterest(Interest("/hello/world"));
+ BOOST_CHECK_EQUAL(numComplete, 2);
+ BOOST_CHECK_EQUAL(numRepliesFromStore, 3);
+
+ advanceClocks(ndn::time::milliseconds(freshness));
+ BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);
+
+ numRepliesFromStore = 0;
+ expressInterest(Interest("/hello/world"));
+ BOOST_CHECK_EQUAL(numComplete, 3);
+ BOOST_CHECK_EQUAL(numRepliesFromStore, 2);
+
+ numRepliesFromStore = 0;
+ face.expressInterest(Interest(Name("/hello/world/").appendSegment(0)),
+ [this] (const Interest& interest, const Data& data) {
+ numComplete++;
+ },
+ [] (const Interest& interest, const lp::Nack& nack) {
+ BOOST_CHECK(false);
+ },
+ [] (const Interest& interest) {
+ BOOST_CHECK(false);
+ });
+ advanceClocks(ndn::time::milliseconds(10));
+ BOOST_CHECK_EQUAL(numComplete, 4);
+ BOOST_CHECK_EQUAL(numRepliesFromStore, 1);
+}
+
+BOOST_AUTO_TEST_CASE(LargerDataName)
+{
+ BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);
+ dataName = Name("/hello/world/IBF");
+
+ expressInterest(Interest("/hello/world"));
+ BOOST_CHECK_EQUAL(numComplete, 1);
+ // First segment is answered directly in publish,
+ // Rest two are satisfied by the store
+ BOOST_CHECK_EQUAL(numRepliesFromStore, 2);
+ BOOST_CHECK_EQUAL(publisher.m_ims.size(), 3);
+
+ advanceClocks(ndn::time::milliseconds(freshness));
+ BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+} // namespace psync
\ No newline at end of file
diff --git a/tests/test-state.cpp b/tests/test-state.cpp
index 47c1471..0a815fd 100644
--- a/tests/test-state.cpp
+++ b/tests/test-state.cpp
@@ -29,7 +29,7 @@
BOOST_AUTO_TEST_SUITE(TestState)
-BOOST_AUTO_TEST_CASE(EncodeDeode)
+BOOST_AUTO_TEST_CASE(EncodeDecode)
{
State state;
state.addContent(ndn::Name("test1"));
@@ -40,6 +40,30 @@
State rcvdState(data.getContent());
BOOST_CHECK(state.getContent() == rcvdState.getContent());
+
+ // Simulate getting buffer content from segment fetcher
+ ndn::Buffer buffer(data.getContent().value_size());
+ std::copy(data.getContent().value_begin(),
+ data.getContent().value_end(),
+ buffer.begin());
+
+ ndn::ConstBufferPtr buffer2 = make_shared<ndn::Buffer>(buffer);
+
+ ndn::Block block(std::move(buffer2));
+
+ State state2;
+ state2.wireDecode(block);
+
+ BOOST_CHECK(state.getContent() == state2.getContent());
+}
+
+BOOST_AUTO_TEST_CASE(EmptyContent)
+{
+ ndn::Data data;
+ BOOST_CHECK_NO_THROW(State state(data.getContent()));
+
+ State state(data.getContent());
+ BOOST_CHECK_EQUAL(state.getContent().size(), 0);
}
BOOST_AUTO_TEST_SUITE_END()