PSync: initial commit
refs: #4641
Change-Id: Iabed3ad7632544d97559e6798547b7972b416784
diff --git a/src/consumer.cpp b/src/consumer.cpp
new file mode 100644
index 0000000..545acea
--- /dev/null
+++ b/src/consumer.cpp
@@ -0,0 +1,209 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "consumer.hpp"
+#include "detail/state.hpp"
+
+#include <ndn-cxx/util/logger.hpp>
+
+#include <boost/algorithm/string.hpp>
+
+namespace psync {
+
+NDN_LOG_INIT(psync.Consumer);
+
+Consumer::Consumer(const ndn::Name& syncPrefix,
+                   ndn::Face& face,
+                   const ReceiveHelloCallback& onReceiveHelloData,
+                   const UpdateCallback& onUpdate,
+                   unsigned int count,
+                   double false_positive, // no default here: the class declaration has none, so clients could never use it
+                   ndn::time::milliseconds helloInterestLifetime,
+                   ndn::time::milliseconds syncInterestLifetime)
+  : m_face(face)
+  , m_scheduler(m_face.getIoService())
+  , m_syncPrefix(syncPrefix)
+  , m_helloInterestPrefix(ndn::Name(m_syncPrefix).append("hello"))
+  , m_syncInterestPrefix(ndn::Name(m_syncPrefix).append("sync"))
+  , m_onReceiveHelloData(onReceiveHelloData)
+  , m_onUpdate(onUpdate)
+  , m_bloomFilter(count, false_positive)
+  , m_helloInterestLifetime(helloInterestLifetime)
+  , m_syncInterestLifetime(syncInterestLifetime)
+  , m_outstandingInterestId(nullptr) // sendSyncInterest() tests this against nullptr before the first interest is sent
+  , m_rng(std::random_device{}())
+  , m_rangeUniformRandom(100, 500) // range of the random delay (milliseconds) used by the timeout/Nack handlers
+{}
+
+bool
+Consumer::addSubscription(const ndn::Name& prefix)
+{
+  // A new prefix starts at sequence number 0; an already known prefix is left untouched.
+  const bool isNewPrefix = m_prefixes.emplace(prefix, 0).second;
+  if (isNewPrefix) {
+    m_subscriptionList.insert(prefix);
+    m_bloomFilter.insert(prefix.toUri());
+  }
+  return isNewPrefix;
+}
+
+void
+Consumer::sendHelloInterest()
+{
+  // The hello data name ends with the producer's IBF (see onHelloData), hence the prefix match.
+  ndn::Interest helloInterest(m_helloInterestPrefix);
+  helloInterest.setInterestLifetime(m_helloInterestLifetime);
+  helloInterest.setCanBePrefix(true);
+  helloInterest.setMustBeFresh(true);
+  NDN_LOG_DEBUG("Send Hello Interest " << helloInterest);
+
+  m_face.expressInterest(helloInterest,
+    [this] (const ndn::Interest& interest, const ndn::Data& data) { onHelloData(interest, data); },
+    [this] (const ndn::Interest& interest, const ndn::lp::Nack& nack) { onNackForHello(interest, nack); },
+    [this] (const ndn::Interest& interest) { onHelloTimeout(interest); });
+}
+
+void
+Consumer::onHelloData(const ndn::Interest& interest, const ndn::Data& data)
+{ // Stores the producer's IBF taken from the data name and passes the decoded content to the application
+ ndn::Name helloDataName = data.getName();
+
+ NDN_LOG_DEBUG("On Hello Data");
+
+ // The last component of the hello data name is the producer's IBF; keep it for sendSyncInterest()
+ m_iblt = helloDataName.getSubName(helloDataName.size()-1, 1);
+
+ NDN_LOG_TRACE("m_iblt: " << std::hash<std::string>{}(m_iblt.toUri())); // logs a hash of the IBF URI, not the IBF itself
+
+ State state(data.getContent()); // decode the data content into a State
+
+ NDN_LOG_DEBUG("Content: " << state);
+
+ m_onReceiveHelloData(state.getContent()); // application callback receives the decoded content
+}
+
+void
+Consumer::sendSyncInterest()
+{
+  // A producer IBF must have been stored by onHelloData/onSyncData before a sync interest can be built.
+  BOOST_ASSERT(!m_iblt.empty());
+
+  // Interest name: sync prefix, then the subscription bloom filter, then the producer's IBF.
+  ndn::Name name(m_syncInterestPrefix);
+  // Subscription list
+  m_bloomFilter.appendToName(name);
+  // IBF received in hello/sync data
+  name.append(m_iblt);
+
+  ndn::Interest syncInterest(name);
+  syncInterest.setInterestLifetime(m_syncInterestLifetime);
+  syncInterest.setCanBePrefix(true);
+  syncInterest.setMustBeFresh(true);
+
+  NDN_LOG_DEBUG("sendSyncInterest, nonce: " << syncInterest.getNonce() <<
+                " hash: " << std::hash<std::string>{}(syncInterest.getName().toUri()));
+
+  // Only one sync interest is kept pending: drop the previous one before expressing the new one.
+  if (m_outstandingInterestId != nullptr) {
+    m_face.removePendingInterest(m_outstandingInterestId);
+    m_outstandingInterestId = nullptr;
+  }
+
+  m_outstandingInterestId = m_face.expressInterest(syncInterest,
+    [this] (const ndn::Interest& interest, const ndn::Data& data) { onSyncData(interest, data); },
+    [this] (const ndn::Interest& interest, const ndn::lp::Nack& nack) { onNackForSync(interest, nack); },
+    [this] (const ndn::Interest& interest) { onSyncTimeout(interest); });
+}
+
+void
+Consumer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
+{ // Stores the producer's latest IBF, reports newer sequence numbers through m_onUpdate, then sends the next sync interest
+ ndn::Name syncDataName = data.getName();
+
+ // The last component of the sync data name is the producer's latest IBF
+ m_iblt = syncDataName.getSubName(syncDataName.size()-1, 1);
+
+ if (data.getContentType() == ndn::tlv::ContentType_Nack) {
+ NDN_LOG_DEBUG("Received application Nack from producer, renew sync interest");
+ sendSyncInterest(); // m_iblt was refreshed above, so the renewed interest carries the IBF from this Nack
+ return;
+ }
+
+ State state(data.getContent());
+ std::vector <MissingDataInfo> updates;
+
+ for (const auto& content : state.getContent()) {
+ NDN_LOG_DEBUG(content);
+ ndn::Name prefix = content.getPrefix(-1); // name without its last component
+ uint64_t seq = content.get(content.size()-1).toNumber(); // last component is read as the sequence number
+ if (m_prefixes.find(prefix) == m_prefixes.end() || seq > m_prefixes[prefix]) {
+ // operator[] creates a missing entry with value 0, so an unknown prefix is reported starting at 1.
+ // For a known prefix the reported range starts right after the sequence number recorded so far.
+ updates.push_back(MissingDataInfo{prefix, m_prefixes[prefix] + 1, seq});
+ m_prefixes[prefix] = seq;
+ }
+ // Else this entry adds nothing; if no entry qualifies, updates stays empty and the application is not notified.
+ }
+
+ NDN_LOG_DEBUG("Sync Data: " << state);
+
+ if (!updates.empty()) {
+ m_onUpdate(updates);
+ }
+
+ sendSyncInterest(); // sent right away, without a random delay
+}
+
+void
+Consumer::onHelloTimeout(const ndn::Interest& interest)
+{
+  NDN_LOG_DEBUG("on hello timeout");
+  sendHelloInterest(); // retried right away; no random delay is scheduled on this path
+}
+
+void
+Consumer::onSyncTimeout(const ndn::Interest& interest)
+{
+  NDN_LOG_DEBUG("on sync timeout " << interest.getNonce());
+  // Re-send the sync interest after a random delay drawn from m_rangeUniformRandom (milliseconds).
+  const ndn::time::milliseconds jitter(m_rangeUniformRandom(m_rng));
+  m_scheduler.scheduleEvent(jitter, [this] { sendSyncInterest(); });
+}
+
+void
+Consumer::onNackForHello(const ndn::Interest& interest, const ndn::lp::Nack& nack)
+{
+  NDN_LOG_DEBUG("received Nack with reason " << nack.getReason() <<
+                " for interest " << interest); // no std::endl: it only added a stray line break to the log record
+  // Ask for hello data again after a random delay drawn from m_rangeUniformRandom (milliseconds).
+  ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
+  m_scheduler.scheduleEvent(after, [this] { sendHelloInterest(); });
+}
+
+void
+Consumer::onNackForSync(const ndn::Interest& interest, const ndn::lp::Nack& nack)
+{
+  NDN_LOG_DEBUG("received Nack with reason " << nack.getReason() <<
+                " for interest " << interest); // no std::endl: it only added a stray line break to the log record
+  // Re-send the sync interest after a random delay drawn from m_rangeUniformRandom (milliseconds).
+  ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
+  m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
+}
+
+} // namespace psync
diff --git a/src/consumer.hpp b/src/consumer.hpp
new file mode 100644
index 0000000..47825a9
--- /dev/null
+++ b/src/consumer.hpp
@@ -0,0 +1,201 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#ifndef PSYNC_CONSUMER_HPP
+#define PSYNC_CONSUMER_HPP
+
+#include "detail/bloom-filter.hpp"
+#include "detail/util.hpp"
+#include "detail/test-access-control.hpp"
+
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
+#include <ndn-cxx/util/time.hpp>
+
+#include <random>
+#include <map>
+#include <vector>
+
+namespace psync {
+
+using namespace ndn::literals::time_literals;
+
+using ReceiveHelloCallback = std::function<void(const std::vector<ndn::Name>&)>; // receives the content of hello data
+using UpdateCallback = std::function<void(const std::vector<MissingDataInfo>&)>; // receives the updates found in sync data
+
+const ndn::time::milliseconds HELLO_INTEREST_LIFETIME = 1_s; // default lifetime of hello interests
+const ndn::time::milliseconds SYNC_INTEREST_LIFETIME = 1_s;  // default lifetime of sync interests
+
+/**
+ * @brief Consumer logic to subscribe to producer's data
+ *
+ * Application needs to call sendHelloInterest to get the subscription list
+ * in ReceiveHelloCallback. It can then add the desired names using addSubscription.
+ * Finally application will call sendSyncInterest. If the application adds something
+ * later to the subscription list then it may call sendSyncInterest again for
+ * sending the next sync interest with the updated bloom filter immediately to reduce any delay in sync data.
+ * Whenever there is new data UpdateCallback will be called to notify the application.
+ * Currently, fetching of the data needs to be handled by the application.
+ */
+class Consumer
+{
+public:
+  /**
+   * @brief constructor
+   *
+   * @param syncPrefix syncPrefix to send hello/sync interests to producer
+   * @param face application's face
+   * @param onReceiveHelloData call back to give hello data back to application
+   * @param onUpdate call back to give sync data back to application
+   * @param count bloom filter number of expected elements (subscriptions) in bloom filter
+   * @param false_positive bloom filter false positive probability
+   * @param helloInterestLifetime lifetime of hello interest
+   * @param syncInterestLifetime lifetime of sync interest
+   */
+  Consumer(const ndn::Name& syncPrefix,
+           ndn::Face& face,
+           const ReceiveHelloCallback& onReceiveHelloData,
+           const UpdateCallback& onUpdate,
+           unsigned int count,
+           double false_positive,
+           ndn::time::milliseconds helloInterestLifetime = HELLO_INTEREST_LIFETIME,
+           ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFETIME);
+
+  /**
+   * @brief send hello interest /<sync-prefix>/hello/
+   *
+   * Should be called by the application whenever it wants to send a hello
+   */
+  void
+  sendHelloInterest();
+
+  /**
+   * @brief send sync interest /<sync-prefix>/sync/\<BF\>/\<producers-IBF\>
+   *
+   * Should be called after subscription list is set or updated
+   */
+  void
+  sendSyncInterest();
+
+  /**
+   * @brief Add prefix to subscription list
+   * @param prefix prefix to be added to the list
+   * @return true if the prefix was newly added, false if it was already known
+   */
+  bool
+  addSubscription(const ndn::Name& prefix);
+
+  std::set<ndn::Name>
+  getSubscriptionList() const // returns a copy of the subscription list
+  {
+    return m_subscriptionList;
+  }
+
+  bool
+  isSubscribed(const ndn::Name& prefix) const // true if prefix is in the subscription list
+  {
+    return m_subscriptionList.find(prefix) != m_subscriptionList.end();
+  }
+
+  ndn::optional<uint64_t>
+  getSeqNo(const ndn::Name& prefix) const // sequence number stored for prefix, or nullopt if none is stored
+  {
+    auto it = m_prefixes.find(prefix);
+    if (it == m_prefixes.end()) {
+      return ndn::nullopt;
+    }
+    return it->second;
+  }
+
+private:
+  /**
+   * @brief Get hello data from the producer
+   *
+   * Format: /<sync-prefix>/hello/\<BF\>/\<producer-IBF\>
+   * Data content is all the prefixes the producer has.
+   * We store the producer's IBF to be used in sending sync interest
+   *
+   * m_onReceiveHelloData is called to let the application know
+   * so that it can set the subscription list using addSubscription
+   *
+   * @param interest hello interest
+   * @param data hello data
+   */
+  void
+  onHelloData(const ndn::Interest& interest, const ndn::Data& data);
+
+  /**
+   * @brief Get sync data from the producer
+   *
+   * Format: <sync-prefix>/sync/\<BF\>/\<producers-IBF\>/\<producers-latest-IBF\>
+   * Data content is all the prefixes the producer thinks the consumer doesn't
+   * have the latest update for. We update our copy of producer's IBF with the latest one.
+   * Then we send another sync interest right away.
+   *
+   * @param interest sync interest
+   * @param data sync data
+   */
+  void
+  onSyncData(const ndn::Interest& interest, const ndn::Data& data);
+
+  void
+  onHelloTimeout(const ndn::Interest& interest); // re-sends the hello interest
+
+  void
+  onSyncTimeout(const ndn::Interest& interest); // re-sends the sync interest after a random delay
+
+  void
+  onNackForHello(const ndn::Interest& interest, const ndn::lp::Nack& nack); // re-sends the hello interest after a random delay
+
+  void
+  onNackForSync(const ndn::Interest& interest, const ndn::lp::Nack& nack); // re-sends the sync interest after a random delay
+
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+  ndn::Face& m_face;
+  ndn::Scheduler m_scheduler;
+
+  ndn::Name m_syncPrefix;
+  ndn::Name m_helloInterestPrefix;
+  ndn::Name m_syncInterestPrefix;
+  ndn::Name m_iblt; // producer's IBF taken from the last hello/sync data name
+
+  ReceiveHelloCallback m_onReceiveHelloData;
+
+  // Called when new sync update is received from producer.
+  UpdateCallback m_onUpdate;
+
+  // Bloom filter is used to store application/user's subscription list.
+  BloomFilter m_bloomFilter;
+
+  ndn::time::milliseconds m_helloInterestLifetime;
+  ndn::time::milliseconds m_syncInterestLifetime;
+
+  // Store sequence number for the prefix.
+  std::map<ndn::Name, uint64_t> m_prefixes;
+  std::set<ndn::Name> m_subscriptionList;
+
+  const ndn::PendingInterestId* m_outstandingInterestId = nullptr; // pending sync interest; must start null because sendSyncInterest() tests it
+
+  std::mt19937 m_rng;
+  std::uniform_int_distribution<> m_rangeUniformRandom;
+};
+
+} // namespace psync
+
+#endif // PSYNC_CONSUMER_HPP
diff --git a/src/detail/bloom-filter.cpp b/src/detail/bloom-filter.cpp
new file mode 100644
index 0000000..b74a00b
--- /dev/null
+++ b/src/detail/bloom-filter.cpp
@@ -0,0 +1,349 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+
+ * This file incorporates work covered by the following copyright and
+ * permission notice:
+
+ * The MIT License (MIT)
+
+ * Copyright (c) 2000 Arash Partow
+
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+*/
+
+#include "bloom-filter.hpp"
+#include "util.hpp"
+
+#include <ndn-cxx/util/logger.hpp>
+
+#include <algorithm>
+#include <cmath>
+#include <cstddef>
+#include <iterator>
+#include <limits>
+#include <cstdlib>
+
+// https://github.com/ArashPartow/bloom
+
+NDN_LOG_INIT(psync.BloomFilter);
+
+namespace psync {
+
+static const std::size_t bits_per_char = 0x08; // number of bits in one cell of the bit table
+static const unsigned char bit_mask[bits_per_char] = { // bit_mask[i] selects bit i of a cell
+ 0x01, //00000001
+ 0x02, //00000010
+ 0x04, //00000100
+ 0x08, //00001000
+ 0x10, //00010000
+ 0x20, //00100000
+ 0x40, //01000000
+ 0x80 //10000000
+};
+
+BloomParameters::BloomParameters() // defaults: 200 projected elements, false positive probability 1/200
+: minimum_size(1)
+, maximum_size(std::numeric_limits<unsigned int>::max())
+, minimum_number_of_hashes(1)
+, maximum_number_of_hashes(std::numeric_limits<unsigned int>::max())
+, projected_element_count(200)
+, false_positive_probability(1.0 / projected_element_count) // relies on projected_element_count being declared, and so initialized, first
+, random_seed(0xA5A5A5A55A5A5A5AULL)
+{}
+
+bool
+BloomParameters::compute_optimal_parameters()
+{ // Fills optimal_parameters (hash count and table size in bits); returns false if the parameters are invalid
+ if (!(*this)) { // operator! is true when the parameter set is invalid
+ return false;
+ }
+
+ double min_m = std::numeric_limits<double>::infinity(); // smallest table size found so far
+ double min_k = 0.0; // hash count that produced min_m
+ double curr_m = 0.0;
+ double k = 1.0;
+
+ while (k < 1000.0) // try every hash count from 1 to 999
+ {
+ double numerator = (- k * projected_element_count);
+ double denominator = std::log(1.0 - std::pow(false_positive_probability, 1.0 / k));
+ curr_m = numerator / denominator; // table size needed with k hashes: -k*n / ln(1 - p^(1/k))
+ if (curr_m < min_m)
+ {
+ min_m = curr_m;
+ min_k = k;
+ }
+ k += 1.0;
+ }
+
+ optimal_parameters_t& optp = optimal_parameters;
+
+ optp.number_of_hashes = static_cast<unsigned int>(min_k);
+ optp.table_size = static_cast<unsigned int>(min_m);
+ optp.table_size += (((optp.table_size % bits_per_char) != 0) ? (bits_per_char - (optp.table_size % bits_per_char)) : 0); // round up to a multiple of bits_per_char
+
+ if (optp.number_of_hashes < minimum_number_of_hashes) // clamp the hash count to the configured bounds
+ optp.number_of_hashes = minimum_number_of_hashes;
+ else if (optp.number_of_hashes > maximum_number_of_hashes)
+ optp.number_of_hashes = maximum_number_of_hashes;
+
+ if (optp.table_size < minimum_size) // clamp the table size to the configured bounds
+ optp.table_size = minimum_size;
+ else if (optp.table_size > maximum_size)
+ optp.table_size = maximum_size;
+
+ return true;
+}
+
+BloomFilter::BloomFilter() // empty filter: no salts and a zero-sized table
+ : bit_table_(0)
+ , salt_count_(0)
+ , table_size_(0) // compute_indices() takes hash % table_size_; with no salts insert()/contains() never call it
+ , raw_table_size_(0)
+ , projected_element_count_(0)
+ , inserted_element_count_(0)
+ , random_seed_(0)
+ , desired_false_positive_probability_(0.0)
+{}
+
+BloomFilter::BloomFilter(const BloomParameters& p) // expects p.optimal_parameters to be filled in already
+ : bit_table_(0)
+ , projected_element_count_(p.projected_element_count)
+ , inserted_element_count_(0)
+ , random_seed_((p.random_seed * 0xA5A5A5A5) + 1)
+ , desired_false_positive_probability_(p.false_positive_probability)
+{
+ salt_count_ = p.optimal_parameters.number_of_hashes; // one salt per hash function
+ table_size_ = p.optimal_parameters.table_size; // size in bits
+ generate_unique_salt(); // reads salt_count_ and random_seed_, so it must run after they are set
+ raw_table_size_ = table_size_ / bits_per_char; // size in cells
+ // Allocate the bit table with every cell cleared
+ bit_table_.resize(static_cast<std::size_t>(raw_table_size_), 0x00);
+}
+
+BloomFilter::BloomFilter(unsigned int projected_element_count,
+ double false_positive_probability)
+ : BloomFilter(getParameters(projected_element_count, false_positive_probability)) // delegates to the BloomParameters constructor
+{
+}
+
+BloomFilter::BloomFilter(unsigned int projected_element_count,
+ double false_positive_probability,
+ const ndn::name::Component& bfName) // bfName: name component whose value holds the bit table
+ : BloomFilter(projected_element_count, false_positive_probability)
+{
+ std::vector<BloomFilter::cell_type> table(bfName.value_begin(), bfName.value_end());
+
+ if (table.size() != raw_table_size_) { // the received table must match the size derived from the two parameters
+ BOOST_THROW_EXCEPTION(Error("Received BloomFilter cannot be decoded!"));
+ }
+ bit_table_ = table;
+}
+
+BloomParameters
+BloomFilter::getParameters(unsigned int projected_element_count,
+ double false_positive_probability)
+{ // Builds a BloomParameters for the given count and probability and computes its optimal parameters
+ BloomParameters opt;
+ opt.false_positive_probability = false_positive_probability;
+ opt.projected_element_count = projected_element_count;
+
+ if (!opt) { // invalid parameters are only logged, not rejected
+ NDN_LOG_WARN("Bloom parameters are not correct!");
+ }
+
+ opt.compute_optimal_parameters(); // return value ignored; on failure optimal_parameters keeps its zero defaults
+ return opt;
+}
+
+void
+BloomFilter::appendToName(ndn::Name& name) const
+{
+  name.appendNumber(projected_element_count_); // expected number of elements
+  name.appendNumber(static_cast<int>(desired_false_positive_probability_ * 1000)); // probability scaled by 1000, truncated
+  name.append(bit_table_.begin(), bit_table_.end()); // bytes of the bit table
+}
+
+void
+BloomFilter::clear()
+{
+  bit_table_.assign(static_cast<std::size_t>(raw_table_size_), 0x00); // assign() overwrites every cell; resize() to the same size kept the set bits
+  inserted_element_count_ = 0;
+}
+
+void
+BloomFilter::insert(const std::string& key)
+{
+  // Set the bit selected by each salted hash of the key.
+  for (const bloom_type& salt : salt_) {
+    std::size_t bitIndex = 0;
+    std::size_t bitInCell = 0;
+    compute_indices(murmurHash3(salt, key), bitIndex, bitInCell);
+    bit_table_[bitIndex / bits_per_char] |= bit_mask[bitInCell];
+  }
+  ++inserted_element_count_;
+}
+
+bool
+BloomFilter::contains(const std::string& key) const
+{
+  // The key can be present only if every salted hash selects a bit that is set;
+  // one cleared bit is enough to answer false.
+  for (const bloom_type& salt : salt_) {
+    std::size_t bitIndex = 0;
+    std::size_t bitInCell = 0;
+    compute_indices(murmurHash3(salt, key), bitIndex, bitInCell);
+    const unsigned char mask = bit_mask[bitInCell];
+    if ((bit_table_[bitIndex / bits_per_char] & mask) != mask) {
+      return false;
+    }
+  }
+  return true;
+}
+
+std::vector <BloomFilter::cell_type>
+BloomFilter::table() const
+{
+ return bit_table_; // returned by value, i.e. a copy of the bit table
+}
+
+void
+BloomFilter::generate_unique_salt()
+{ // Fills salt_ with salt_count_ seeds, one per hash function
+ /*
+ Note:
+ A distinct hash function need not be implementation-wise
+ distinct. In the current implementation "seeding" a common
+ hash function with different values seems to be adequate.
+ */
+ const unsigned int predef_salt_count = 128;
+ static const bloom_type predef_salt[predef_salt_count] =
+ {
+ 0xAAAAAAAA, 0x55555555, 0x33333333, 0xCCCCCCCC,
+ 0x66666666, 0x99999999, 0xB5B5B5B5, 0x4B4B4B4B,
+ 0xAA55AA55, 0x55335533, 0x33CC33CC, 0xCC66CC66,
+ 0x66996699, 0x99B599B5, 0xB54BB54B, 0x4BAA4BAA,
+ 0xAA33AA33, 0x55CC55CC, 0x33663366, 0xCC99CC99,
+ 0x66B566B5, 0x994B994B, 0xB5AAB5AA, 0xAAAAAA33,
+ 0x555555CC, 0x33333366, 0xCCCCCC99, 0x666666B5,
+ 0x9999994B, 0xB5B5B5AA, 0xFFFFFFFF, 0xFFFF0000,
+ 0xB823D5EB, 0xC1191CDF, 0xF623AEB3, 0xDB58499F,
+ 0xC8D42E70, 0xB173F616, 0xA91A5967, 0xDA427D63,
+ 0xB1E8A2EA, 0xF6C0D155, 0x4909FEA3, 0xA68CC6A7,
+ 0xC395E782, 0xA26057EB, 0x0CD5DA28, 0x467C5492,
+ 0xF15E6982, 0x61C6FAD3, 0x9615E352, 0x6E9E355A,
+ 0x689B563E, 0x0C9831A8, 0x6753C18B, 0xA622689B,
+ 0x8CA63C47, 0x42CC2884, 0x8E89919B, 0x6EDBD7D3,
+ 0x15B6796C, 0x1D6FDFE4, 0x63FF9092, 0xE7401432,
+ 0xEFFE9412, 0xAEAEDF79, 0x9F245A31, 0x83C136FC,
+ 0xC3DA4A8C, 0xA5112C8C, 0x5271F491, 0x9A948DAB,
+ 0xCEE59A8D, 0xB5F525AB, 0x59D13217, 0x24E7C331,
+ 0x697C2103, 0x84B0A460, 0x86156DA9, 0xAEF2AC68,
+ 0x23243DA5, 0x3F649643, 0x5FA495A8, 0x67710DF8,
+ 0x9A6C499E, 0xDCFB0227, 0x46A43433, 0x1832B07A,
+ 0xC46AFF3C, 0xB9C8FFF0, 0xC9500467, 0x34431BDF,
+ 0xB652432B, 0xE367F12B, 0x427F4C1B, 0x224C006E,
+ 0x2E7E5A89, 0x96F99AA5, 0x0BEB452A, 0x2FD87C39,
+ 0x74B2E1FB, 0x222EFD24, 0xF357F60C, 0x440FCB1E,
+ 0x8BBE030F, 0x6704DC29, 0x1144D12F, 0x948B1355,
+ 0x6D8FD7E9, 0x1C11A014, 0xADD1592F, 0xFB3C712E,
+ 0xFC77642F, 0xF9C4CE8C, 0x31312FB9, 0x08B0DD79,
+ 0x318FA6E7, 0xC040D23D, 0xC0589AA7, 0x0CA5C075,
+ 0xF874B172, 0x0CF914D5, 0x784D3280, 0x4E8CFEBC,
+ 0xC569F575, 0xCDB2A091, 0x2CC016B4, 0x5C5F4421
+ };
+
+ if (salt_count_ <= predef_salt_count) // enough predefined salts: take the first salt_count_ of them
+ {
+ std::copy(predef_salt,
+ predef_salt + salt_count_,
+ std::back_inserter(salt_));
+ for (unsigned int i = 0; i < salt_.size(); ++i)
+ {
+ salt_[i] = salt_[i] * salt_[(i + 3) % salt_.size()] + static_cast<bloom_type>(random_seed_); // mix each salt with another one and the seed
+ }
+ }
+ else // more salts requested than predefined: take all of them and generate the rest
+ {
+ std::copy(predef_salt,predef_salt + predef_salt_count,std::back_inserter(salt_));
+ srand(static_cast<unsigned int>(random_seed_)); // seeds the global C generator
+ while (salt_.size() < salt_count_)
+ {
+ bloom_type current_salt = static_cast<bloom_type>(rand()) * static_cast<bloom_type>(rand());
+ if (0 == current_salt) continue; // zero salts are skipped
+ if (salt_.end() == std::find(salt_.begin(), salt_.end(), current_salt)) // keep only salts not already present
+ {
+ salt_.push_back(current_salt);
+ }
+ }
+ }
+}
+
+void
+BloomFilter::compute_indices(const bloom_type& hash,
+ std::size_t& bit_index, std::size_t& bit) const
+{
+ bit_index = hash % table_size_; // bit position within the whole table; callers divide by bits_per_char to get the cell
+ bit = bit_index % bits_per_char; // bit position within that cell, used as index into bit_mask
+}
+
+bool
+operator==(const BloomFilter& bf1, const BloomFilter& bf2)
+{
+  // Two filters are equal when their bit tables have the same length and
+  // the same cells; nothing else is compared.
+  const auto lhs = bf1.table();
+  const auto rhs = bf2.table();
+
+  // Tables of different length can never match; this also keeps the
+  // element-wise comparison below within bounds.
+  if (lhs.size() != rhs.size()) {
+    return false;
+  }
+
+  // Same length: equal exactly when every pair of cells matches.
+  return std::equal(lhs.begin(), lhs.end(), rhs.begin());
+}
+
+std::ostream&
+operator<<(std::ostream& out, const BloomFilter& bf)
+{
+  const auto cells = bf.table();
+  for (const auto cell : cells) { // each cell is printed as a decimal number, with no separator
+    out << unsigned(cell);
+  }
+  return out;
+}
+
+} // namespace psync
\ No newline at end of file
diff --git a/src/detail/bloom-filter.hpp b/src/detail/bloom-filter.hpp
new file mode 100644
index 0000000..319f1c1
--- /dev/null
+++ b/src/detail/bloom-filter.hpp
@@ -0,0 +1,180 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+
+ * This file incorporates work covered by the following copyright and
+ * permission notice:
+
+ * The MIT License (MIT)
+
+ * Copyright (c) 2000 Arash Partow
+
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+*/
+
+#ifndef PSYNC_BLOOM_FILTER_HPP
+#define PSYNC_BLOOM_FILTER_HPP
+
+#include <ndn-cxx/name.hpp>
+
+#include <string>
+#include <vector>
+#include <cmath>
+#include <cstdlib>
+
+namespace psync {
+
+struct optimal_parameters_t // result of BloomParameters::compute_optimal_parameters()
+{
+ optimal_parameters_t()
+ : number_of_hashes(0),
+ table_size(0)
+ {}
+
+ unsigned int number_of_hashes; // number of hash functions (salts) to use
+ unsigned int table_size; // table size in bits
+};
+
+class BloomParameters // user-supplied bounds and targets from which the optimal filter parameters are computed
+{
+public:
+ BloomParameters();
+
+ bool
+ compute_optimal_parameters(); // fills optimal_parameters; returns false when the parameters are invalid
+
+ bool operator!() const // true when the parameter set is invalid
+ {
+ return (minimum_size > maximum_size) ||
+ (minimum_number_of_hashes > maximum_number_of_hashes) ||
+ (minimum_number_of_hashes < 1) ||
+ (0 == maximum_number_of_hashes) ||
+ (0 == projected_element_count) ||
+ (false_positive_probability < 0.0) ||
+ (std::numeric_limits<double>::infinity() == std::abs(false_positive_probability)) ||
+ (0 == random_seed) ||
+ (0xFFFFFFFFFFFFFFFFULL == random_seed);
+ }
+
+ unsigned int minimum_size; // lower bound for the table size
+ unsigned int maximum_size; // upper bound for the table size
+ unsigned int minimum_number_of_hashes; // lower bound for the number of hashes
+ unsigned int maximum_number_of_hashes; // upper bound for the number of hashes
+ unsigned int projected_element_count; // expected number of inserted elements
+ double false_positive_probability; // desired false positive probability
+ unsigned long long int random_seed; // must be neither 0 nor all ones (see operator!)
+ optimal_parameters_t optimal_parameters; // output of compute_optimal_parameters()
+};
+
+class BloomFilter // bloom filter over string keys, backed by a byte vector
+{
+protected:
+ typedef uint32_t bloom_type; // type of hash values and salts
+ typedef uint8_t cell_type; // one cell (byte) of the bit table
+ typedef std::vector <cell_type>::iterator Iterator;
+
+public:
+ class Error : public std::runtime_error // thrown when a received bloom filter cannot be decoded
+ {
+ public:
+ using std::runtime_error::runtime_error;
+ };
+
+ BloomFilter(); // empty filter with a zero-sized table
+
+ explicit BloomFilter(const BloomParameters& p); // uses p.optimal_parameters for hash count and table size
+
+ BloomFilter(unsigned int projected_element_count,
+ double false_positive_probability); // computes the optimal parameters itself
+
+ BloomFilter(unsigned int projected_element_count,
+ double false_positive_probability,
+ const ndn::name::Component& bfName); // additionally loads the bit table from bfName; throws Error on size mismatch
+
+ BloomParameters
+ getParameters(unsigned int projected_element_count,
+ double false_positive_probability); // builds parameters and computes their optimal values
+
+ /**
+ * @brief Append our bloom filter to the given name
+ *
+ * Append the count and false positive probability
+ * along with the bloom filter so that producer (PartialProducer) can construct a copy.
+ *
+ * @param name append bloom filter to this name
+ */
+ void
+ appendToName(ndn::Name& name) const;
+
+ void
+ clear(); // resets the inserted element count and is meant to empty the bit table
+
+ void
+ insert(const std::string& key); // sets one bit per salt for key
+
+ bool
+ contains(const std::string& key) const; // false if any bit selected for key is not set
+
+ std::vector<cell_type>
+ table() const; // copy of the bit table
+
+private:
+ void
+ generate_unique_salt(); // fills salt_ with salt_count_ entries
+
+ void
+ compute_indices(const bloom_type& hash,
+ std::size_t& bit_index, std::size_t& bit) const; // maps a hash to a table bit index and a bit within its cell
+
+private:
+ std::vector <bloom_type> salt_; // one seed per hash function
+ std::vector <cell_type> bit_table_;
+ unsigned int salt_count_;
+ unsigned int table_size_; // 8 * raw_table_size;
+ unsigned int raw_table_size_; // number of cells in bit_table_
+ unsigned int projected_element_count_;
+ unsigned int inserted_element_count_; // incremented by every insert() call
+ unsigned long long int random_seed_;
+ double desired_false_positive_probability_;
+};
+
+bool
+operator==(const BloomFilter& bf1, const BloomFilter& bf2);
+
+std::ostream&
+operator<<(std::ostream& out, const BloomFilter& bf);
+
+} // namespace psync
+
+#endif // PSYNC_BLOOM_FILTER_HPP
\ No newline at end of file
diff --git a/src/detail/iblt.cpp b/src/detail/iblt.cpp
new file mode 100644
index 0000000..495c232
--- /dev/null
+++ b/src/detail/iblt.cpp
@@ -0,0 +1,277 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+
+ * This file incorporates work covered by the following copyright and
+ * permission notice:
+
+ * The MIT License (MIT)
+
+ * Copyright (c) 2014 Gavin Andresen
+
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+*/
+
+#include "iblt.hpp"
+#include "util.hpp"
+
+#include <sstream>
+
+namespace psync {
+
+const size_t N_HASH(3); // number of hash functions; IBLT::update() splits the table into N_HASH equal regions, one per hash
+const size_t N_HASHCHECK(11); // murmurHash3 seed used to compute the keyCheck value of a key
+
+// An entry is "pure" when it holds exactly one key (count is +1 or -1) and
+// the stored check value equals the check hash of that key.
+bool
+HashTableEntry::isPure() const
+{
+  const bool holdsSingleKey = (count == 1 || count == -1);
+  if (!holdsSingleKey) {
+    return false;
+  }
+
+  return murmurHash3(N_HASHCHECK, keySum) == keyCheck;
+}
+
+// An entry is empty only when all three of its fields are zero.
+bool
+HashTableEntry::isEmpty() const
+{
+  const bool anyFieldSet = (count != 0) || (keySum != 0) || (keyCheck != 0);
+  return !anyFieldSet;
+}
+
+// Allocate the hash table: 1.5 times the expected number of entries, rounded
+// up to a multiple of N_HASH so that it splits into N_HASH equal regions.
+IBLT::IBLT(size_t expectedNumEntries)
+{
+  // 1.5x expectedNumEntries gives very low probability of decoding failure
+  size_t tableSize = expectedNumEntries + expectedNumEntries / 2;
+
+  // pad up to the next multiple of N_HASH (no padding if already a multiple)
+  const size_t excess = tableSize % N_HASH;
+  tableSize += (excess == 0) ? 0 : (N_HASH - excess);
+
+  m_hashTable.resize(tableSize);
+}
+
+// Load the hash table from the encoded form produced by appendToName().
+//
+// The component decodes to a flat sequence of 32-bit values, three per entry
+// in the order count, keySum, keyCheck. Every entry is overwritten with the
+// decoded values, so the result does not depend on what the table held before
+// and cells with a zero count keep their keySum/keyCheck exactly as received.
+//
+// Throws Error if the number of decoded values does not match this table.
+void
+IBLT::initialize(const ndn::name::Component& ibltName)
+{
+  const size_t valuesPerEntry = 3; // count, keySum, keyCheck
+  const std::vector<uint32_t> values = extractValueFromName(ibltName);
+
+  if (valuesPerEntry * m_hashTable.size() != values.size()) {
+    BOOST_THROW_EXCEPTION(Error("Received IBF cannot be decoded!"));
+  }
+
+  for (size_t i = 0; i < m_hashTable.size(); i++) {
+    HashTableEntry& entry = m_hashTable[i];
+    const size_t base = i * valuesPerEntry;
+    // count travels as its 32-bit two's complement pattern (see appendToName)
+    entry.count = static_cast<int32_t>(values[base]);
+    entry.keySum = values[base + 1];
+    entry.keyCheck = values[base + 2];
+  }
+}
+
+// Apply one key to the table. For one cell in each of the N_HASH equally
+// sized regions (chosen by hashing the key with the region index as seed),
+// plusOrMinus is added to the count and the key and its check hash are
+// XOR-ed into keySum and keyCheck.
+void
+IBLT::update(int plusOrMinus, uint32_t key)
+{
+  const size_t regionSize = m_hashTable.size() / N_HASH;
+  const uint32_t keyCheckHash = murmurHash3(N_HASHCHECK, key);
+
+  for (size_t region = 0; region < N_HASH; ++region) {
+    const size_t offset = murmurHash3(region, key) % regionSize;
+    HashTableEntry& cell = m_hashTable.at(region * regionSize + offset);
+    cell.count += plusOrMinus;
+    cell.keySum ^= key;
+    cell.keyCheck ^= keyCheckHash;
+  }
+}
+
+// Add a key to the table (update with a count change of INSERT)
+void
+IBLT::insert(uint32_t key)
+{
+  this->update(INSERT, key);
+}
+
+// Remove a key from the table (update with a count change of ERASE)
+void
+IBLT::erase(uint32_t key)
+{
+  this->update(ERASE, key);
+}
+
+// Decode the table by "peeling": every pure entry yields one key, which is
+// reported in positive (count +1) or negative (count -1) and then removed
+// from a working copy; passes are repeated until one finds no pure entry.
+//
+// Returns true if the working copy ends up completely empty, false if
+// decoding could not be completed. Keys found before a failure stay in the
+// two sets. The table itself is not modified.
+bool
+IBLT::listEntries(std::set<uint32_t>& positive, std::set<uint32_t>& negative) const
+{
+  IBLT peeled = *this;
+
+  // Peeling a genuine key empties the cell it was found in, so a well-formed
+  // table never needs more peels than it has cells. A table decoded from
+  // untrusted input can contain a "pure" cell whose key does not hash to that
+  // cell; update() then never clears it, and without this bound the loop
+  // below would never terminate.
+  const size_t maxPeels = peeled.m_hashTable.size();
+  size_t nPeeled = 0;
+
+  size_t nErased = 0;
+  do {
+    nErased = 0;
+    for (const auto& entry : peeled.m_hashTable) {
+      if (!entry.isPure()) {
+        continue;
+      }
+
+      if (nPeeled == maxPeels) {
+        return false; // malformed table: report a decoding failure
+      }
+      ++nPeeled;
+
+      if (entry.count == 1) {
+        positive.insert(entry.keySum);
+      }
+      else {
+        negative.insert(entry.keySum);
+      }
+      // both arguments are passed by value, so update() may modify `entry`
+      peeled.update(-entry.count, entry.keySum);
+      ++nErased;
+    }
+  } while (nErased > 0);
+
+  // If any buckets for one of the hash functions is not empty,
+  // then we didn't peel them all:
+  for (const auto& entry : peeled.m_hashTable) {
+    if (!entry.isEmpty()) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+// Cell-wise difference of two tables of equal size: counts are subtracted,
+// keySum and keyCheck are XOR-ed.
+IBLT
+IBLT::operator-(const IBLT& other) const
+{
+  BOOST_ASSERT(m_hashTable.size() == other.m_hashTable.size());
+
+  IBLT difference(*this);
+  for (size_t idx = 0; idx < m_hashTable.size(); ++idx) {
+    HashTableEntry& lhs = difference.m_hashTable.at(idx);
+    const HashTableEntry& rhs = other.m_hashTable.at(idx);
+    lhs.count -= rhs.count;
+    lhs.keySum ^= rhs.keySum;
+    lhs.keyCheck ^= rhs.keyCheck;
+  }
+
+  return difference;
+}
+
+// Two IBLTs are equal when their tables have the same size and every pair of
+// corresponding entries agrees in count, keySum and keyCheck.
+bool
+operator==(const IBLT& iblt1, const IBLT& iblt2)
+{
+  const auto lhs = iblt1.getHashTable();
+  const auto rhs = iblt2.getHashTable();
+
+  if (lhs.size() != rhs.size()) {
+    return false;
+  }
+
+  for (size_t i = 0; i < lhs.size(); ++i) {
+    const bool sameEntry = lhs[i].count == rhs[i].count &&
+                           lhs[i].keySum == rhs[i].keySum &&
+                           lhs[i].keyCheck == rhs[i].keyCheck;
+    if (!sameEntry) {
+      return false;
+    }
+  }
+
+  return true;
+}
+
+// Negation of operator==
+bool
+operator!=(const IBLT& iblt1, const IBLT& iblt2)
+{
+  const bool areEqual = (iblt1 == iblt2);
+  return !areEqual;
+}
+
+// Print one line per entry: count, keySum, and whether the entry is
+// consistent (keyCheck matches the check hash of keySum, or the entry is empty).
+std::ostream&
+operator<<(std::ostream& out, const IBLT& iblt)
+{
+  out << "count keySum keyCheckMatch\n";
+  for (const auto& entry : iblt.getHashTable()) {
+    const bool checkMatches = murmurHash3(N_HASHCHECK, entry.keySum) == entry.keyCheck;
+    const char* verdict = (checkMatches || entry.isEmpty()) ? "true" : "false";
+    out << entry.count << " " << entry.keySum << " " << verdict << "\n";
+  }
+
+  return out;
+}
+
+// Serialize the hash table and append it to the name.
+// Each entry takes 12 bytes: count, keySum and keyCheck, 4 bytes each,
+// least significant byte first.
+void
+IBLT::appendToName(ndn::Name& name) const
+{
+  const size_t bytesPerValue = 4;
+  const size_t bytesPerEntry = 3 * bytesPerValue; // hard coding
+
+  std::vector<uint8_t> encoded(bytesPerEntry * m_hashTable.size());
+
+  // writes one 32-bit value, least significant byte first, at the given offset
+  auto putValue = [&encoded] (size_t offset, uint32_t value) {
+    for (size_t b = 0; b < 4; ++b) {
+      encoded[offset + b] = 0xFF & (value >> (8 * b));
+    }
+  };
+
+  for (size_t i = 0; i < m_hashTable.size(); ++i) {
+    const HashTableEntry& entry = m_hashTable[i];
+    const size_t base = i * bytesPerEntry;
+
+    // encoded[base .. base+3] --> count (as its 32-bit two's complement pattern)
+    putValue(base, static_cast<uint32_t>(entry.count));
+    // encoded[base+4 .. base+7] --> keySum
+    putValue(base + bytesPerValue, entry.keySum);
+    // encoded[base+8 .. base+11] --> keyCheck
+    putValue(base + 2 * bytesPerValue, entry.keyCheck);
+  }
+
+  name.append(encoded.begin(), encoded.end());
+}
+
+// Decode the value of a name component into 32-bit values, 4 bytes per value
+// with the least significant byte first (the layout written by appendToName).
+// Trailing bytes that do not fill a whole value are ignored.
+std::vector<uint32_t>
+IBLT::extractValueFromName(const ndn::name::Component& ibltName) const
+{
+  const std::vector<uint8_t> bytes(ibltName.value_begin(), ibltName.value_end());
+  const size_t nValues = bytes.size() / 4;
+
+  std::vector<uint32_t> values(nValues, 0);
+
+  for (size_t v = 0; v < nValues; ++v) {
+    const size_t i = 4 * v;
+    // Widen each byte to uint32_t before shifting: a plain uint8_t is promoted
+    // to (signed) int, and shifting a byte >= 0x80 left by 24 would then
+    // shift into the sign bit.
+    values[v] = (static_cast<uint32_t>(bytes[i + 3]) << 24) |
+                (static_cast<uint32_t>(bytes[i + 2]) << 16) |
+                (static_cast<uint32_t>(bytes[i + 1]) << 8) |
+                static_cast<uint32_t>(bytes[i]);
+  }
+
+  return values;
+}
+
+} // namespace psync
diff --git a/src/detail/iblt.hpp b/src/detail/iblt.hpp
new file mode 100644
index 0000000..f03797a
--- /dev/null
+++ b/src/detail/iblt.hpp
@@ -0,0 +1,181 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+
+ * This file incorporates work covered by the following copyright and
+ * permission notice:
+
+ * The MIT License (MIT)
+
+ * Copyright (c) 2014 Gavin Andresen
+
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+*/
+
+#ifndef PSYNC_IBLT_HPP
+#define PSYNC_IBLT_HPP
+
+#include <ndn-cxx/name.hpp>
+
+#include <inttypes.h>
+#include <set>
+#include <vector>
+#include <string>
+
+namespace psync {
+
+class HashTableEntry // one cell of the IBLT hash table
+{
+public:
+ int32_t count; // signed key count of this cell: IBLT::update() adds +1 (insert) or -1 (erase)
+ uint32_t keySum; // XOR of all keys applied to this cell
+ uint32_t keyCheck; // XOR of murmurHash3(N_HASHCHECK, key) over all keys applied to this cell
+
+ bool
+ isPure() const; // true if count is +1 or -1 and keyCheck equals murmurHash3(N_HASHCHECK, keySum)
+
+ bool
+ isEmpty() const; // true if count, keySum and keyCheck are all zero
+};
+
+extern const size_t N_HASH;
+extern const size_t N_HASHCHECK;
+
+/**
+ * @brief Invertible Bloom Lookup Table (Invertible Bloom Filter)
+ *
+ * Used by Partial Sync (PartialProducer) and Full Sync (Full Producer)
+ */
+class IBLT
+{
+public:
+ class Error : public std::runtime_error // thrown by initialize() when an encoded IBLT does not fit
+ {
+ public:
+ using std::runtime_error::runtime_error;
+ };
+
+ /**
+ * @brief Create an empty IBLT sized for the expected number of entries
+ *
+ * @param expectedNumEntries the expected number of entries in the IBLT; the table gets 1.5x this many cells, rounded up to a multiple of N_HASH
+ */
+ explicit IBLT(size_t expectedNumEntries);
+
+ /**
+ * @brief Populate the hash table from an IBLT encoded in a name component (see appendToName)
+ *
+ * @param ibltName the Component representation of IBLT
+ * @throws Error if the number of encoded values does not match the size of this IBLT
+ */
+ void
+ initialize(const ndn::name::Component& ibltName);
+
+ void
+ insert(uint32_t key); // add a key: update(INSERT, key)
+
+ void
+ erase(uint32_t key); // remove a key: update(ERASE, key)
+
+ /**
+ * @brief List all the entries in the IBLT
+ *
+ * This is called on a difference of two IBLTs: ownIBLT - rcvdIBLT
+ * Entries listed in positive are in ownIBLT but not in rcvdIBLT
+ * Entries listed in negative are in rcvdIBLT but not in ownIBLT
+ *
+ * @param positive receives the keys of the entries found with count +1
+ * @param negative receives the keys of the entries found with count -1
+ * @return true if decoding completed successfully, i.e. no entry is left over once the listed keys are removed
+ */
+ bool
+ listEntries(std::set<uint32_t>& positive, std::set<uint32_t>& negative) const;
+
+ IBLT
+ operator-(const IBLT& other) const; // cell-wise difference: counts subtracted, keySum and keyCheck XOR-ed
+
+ std::vector<HashTableEntry>
+ getHashTable() const // returns the table by value, i.e. a copy
+ {
+ return m_hashTable;
+ }
+
+ /**
+ * @brief Appends self to name
+ *
+ * Serializes the hash table into a uint8_t vector of 12 bytes per entry:
+ * count in the first 4 bytes, keySum in the next 4 and keyCheck in the last 4,
+ * each value stored least significant byte first.
+ * This is repeated for every entry of the hash table.
+ * Then we append this uint8_t vector to the name.
+ *
+ * @param name the name to append the encoded table to
+ */
+ void
+ appendToName(ndn::Name& name) const;
+
+ /**
+ * @brief Extracts IBLT from name component
+ *
+ * Copies the component value into a uint8_t vector which is then decoded,
+ * 4 bytes per value and least significant byte first, into a uint32_t vector.
+ *
+ * @param ibltName IBLT represented as a Name Component
+ * @return a uint32_t vector representing the hash table of the IBLT
+ */
+ std::vector<uint32_t>
+ extractValueFromName(const ndn::name::Component& ibltName) const;
+
+private:
+ void
+ update(int plusOrMinus, uint32_t key); // applies key to one cell per hash region; plusOrMinus is added to each count
+
+private:
+ std::vector<HashTableEntry> m_hashTable;
+ static const int INSERT = 1; // count change passed to update() by insert()
+ static const int ERASE = -1; // count change passed to update() by erase()
+};
+
+bool
+operator==(const IBLT& iblt1, const IBLT& iblt2);
+
+bool
+operator!=(const IBLT& iblt1, const IBLT& iblt2);
+
+std::ostream&
+operator<<(std::ostream& out, const IBLT& iblt);
+
+} // namespace psync
+
+#endif // PSYNC_IBLT_HPP
\ No newline at end of file
diff --git a/src/detail/state.cpp b/src/detail/state.cpp
new file mode 100644
index 0000000..e5c6c47
--- /dev/null
+++ b/src/detail/state.cpp
@@ -0,0 +1,113 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "state.hpp"
+
+namespace psync {
+
+// Construct a State by decoding the given block (see wireDecode)
+State::State(const ndn::Block& block)
+{
+  this->wireDecode(block);
+}
+
+// Append a name to the content of this state.
+// The cached wire encoding is dropped first: wireEncode() returns the cache
+// whenever one exists, so without the reset a state that was already encoded
+// (or decoded) would keep returning bytes that lack the new name.
+void
+State::addContent(const ndn::Name& prefix)
+{
+  m_wire.reset();
+  m_content.emplace_back(prefix);
+}
+
+// Return the wire encoding of this state, encoding it on first use and
+// serving the cached block afterwards.
+const ndn::Block&
+State::wireEncode() const
+{
+  if (!m_wire.hasWire()) {
+    // first pass only measures the encoded size, second pass writes the bytes
+    ndn::EncodingEstimator sizer;
+    const size_t nBytes = wireEncode(sizer);
+
+    ndn::EncodingBuffer encoder(nBytes, 0);
+    wireEncode(encoder);
+
+    m_wire = encoder.block();
+  }
+
+  return m_wire;
+}
+
+// Encode the state into the given encoder (estimator or buffer) and return
+// the number of bytes produced: all names, wrapped in a PSyncContent TLV.
+template<ndn::encoding::Tag TAG>
+size_t
+State::wireEncode(ndn::EncodingImpl<TAG>& block) const
+{
+  size_t nBytes = 0;
+
+  // the encoding is built back to front (note the prepend calls below),
+  // hence the names are visited in reverse order
+  for (auto name = m_content.rbegin(); name != m_content.rend(); ++name) {
+    nBytes += name->wireEncode(block);
+  }
+
+  // TLV header: the length goes in first, then the type in front of it
+  nBytes += block.prependVarNumber(nBytes);
+  nBytes += block.prependVarNumber(tlv::PSyncContent);
+
+  return nBytes;
+}
+
+NDN_CXX_DEFINE_WIRE_ENCODE_INSTANTIATIONS(State);
+
+// Decode a state from a block whose first sub-element is a PSyncContent TLV
+// that holds a sequence of Name elements.
+//
+// The block is kept as the cached wire encoding and the previously held
+// names are replaced by the decoded ones.
+//
+// Throws ndn::tlv::Error if the block has no wire format, has no
+// sub-element, its first sub-element is not PSyncContent, or PSyncContent
+// contains anything but Name elements.
+void
+State::wireDecode(const ndn::Block& wire)
+{
+  if (!wire.hasWire()) {
+    BOOST_THROW_EXCEPTION(ndn::tlv::Error("The supplied block does not contain wire format"));
+  }
+
+  wire.parse();
+  m_wire = wire;
+
+  auto it = m_wire.elements_begin();
+
+  // An empty block would otherwise be dereferenced below
+  if (it == m_wire.elements_end()) {
+    BOOST_THROW_EXCEPTION(ndn::tlv::Error("The supplied block does not contain any element"));
+  }
+
+  if (it->type() != tlv::PSyncContent) {
+    // report the type that was actually found, not the type of the outer block
+    BOOST_THROW_EXCEPTION(ndn::tlv::Error("Unexpected TLV type when decoding Content: " +
+                                          ndn::to_string(it->type())));
+  }
+
+  it->parse();
+
+  // decoding replaces the content instead of appending to earlier content
+  m_content.clear();
+
+  for (auto val = it->elements_begin(); val != it->elements_end(); ++val) {
+    if (val->type() == ndn::tlv::Name) {
+      m_content.emplace_back(*val);
+    }
+    else {
+      BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of a different type: #" +
+                                            ndn::to_string(val->type())));
+    }
+  }
+}
+
+// Print the names of the state as "[name1, name2, ...]"
+std::ostream&
+operator<<(std::ostream& os, const State& state)
+{
+  const std::vector<ndn::Name> names = state.getContent();
+
+  os << "[";
+  bool isFirst = true;
+  for (const auto& name : names) {
+    if (!isFirst) {
+      os << ", ";
+    }
+    os << name;
+    isFirst = false;
+  }
+  os << "]";
+
+  return os;
+}
+
+} // namespace psync
diff --git a/src/detail/state.hpp b/src/detail/state.hpp
new file mode 100644
index 0000000..bdc6088
--- /dev/null
+++ b/src/detail/state.hpp
@@ -0,0 +1,73 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#ifndef PSYNC_STATE_HPP
+#define PSYNC_STATE_HPP
+
+#include <ndn-cxx/name.hpp>
+
+namespace psync {
+
+namespace tlv {
+
+enum {
+ PSyncContent = 128 // TLV type that State::wireEncode() puts in front of the encoded list of names
+};
+
+} // namespace tlv
+
+class State // a list of names with a TLV wire encoding (a PSyncContent element holding Name elements)
+{
+public:
+ explicit State(const ndn::Block& block); // decodes the block via wireDecode()
+
+ State() = default;
+
+ void
+ addContent(const ndn::Name& prefix); // appends the name to the content
+
+ std::vector<ndn::Name>
+ getContent() const // returns the names by value, i.e. a copy
+ {
+ return m_content;
+ }
+
+ const ndn::Block&
+ wireEncode() const; // returns the cached block if there is one, otherwise encodes and caches
+
+ template<ndn::encoding::Tag TAG>
+ size_t
+ wireEncode(ndn::EncodingImpl<TAG>& block) const; // writes into the given encoder, returns the number of bytes
+
+ void
+ wireDecode(const ndn::Block& wire); // throws ndn::tlv::Error on a block it cannot decode
+
+private:
+ std::vector<ndn::Name> m_content;
+ mutable ndn::Block m_wire; // cached wire encoding; mutable because the const wireEncode() fills it
+};
+
+NDN_CXX_DECLARE_WIRE_ENCODE_INSTANTIATIONS(State);
+
+std::ostream&
+operator<<(std::ostream& os, const State& State);
+
+} // namespace psync
+
+#endif // PSYNC_STATE_HPP
\ No newline at end of file
diff --git a/src/detail/test-access-control.hpp b/src/detail/test-access-control.hpp
new file mode 100644
index 0000000..5253a7a
--- /dev/null
+++ b/src/detail/test-access-control.hpp
@@ -0,0 +1,39 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014-2018, The University of Memphis,
+ * Regents of the University of California
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>
+ *
+ **/
+
+#ifndef PSYNC_TEST_ACCESS_CONTROL_HPP
+#define PSYNC_TEST_ACCESS_CONTROL_HPP
+
+#include "psync-config.hpp"
+
+#ifdef WITH_TESTS // test build: members are opened up (and made virtual) so tests can reach them
+#define VIRTUAL_WITH_TESTS virtual
+#define PUBLIC_WITH_TESTS_ELSE_PROTECTED public
+#define PUBLIC_WITH_TESTS_ELSE_PRIVATE public
+#define PROTECTED_WITH_TESTS_ELSE_PRIVATE protected
+#else // regular build: each macro expands to the access level named after ELSE, VIRTUAL_WITH_TESTS to nothing
+#define VIRTUAL_WITH_TESTS
+#define PUBLIC_WITH_TESTS_ELSE_PROTECTED protected
+#define PUBLIC_WITH_TESTS_ELSE_PRIVATE private
+#define PROTECTED_WITH_TESTS_ELSE_PRIVATE private
+#endif // WITH_TESTS
+
+#endif // PSYNC_TEST_ACCESS_CONTROL_HPP
diff --git a/src/detail/util.cpp b/src/detail/util.cpp
new file mode 100644
index 0000000..ab2c541
--- /dev/null
+++ b/src/detail/util.cpp
@@ -0,0 +1,109 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * murmurHash3 was written by Austin Appleby, and is placed in the public
+ * domain. The author hereby disclaims copyright to this source code.
+ * https://github.com/aappleby/smhasher/blob/master/src/murmurHash3.cpp
+ **/
+
+#include "util.hpp"
+
+#include <ndn-cxx/util/backports.hpp>
+
+#include <cstring>
+#include <string>
+
+namespace psync {
+
+// Rotate the 32-bit value x left by r bit positions
+static uint32_t
+ROTL32(uint32_t x, int8_t r)
+{
+  const uint32_t shiftedUp = x << r;
+  const uint32_t wrappedAround = x >> (32 - r);
+  return shiftedUp | wrappedAround;
+}
+
+// 32-bit MurmurHash3 of a byte sequence.
+//
+// nHashSeed    seed of the hash
+// vDataToHash  bytes to hash; may be empty
+//
+// Returns the 32-bit hash. The 4-byte blocks are read in the byte order of
+// the host, as before.
+uint32_t
+murmurHash3(uint32_t nHashSeed, const std::vector<unsigned char>& vDataToHash)
+{
+  uint32_t h1 = nHashSeed;
+  const uint32_t c1 = 0xcc9e2d51;
+  const uint32_t c2 = 0x1b873593;
+
+  const size_t len = vDataToHash.size();
+  const size_t nblocks = len / 4;
+  // data() is valid on an empty vector, unlike &vDataToHash[0]
+  const unsigned char* data = vDataToHash.data();
+
+  //----------
+  // body
+  for (size_t i = 0; i < nblocks; i++) {
+    // memcpy instead of a cast to uint32_t*: the buffer carries no alignment
+    // guarantee for 32-bit reads
+    uint32_t k1;
+    std::memcpy(&k1, data + i * 4, sizeof(k1));
+
+    k1 *= c1;
+    k1 = ROTL32(k1, 15);
+    k1 *= c2;
+
+    h1 ^= k1;
+    h1 = ROTL32(h1, 13);
+    h1 = h1 * 5 + 0xe6546b64;
+  }
+
+  //----------
+  // tail: the 1 to 3 bytes left after the last full block (only read if present)
+  const uint8_t* tail = data + nblocks * 4;
+
+  uint32_t k1 = 0;
+
+  switch (len & 3) {
+  case 3:
+    k1 ^= tail[2] << 16;
+    NDN_CXX_FALLTHROUGH;
+
+  case 2:
+    k1 ^= tail[1] << 8;
+    NDN_CXX_FALLTHROUGH;
+
+  case 1:
+    k1 ^= tail[0];
+    k1 *= c1; k1 = ROTL32(k1, 15); k1 *= c2; h1 ^= k1;
+  }
+
+  //----------
+  // finalization
+  h1 ^= static_cast<uint32_t>(len);
+  h1 ^= h1 >> 16;
+  h1 *= 0x85ebca6b;
+  h1 ^= h1 >> 13;
+  h1 *= 0xc2b2ae35;
+  h1 ^= h1 >> 16;
+
+  return h1;
+}
+
+// Hash the characters of a string (copied into a byte vector first)
+uint32_t
+murmurHash3(uint32_t nHashSeed, const std::string& str)
+{
+  const std::vector<unsigned char> bytes(str.begin(), str.end());
+  return murmurHash3(nHashSeed, bytes);
+}
+
+// Hash the four bytes of a 32-bit value as they are laid out in memory
+uint32_t
+murmurHash3(uint32_t nHashSeed, uint32_t value)
+{
+  const unsigned char* first = reinterpret_cast<const unsigned char*>(&value);
+  const unsigned char* last = first + sizeof(uint32_t);
+
+  return murmurHash3(nHashSeed, std::vector<unsigned char>(first, last));
+}
+
+} // namespace psync
diff --git a/src/detail/util.hpp b/src/detail/util.hpp
new file mode 100644
index 0000000..980b6ed
--- /dev/null
+++ b/src/detail/util.hpp
@@ -0,0 +1,53 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * murmurHash3 was written by Austin Appleby, and is placed in the public
+ * domain. The author hereby disclaims copyright to this source code.
+ * https://github.com/aappleby/smhasher/blob/master/src/murmurHash3.cpp
+ **/
+
+#ifndef PSYNC_UTIL_HPP
+#define PSYNC_UTIL_HPP
+
+#include <ndn-cxx/name.hpp>
+
+#include <inttypes.h>
+#include <vector>
+#include <string>
+
+namespace psync {
+
+uint32_t
+murmurHash3(uint32_t nHashSeed, const std::vector<unsigned char>& vDataToHash);
+
+uint32_t
+murmurHash3(uint32_t nHashSeed, const std::string& str);
+
+uint32_t
+murmurHash3(uint32_t nHashSeed, uint32_t value);
+
+struct MissingDataInfo // presumably describes the range of sequence numbers of one prefix reported as missing - usage not shown here, confirm
+{
+ ndn::Name prefix;
+ uint64_t lowSeq; // presumably the first sequence number of the range - confirm against the producers
+ uint64_t highSeq; // presumably the last sequence number of the range - confirm against the producers
+};
+
+} // namespace psync
+
+#endif // PSYNC_UTIL_HPP
diff --git a/src/full-producer.cpp b/src/full-producer.cpp
new file mode 100644
index 0000000..dd4d452
--- /dev/null
+++ b/src/full-producer.cpp
@@ -0,0 +1,466 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "full-producer.hpp"
+
+#include <ndn-cxx/util/logger.hpp>
+#include <ndn-cxx/util/segment-fetcher.hpp>
+#include <ndn-cxx/security/validator-null.hpp>
+
+#include <iostream>
+#include <cstring>
+#include <limits>
+#include <functional>
+
+namespace psync {
+
+NDN_LOG_INIT(psync.FullProducer);
+
+FullProducer::FullProducer(const size_t expectedNumEntries, // forwarded to ProducerBase together with face, prefixes and freshness
+ ndn::Face& face,
+ const ndn::Name& syncPrefix,
+ const ndn::Name& userPrefix,
+ const UpdateCallback& onUpdateCallBack, // stored in m_onUpdate
+ ndn::time::milliseconds syncInterestLifetime, // lifetime set on every sync Interest sent
+ ndn::time::milliseconds syncReplyFreshness)
+ : ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness)
+ , m_syncInterestLifetime(syncInterestLifetime)
+ , m_onUpdate(onUpdateCallBack)
+ , m_outstandingInterestId(nullptr) // no sync Interest is pending yet
+ , m_scheduledSyncInterestId(m_scheduler)
+{
+ int jitter = m_syncInterestLifetime.count() * .20; // 20% of the lifetime (in ms), truncated to int
+ m_jitter = std::uniform_int_distribution<>(-jitter, jitter); // sendSyncInterest() draws its scheduling jitter from this
+
+ m_registerPrefixId =
+ m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
+ std::bind(&FullProducer::onInterest, this, _1, _2),
+ std::bind(&FullProducer::onRegisterFailed, this, _1, _2));
+
+ // Open question: should the first sync Interest wait for the setInterestFilter success callback?
+ // (Currently following ChronoSync's way)
+ sendSyncInterest();
+}
+
+FullProducer::~FullProducer() // withdraws what this object still has registered on the face
+{
+ if (m_outstandingInterestId != nullptr) { // a sync Interest may still be pending
+ m_face.removePendingInterest(m_outstandingInterestId);
+ }
+
+ m_face.unsetInterestFilter(m_registerPrefixId); // undo the filter set in the constructor
+}
+
+// Publish a new sequence number for a prefix that was added before.
+// Without an explicit seq the current sequence number is incremented by one.
+// Unknown prefixes are logged and ignored.
+void
+FullProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
+{
+  auto known = m_prefixes.find(prefix);
+  if (known == m_prefixes.end()) {
+    NDN_LOG_WARN("Prefix not added: " << prefix);
+    return;
+  }
+
+  const uint64_t newSeq = seq ? *seq : known->second + 1;
+
+  NDN_LOG_INFO("Publish: "<< prefix << "/" << newSeq);
+
+  updateSeqNo(prefix, newSeq);
+
+  satisfyPendingInterests();
+}
+
+void
+FullProducer::sendSyncInterest() // replaces the pending sync Interest with a fresh one and schedules the next renewal
+{
+ // If two sync Interests were pending at the same time (no new data
+ // in the network yet), data arriving later could satisfy both of
+ // them, so the previous one is removed first
+ if (m_outstandingInterestId != nullptr) {
+ m_face.removePendingInterest(m_outstandingInterestId);
+ }
+
+ // Sync Interest format for full sync: /<sync-prefix>/<ourLatestIBF>
+ ndn::Name syncInterestName = m_syncPrefix;
+
+ // Append our latest IBF
+ m_iblt.appendToName(syncInterestName);
+
+ m_outstandingInterestName = syncInterestName; // sendSyncData() compares against this name
+
+ m_scheduledSyncInterestId = // renew after half the lifetime plus a random jitter
+ m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(m_rng)),
+ [this] { sendSyncInterest(); });
+
+ ndn::Interest syncInterest(syncInterestName);
+ syncInterest.setInterestLifetime(m_syncInterestLifetime);
+ // The other side appends a hash of its IBF to the sync data name, so the data name is longer than ours
+ syncInterest.setCanBePrefix(true);
+ syncInterest.setMustBeFresh(true);
+
+ syncInterest.setNonce(1); // NOTE(review): presumably a placeholder so that refreshNonce() has a nonce to replace - confirm against ndn-cxx
+ syncInterest.refreshNonce();
+
+ m_outstandingInterestId = m_face.expressInterest(syncInterest,
+ std::bind(&FullProducer::onSyncData, this, _1, _2),
+ [] (const ndn::Interest& interest, const ndn::lp::Nack& nack) {
+ NDN_LOG_TRACE("received Nack with reason " << nack.getReason() <<
+ " for Interest with Nonce: " << interest.getNonce());
+ },
+ [] (const ndn::Interest& interest) {
+ NDN_LOG_DEBUG("On full sync timeout " << interest.getNonce());
+ }); // Nack and timeout are only logged; the scheduled event above takes care of the renewal
+
+ NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
+ ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
+}
+
+// Dispatch an Interest received under the sync prefix by what follows the
+// prefix: one component is a sync Interest, two components ending in the
+// recovery component is a recovery Interest. Anything else is ignored.
+void
+FullProducer::onInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
+{
+  const ndn::Name suffix = interest.getName().getSubName(prefixName.size());
+  const size_t nComps = suffix.size();
+
+  if (nComps == 1) {
+    onSyncInterest(interest);
+    return;
+  }
+
+  if (nComps == 2 && suffix.get(nComps - 1) == RECOVERY_PREFIX.get(0)) {
+    onRecoveryInterest(interest);
+  }
+}
+
+void
+FullProducer::onSyncInterest(const ndn::Interest& interest) // answers a sync Interest with data or a nack, or stores it as pending
+{
+ ndn::Name interestName = interest.getName();
+ ndn::name::Component ibltName = interestName.get(interestName.size()-1); // the last component carries the sender's encoded IBLT
+
+ NDN_LOG_DEBUG("Full Sync Interest Received, nonce: " << interest.getNonce() <<
+ ", hash: " << std::hash<ndn::Name>{}(interestName));
+
+ IBLT iblt(m_expectedNumEntries);
+ try {
+ iblt.initialize(ibltName);
+ }
+ catch (const std::exception& e) { // an IBLT that cannot be decoded is logged and the Interest is dropped
+ NDN_LOG_WARN(e.what());
+ return;
+ }
+
+ IBLT diff = m_iblt - iblt; // own IBLT minus received IBLT
+
+ std::set<uint32_t> positive; // hashes we have and the sender lacks
+ std::set<uint32_t> negative; // hashes the sender has and we lack
+
+ if (!diff.listEntries(positive, negative)) {
+ NDN_LOG_TRACE("Cannot decode differences, positive: " << positive.size()
+ << " negative: " << negative.size() << " m_threshold: "
+ << m_threshold);
+
+ // Send a nack if the number of differences found reaches the threshold, or if
+ // no difference could be found at all; otherwise answer with the positive set below as usual
+ if (positive.size() + negative.size() >= m_threshold ||
+ (positive.size() == 0 && negative.size() == 0)) {
+
+ // If we have nothing to offer, we are the ones that are
+ // behind and should not mislead other nodes.
+ bool haveSomethingToOffer = false;
+ for (const auto& content : m_prefixes) {
+ if (content.second != 0) { // a non-zero sequence number counts as something to offer
+ haveSomethingToOffer = true;
+ }
+ }
+
+ if (haveSomethingToOffer) {
+ sendApplicationNack(interestName);
+ }
+ return;
+ }
+ }
+
+ State state;
+ for (const auto& hash : positive) {
+ ndn::Name prefix = m_hash2prefix[hash]; // NOTE(review): if m_hash2prefix is a std::map-like container, operator[] inserts an empty Name for an unknown hash - confirm
+ // Don't sync up sequence number zero
+ if (m_prefixes[prefix] != 0 && !isFutureHash(prefix.toUri(), negative)) {
+ state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
+ }
+ }
+
+ if (!state.getContent().empty()) {
+ NDN_LOG_DEBUG("Sending sync content: " << state);
+ sendSyncData(interestName, state.wireEncode());
+ return;
+ }
+
+ ndn::util::scheduler::ScopedEventId scopedEventId(m_scheduler); // nothing to send: keep the Interest as pending
+ auto it = m_pendingEntries.emplace(interestName,
+ PendingEntryInfoFull{iblt, std::move(scopedEventId)});
+
+ it.first->second.expirationEvent = // drop the pending entry once the Interest lifetime has elapsed
+ m_scheduler.scheduleEvent(interest.getInterestLifetime(),
+ [this, interest] {
+ NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
+ m_pendingEntries.erase(interest.getName());
+ });
+}
+
+// Answer a recovery Interest with every prefix whose sequence number is
+// not zero, each with that sequence number appended.
+void
+FullProducer::onRecoveryInterest(const ndn::Interest& interest)
+{
+  NDN_LOG_DEBUG("Recovery interest received");
+
+  State state;
+  for (auto it = m_prefixes.begin(); it != m_prefixes.end(); ++it) {
+    if (it->second == 0) {
+      continue;
+    }
+    state.addContent(ndn::Name(it->first).appendNumber(it->second));
+  }
+
+  // Send even if state is empty to let other side know that we are behind
+  sendRecoveryData(interest.getName(), state);
+}
+
+/**
+ * Sign and publish sync data named <name>/<hash of our current IBF name>.
+ *
+ * When the data answers our own outstanding sync interest, that interest is
+ * removed from the face before the data is put, and a fresh sync interest is
+ * sent afterwards.
+ */
+void
+FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
+{
+  NDN_LOG_DEBUG("Checking if data will satisfy our own pending interest");
+
+  ndn::Name ibltOnlyName;
+  m_iblt.appendToName(ibltOnlyName);
+
+  // The IBF hash suffix lets different responders produce different data names
+  ndn::Name dataName(name);
+  dataName.appendNumber(std::hash<ndn::Name>{}(ibltOnlyName));
+
+  ndn::Data data(dataName);
+  data.setFreshnessPeriod(m_syncReplyFreshness);
+  data.setContent(block);
+  m_keyChain.sign(data);
+
+  // Decide once, up front: the stored name is reset below
+  const bool answersOwnInterest = (m_outstandingInterestName == name);
+
+  if (answersOwnInterest) {
+    NDN_LOG_DEBUG("Satisfied our own pending interest");
+    if (m_outstandingInterestId != nullptr) {
+      NDN_LOG_DEBUG("Removing our pending interest from face");
+      m_face.removePendingInterest(m_outstandingInterestId);
+      m_outstandingInterestId = nullptr;
+      m_outstandingInterestName = ndn::Name("");
+    }
+  }
+
+  // The data goes out only after our own pending interest is off the face
+  NDN_LOG_DEBUG("Sending Sync Data");
+  m_face.put(data);
+
+  if (answersOwnInterest) {
+    NDN_LOG_TRACE("Renewing sync interest");
+    sendSyncInterest();
+  }
+}
+
+/**
+ * Handle data that answered one of our sync or recovery interests.
+ *
+ * An application nack triggers a recovery interest. Otherwise every
+ * <prefix>/<seq> in the content that is newer than what we hold is recorded,
+ * the application is notified, and a new sync interest is sent.
+ */
+void
+FullProducer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
+{
+  const ndn::Name interestName = interest.getName();
+  deletePendingInterests(interestName);
+
+  if (data.getContentType() == ndn::tlv::ContentType_Nack) {
+    NDN_LOG_DEBUG("Got application nack, sending recovery interest");
+    sendRecoveryInterest(interest);
+    return;
+  }
+
+  State state(data.getContent());
+
+  const bool isRecoveryReply =
+    interestName.get(interestName.size()-1) == RECOVERY_PREFIX.get(0);
+  if (isRecoveryReply && state.getContent().empty()) {
+    NDN_LOG_TRACE("Recovery data is empty, other side is behind");
+    return;
+  }
+
+  NDN_LOG_DEBUG("Sync Data Received: " << state);
+
+  std::vector<MissingDataInfo> updates;
+  for (const auto& content : state.getContent()) {
+    const ndn::Name prefix = content.getPrefix(-1);
+    const uint64_t seq = content.get(content.size()-1).toNumber();
+
+    const bool isKnown = m_prefixes.find(prefix) != m_prefixes.end();
+    if (isKnown && m_prefixes[prefix] >= seq) {
+      continue;
+    }
+
+    // operator[] adds a previously unseen prefix with sequence 0, which
+    // updateSeqNo needs in order to accept the update.
+    const auto firstMissingSeq = m_prefixes[prefix] + 1;
+    updates.push_back(MissingDataInfo{prefix, firstMissingSeq, seq});
+    updateSeqNo(prefix, seq);
+    // satisfyPendingInterests is deliberately not called here: the pending
+    // entry for this interest name was already removed by
+    // deletePendingInterests above. Entries stored under other interest
+    // names may still be pending.
+  }
+
+  if (updates.empty()) {
+    NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() <<
+                  " , hash: " << std::hash<ndn::Name>{}(interestName));
+    return;
+  }
+
+  // New data arrived, so notify the application and renew the sync interest
+  m_onUpdate(updates);
+  NDN_LOG_TRACE("Renewing sync interest");
+  sendSyncInterest();
+}
+
+/**
+ * Re-examine every stored sync interest against our current IBF.
+ *
+ * An entry whose difference cannot be decoded is dropped when the listed
+ * differences reach the threshold or nothing could be listed at all. An entry
+ * for which we have content to offer is answered and dropped. Anything else
+ * stays pending.
+ */
+void
+FullProducer::satisfyPendingInterests()
+{
+  NDN_LOG_DEBUG("Satisfying full sync interest: " << m_pendingEntries.size());
+
+  auto it = m_pendingEntries.begin();
+  while (it != m_pendingEntries.end()) {
+    IBLT diff = m_iblt - it->second.iblt;
+    std::set<uint32_t> positive;
+    std::set<uint32_t> negative;
+
+    const bool decoded = diff.listEntries(positive, negative);
+    if (!decoded) {
+      NDN_LOG_TRACE("Decode failed for pending interest");
+      const bool nothingListed = positive.empty() && negative.empty();
+      if (positive.size() + negative.size() >= m_threshold || nothingListed) {
+        NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest");
+        it = m_pendingEntries.erase(it);
+        continue;
+      }
+    }
+
+    State state;
+    for (const auto& hash : positive) {
+      const ndn::Name prefix = m_hash2prefix[hash];
+      const auto seqNo = m_prefixes[prefix];
+      if (seqNo != 0) {
+        state.addContent(ndn::Name(prefix).appendNumber(seqNo));
+      }
+    }
+
+    if (state.getContent().empty()) {
+      // Nothing to offer yet, keep the interest pending
+      ++it;
+      continue;
+    }
+
+    NDN_LOG_DEBUG("Satisfying sync content: " << state);
+    sendSyncData(it->first, state.wireEncode());
+    it = m_pendingEntries.erase(it);
+  }
+}
+
+/**
+ * Tell whether the hash of <prefix>/<current seq + 1> appears in @p negative,
+ * i.e. whether the other side already holds the next sequence number for
+ * this prefix.
+ *
+ * @param prefix the prefix to check; a prefix absent from m_prefixes is
+ *               treated as having sequence number 0
+ * @param negative hashes present in the other side's IBF but not in ours
+ * @return true if the hash of the next sequence number is in @p negative
+ */
+bool
+FullProducer::isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative)
+{
+  // Look the sequence number up without operator[] so that this predicate
+  // never inserts a new entry into m_prefixes.
+  const auto seqIt = m_prefixes.find(prefix);
+  const uint64_t currentSeq = (seqIt != m_prefixes.end()) ? seqIt->second : 0;
+
+  const uint32_t nextHash = murmurHash3(N_HASHCHECK,
+                                        ndn::Name(prefix).appendNumber(currentSeq + 1).toUri());
+
+  // std::set offers logarithmic lookup, so no linear scan is needed
+  return negative.find(nextHash) != negative.end();
+}
+
+/**
+ * Remove the pending sync interest stored under @p interestName, if any.
+ *
+ * @param interestName exact interest name used as key in m_pendingEntries
+ */
+void
+FullProducer::deletePendingInterests(const ndn::Name& interestName)
+{
+  // m_pendingEntries is a std::map keyed by interest name, so at most one
+  // entry can match and a direct lookup replaces the full traversal.
+  auto it = m_pendingEntries.find(interestName);
+  if (it != m_pendingEntries.end()) {
+    NDN_LOG_TRACE("Delete pending interest: " << interestName);
+    m_pendingEntries.erase(it);
+  }
+}
+
+/**
+ * Encode @p state, split it into segments of at most half the maximum NDN
+ * packet size, and publish each segment as signed data named
+ * <prefix>/<segment number>. The last segment carries the final block id.
+ *
+ * At least one segment is always sent.
+ *
+ * @param prefix name under which the segments are published
+ * @param state  state to be sent to the requester
+ */
+void
+FullProducer::sendRecoveryData(const ndn::Name& prefix, const State& state)
+{
+  ndn::EncodingBuffer buffer;
+  buffer.prependBlock(state.wireEncode());
+
+  const uint8_t* segmentBegin = buffer.buf();
+  const uint8_t* const end = segmentBegin + buffer.size();
+  const size_t maxSegmentSize = ndn::MAX_NDN_PACKET_SIZE >> 1;
+
+  uint64_t segmentNo = 0;
+  do {
+    // Clamp by remaining byte count rather than by pointer comparison, so
+    // that no pointer beyond one-past-the-end of the buffer is ever formed.
+    const size_t remaining = static_cast<size_t>(end - segmentBegin);
+    const size_t segmentSize = remaining < maxSegmentSize ? remaining : maxSegmentSize;
+
+    ndn::Name segmentName(prefix);
+    segmentName.appendSegment(segmentNo);
+
+    // A stack object is sufficient, as in the other send paths of this file
+    ndn::Data data(segmentName);
+    data.setContent(segmentBegin, segmentSize);
+    data.setFreshnessPeriod(m_syncReplyFreshness);
+
+    segmentBegin += segmentSize;
+    if (segmentBegin >= end) {
+      data.setFinalBlock(segmentName[-1]);
+    }
+
+    m_keyChain.sign(data);
+    m_face.put(data);
+
+    NDN_LOG_DEBUG("Sending recovery data, seq: " << segmentNo);
+
+    ++segmentNo;
+  } while (segmentBegin < end);
+}
+
+/**
+ * Cancel our pending sync interest and fetch the other side's full state
+ * with a segment fetcher. The recovery interest is named
+ * /<sync-prefix>/<hash of our IBF name>/<RECOVERY_PREFIX>.
+ *
+ * The @p interest argument is not used when building that name.
+ */
+void
+FullProducer::sendRecoveryInterest(const ndn::Interest& interest)
+{
+  if (m_outstandingInterestId != nullptr) {
+    m_face.removePendingInterest(m_outstandingInterestId);
+    m_outstandingInterestId = nullptr;
+  }
+
+  ndn::Name ownIbltName;
+  m_iblt.appendToName(ownIbltName);
+
+  ndn::Name recoveryName(m_syncPrefix);
+  recoveryName.appendNumber(std::hash<ndn::Name>{}(ownIbltName));
+  recoveryName.append(RECOVERY_PREFIX);
+
+  ndn::Interest recoveryInterest(recoveryName);
+  recoveryInterest.setInterestLifetime(m_syncInterestLifetime);
+
+  auto segmentFetcher =
+    ndn::util::SegmentFetcher::start(m_face,
+                                     recoveryInterest,
+                                     ndn::security::v2::getAcceptAllValidator());
+
+  // The reassembled buffer is wrapped in a Data object so that it can go
+  // through the regular sync-data path.
+  segmentFetcher->onComplete.connect([this, recoveryInterest] (ndn::ConstBufferPtr recoveredBuffer) {
+      NDN_LOG_TRACE("Segment fetcher got data");
+      ndn::Data data;
+      data.setContent(std::move(recoveredBuffer));
+      onSyncData(recoveryInterest, data);
+    });
+
+  segmentFetcher->onError.connect([] (uint32_t errorCode, const std::string& msg) {
+      NDN_LOG_ERROR("Cannot recover, error: " << errorCode <<
+                    " message: " << msg);
+    });
+}
+
+} // namespace psync
\ No newline at end of file
diff --git a/src/full-producer.hpp b/src/full-producer.hpp
new file mode 100644
index 0000000..2f97522
--- /dev/null
+++ b/src/full-producer.hpp
@@ -0,0 +1,243 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#ifndef PSYNC_FULL_PRODUCER_HPP
+#define PSYNC_FULL_PRODUCER_HPP
+
+#include "producer-base.hpp"
+#include "detail/state.hpp"
+
+#include <map>
+#include <unordered_set>
+#include <random>
+
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
+#include <ndn-cxx/util/scheduler-scoped-event-id.hpp>
+#include <ndn-cxx/util/time.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+
+namespace psync {
+
+// State kept for one pending full-sync interest. The name has to differ from PendingEntryInfo
+// (used in partial-producer); otherwise strange segmentation faults occur when a partial
+// producer is destructed (presumably a one-definition-rule clash -- TODO confirm).
+struct PendingEntryInfoFull
+{
+ IBLT iblt; // IBF stored for the pending interest; later diffed against our own IBF
+ ndn::util::scheduler::ScopedEventId expirationEvent; // scheduled event that erases this entry after the interest lifetime
+};
+
+/// Application callback that receives the list of newly learned updates
+using UpdateCallback = std::function<void(const std::vector<MissingDataInfo>&)>;
+
+/// Default value of FullProducer's syncInterestLifetime constructor argument
+const ndn::time::milliseconds SYNC_INTEREST_LIFTIME = 1_s;
+/// Name component that ends the name of a recovery interest
+const ndn::Name RECOVERY_PREFIX("recovery");
+
+/**
+ * @brief Full sync logic to synchronize with other nodes
+ * where all nodes want to get all name prefixes synced.
+ *
+ * Application should call publishName whenever it wants to
+ * let consumers know that new data is available for the userPrefix.
+ * Multiple userPrefixes can be added by using addUserNode.
+ * Currently, fetching and publishing of data needs to be handled by the application.
+ */
+class FullProducer : public ProducerBase
+{
+public:
+ /**
+ * @brief constructor
+ *
+ * Registers syncPrefix in NFD and sends a sync interest
+ *
+ * @param expectedNumEntries expected entries in IBF
+ * @param face application's face
+ * @param syncPrefix The prefix of the sync group
+ * @param userPrefix The prefix of the first user in the group
+ * @param onUpdateCallBack The call back to be called when there is new data
+ * @param syncInterestLifetime lifetime of the sync interest
+ * @param syncReplyFreshness freshness of sync data
+ */
+ FullProducer(size_t expectedNumEntries,
+ ndn::Face& face,
+ const ndn::Name& syncPrefix,
+ const ndn::Name& userPrefix,
+ const UpdateCallback& onUpdateCallBack,
+ ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFTIME,
+ ndn::time::milliseconds syncReplyFreshness = SYNC_REPLY_FRESHNESS);
+
+ ~FullProducer();
+
+ /**
+ * @brief Publish name to let others know
+ *
+ * addUserNode needs to be called before this to add the prefix
+ * if not already added via the constructor.
+ * If seq is null then the seq of prefix is incremented by 1 else
+ * the supplied sequence is set in the IBF.
+ *
+ * @param prefix the prefix to be updated
+ * @param seq the sequence number of the prefix
+ */
+ void
+ publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq = ndn::nullopt);
+
+private:
+ /**
+ * @brief Send sync interest for full synchronization
+ *
+ * Forms the interest name: /<sync-prefix>/<own-IBF>
+ * Cancels any pending sync interest we sent earlier on the face
+ * Sends the sync interest
+ */
+ void
+ sendSyncInterest();
+
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ /**
+ * @brief Process interest from other parties
+ *
+ * Determine whether this is a sync interest or recovery interest
+ * and dispatch to onSyncInterest or onRecoveryInterest respectively.
+ *
+ * @param prefixName prefix for sync group which we registered
+ * @param interest the interest we got
+ */
+ void
+ onInterest(const ndn::Name& prefixName, const ndn::Interest& interest);
+
+private:
+ /**
+ * @brief Process sync interest from other parties
+ *
+ * Get differences b/w our IBF and IBF in the sync interest.
+ * If the differences cannot be decoded and either the number of listed differences
+ * reaches the threshold or nothing could be listed at all, stop processing; an
+ * application nack is sent in that case only when at least one of our prefixes has a
+ * non-zero sequence number.
+ *
+ * If we have some things in our IBF that the other side does not have, reply with the content.
+ * Otherwise add the sync interest into a map with interest name as key and PendingEntryInfoFull
+ * as value.
+ *
+ * @param interest the sync interest we got
+ */
+ void
+ onSyncInterest(const ndn::Interest& interest);
+
+ /**
+ * @brief Publish our entire state so that requester can catch up.
+ *
+ * Prefixes with sequence number zero are left out. The data is sent
+ * even when the resulting state is empty.
+ *
+ * @param interest the recovery interest we got
+ */
+ void
+ onRecoveryInterest(const ndn::Interest& interest);
+
+ /**
+ * @brief Send sync data
+ *
+ * Check if the data will satisfy our own pending interest,
+ * remove it first if it does, and then renew the sync interest
+ * Otherwise just send the data
+ *
+ * The data name is the given name followed by a hash of our current IBF name.
+ *
+ * @param name name to be set as data name
+ * @param block the content of the data
+ */
+ void
+ sendSyncData(const ndn::Name& name, const ndn::Block& block);
+
+ /**
+ * @brief Process sync data
+ *
+ * Call deletePendingInterests to delete any pending sync interest with
+ * interest name would have been satisfied once NFD got the data.
+ *
+ * If the data is an application nack, send a recovery interest and stop.
+ * If it is an empty reply to a recovery interest, stop (the other side is behind).
+ *
+ * For each prefix/seq in data content check that we don't already have the
+ * prefix/seq and updateSeq(prefix, seq)
+ *
+ * If there was at least one update, notify the application about the updates
+ * and sendSyncInterest because the last one was satisfied by the incoming data.
+ * Without any update no new sync interest is sent from here.
+ *
+ * @param interest interest for which we got the data
+ * @param data the data packet we got
+ */
+ void
+ onSyncData(const ndn::Interest& interest, const ndn::Data& data);
+
+ /**
+ * @brief Satisfy pending sync interests
+ *
+ * For pending sync interests SI, if IBF of SI has any difference from our own IBF:
+ * send data back and delete the pending entry.
+ * If we can't decode differences from the stored IBF and either the number of listed
+ * differences reaches the threshold or nothing could be listed, then delete it.
+ */
+ void
+ satisfyPendingInterests();
+
+ /**
+ * @brief Delete pending sync interests that match given name
+ *
+ * @param interestName exact interest name used as key of the pending entry
+ */
+ void
+ deletePendingInterests(const ndn::Name& interestName);
+
+ /**
+ * @brief Check if hash(prefix + 1) is in negative
+ *
+ * Sometimes what happens is that interest from other side
+ * gets to us before the data
+ *
+ * @param prefix the prefix whose next sequence number is checked
+ * @param negative hashes that the other side has and we do not
+ */
+ bool
+ isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative);
+
+ /**
+ * @brief Segment and send state with the given data name
+ *
+ * Each segment carries at most half of the maximum NDN packet size and
+ * the last segment is marked as the final block.
+ */
+ void
+ sendRecoveryData(const ndn::Name& prefix, const State& state);
+
+ /**
+ * @brief Send recovery interest using segment fetcher
+ *
+ * Recovery data is expected to go over the max packet size, hence the segment fetcher.
+ * The interest name is /<sync-prefix>/<hash of own IBF>/<RECOVERY_PREFIX>;
+ * the given interest is not used to build it.
+ * Any pending sync interest of ours is removed from the face first.
+ */
+ void
+ sendRecoveryInterest(const ndn::Interest& interest);
+
+private:
+ std::map <ndn::Name, PendingEntryInfoFull> m_pendingEntries; // pending sync interests, keyed by interest name
+
+ ndn::time::milliseconds m_syncInterestLifetime; // lifetime set on the recovery interest we send
+
+ UpdateCallback m_onUpdate; // application callback invoked with new updates
+
+ const ndn::PendingInterestId* m_outstandingInterestId; // our pending interest on the face; nullptr when none
+
+ ndn::util::scheduler::ScopedEventId m_scheduledSyncInterestId; // NOTE(review): presumably the scheduled re-send of the sync interest -- confirm
+
+ std::uniform_int_distribution<> m_jitter; // NOTE(review): presumably randomizes sync interest timing -- confirm
+
+ ndn::Name m_outstandingInterestName; // name of our pending sync interest; checked in sendSyncData
+
+ const ndn::RegisteredPrefixId* m_registerPrefixId; // NOTE(review): presumably the handle of the syncPrefix registration -- confirm
+};
+
+} // namespace psync
+
+#endif // PSYNC_FULL_PRODUCER_HPP
\ No newline at end of file
diff --git a/src/partial-producer.cpp b/src/partial-producer.cpp
new file mode 100644
index 0000000..73fb677
--- /dev/null
+++ b/src/partial-producer.cpp
@@ -0,0 +1,252 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "partial-producer.hpp"
+#include "detail/state.hpp"
+
+#include <ndn-cxx/util/logger.hpp>
+
+#include <iostream>
+#include <cstring>
+#include <limits>
+
+namespace psync {
+
+NDN_LOG_INIT(psync.PartialProducer);
+
+/**
+ * Construct the producer, register syncPrefix with the face and, once the
+ * registration succeeds, install interest filters for <syncPrefix>/hello and
+ * <syncPrefix>/sync.
+ *
+ * The last two parameters follow the order of the declaration in
+ * partial-producer.hpp (hello freshness first, then sync freshness).
+ * ProducerBase takes them in the opposite order, so they are swapped when
+ * forwarded.
+ */
+PartialProducer::PartialProducer(size_t expectedNumEntries,
+                                 ndn::Face& face,
+                                 const ndn::Name& syncPrefix,
+                                 const ndn::Name& userPrefix,
+                                 ndn::time::milliseconds helloReplyFreshness,
+                                 ndn::time::milliseconds syncReplyFreshness)
+  : ProducerBase(expectedNumEntries, face, syncPrefix,
+                 userPrefix, syncReplyFreshness, helloReplyFreshness)
+{
+  m_registerPrefixId =
+    m_face.registerPrefix(m_syncPrefix,
+      [this] (const ndn::Name& syncPrefix) {
+        m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("hello"),
+                                 std::bind(&PartialProducer::onHelloInterest, this, _1, _2));
+
+        m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("sync"),
+                                 std::bind(&PartialProducer::onSyncInterest, this, _1, _2));
+      },
+      std::bind(&PartialProducer::onRegisterFailed, this, _1, _2));
+}
+
+PartialProducer::~PartialProducer()
+{
+ m_face.unregisterPrefix(m_registerPrefixId, nullptr, nullptr); // undo the registration made in the constructor; no callbacks are supplied
+}
+
+/**
+ * Advance the sequence number of a known prefix and answer the pending sync
+ * interests that subscribe to it. Unknown prefixes are silently ignored.
+ */
+void
+PartialProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
+{
+  const auto entry = m_prefixes.find(prefix);
+  if (entry == m_prefixes.end()) {
+    return;
+  }
+
+  // Without an explicit sequence number, move one past the current one
+  const uint64_t newSeq = seq.value_or(entry->second + 1);
+
+  NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
+
+  updateSeqNo(prefix, newSeq);
+
+  satisfyPendingSyncInterests(prefix);
+}
+
+/**
+ * Reply to a hello interest with the list of all known prefixes. The data
+ * name is the filter prefix followed by our current IBF.
+ */
+void
+PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest)
+{
+  NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest.getNonce());
+
+  // Only the prefixes are listed here, without sequence numbers
+  State state;
+  for (const auto& entry : m_prefixes) {
+    state.addContent(entry.first);
+  }
+  NDN_LOG_DEBUG("sending content p: " << state);
+
+  ndn::Name helloDataName(prefix);
+  m_iblt.appendToName(helloDataName);
+
+  ndn::Data helloData(helloDataName);
+  helloData.setFreshnessPeriod(m_helloReplyFreshness);
+  helloData.setContent(state.wireEncode());
+
+  m_keyChain.sign(helloData);
+  m_face.put(helloData);
+}
+
+/**
+ * Handle a sync interest named
+ * /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>.
+ *
+ * Sends an application nack when the IBF difference cannot be decoded.
+ * Replies immediately when there is subscribed content to offer or when the
+ * difference count reaches the threshold. Otherwise the interest is stored in
+ * m_pendingEntries until it expires.
+ */
+void
+PartialProducer::onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest)
+{
+  NDN_LOG_DEBUG("Sync Interest Received, nonce: " << interest.getNonce() <<
+                " hash: " << std::hash<std::string>{}(interest.getName().toUri()));
+
+  const ndn::Name interestName = interest.getName();
+  const auto numComponents = interestName.size();
+
+  // The last four name components carry the subscription and the consumer's IBF
+  unsigned int projectedCount;
+  double falsePositiveProb;
+  ndn::name::Component bfName;
+  ndn::name::Component ibltName;
+  try {
+    projectedCount = interestName.get(numComponents-4).toNumber();
+    falsePositiveProb = interestName.get(numComponents-3).toNumber()/1000.;
+    bfName = interestName.get(numComponents-2);
+
+    ibltName = interestName.get(numComponents-1);
+  }
+  catch (const std::exception& e) {
+    NDN_LOG_ERROR("Cannot extract bloom filter and IBF from sync interest: " << e.what());
+    NDN_LOG_ERROR("Format: /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>");
+    return;
+  }
+
+  BloomFilter bf;
+  IBLT iblt(m_expectedNumEntries);
+
+  try {
+    bf = BloomFilter(projectedCount, falsePositiveProb, bfName);
+    iblt.initialize(ibltName);
+  }
+  catch (const std::exception& e) {
+    NDN_LOG_WARN(e.what());
+    return;
+  }
+
+  // positive: entries we have and the consumer lacks; negative: the reverse
+  IBLT diff = m_iblt - iblt;
+  std::set<uint32_t> positive;
+  std::set<uint32_t> negative;
+
+  NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
+
+  const bool peel = diff.listEntries(positive, negative);
+
+  NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
+
+  if (!peel) {
+    NDN_LOG_DEBUG("Can't decode the difference, sending application Nack");
+    sendApplicationNack(interestName);
+    return;
+  }
+
+  // Offer only the prefixes that pass the consumer's bloom filter
+  State state;
+  NDN_LOG_TRACE("Size of positive set " << positive.size());
+  NDN_LOG_TRACE("Size of negative set " << negative.size());
+  for (const auto& hash : positive) {
+    const ndn::Name subscribedPrefix = m_hash2prefix[hash];
+    if (!bf.contains(subscribedPrefix.toUri())) {
+      continue;
+    }
+    const auto seqNo = m_prefixes[subscribedPrefix];
+    state.addContent(ndn::Name(subscribedPrefix).appendNumber(seqNo));
+    NDN_LOG_DEBUG("Content: " << subscribedPrefix << " " << std::to_string(seqNo));
+  }
+
+  NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
+
+  const bool hasContent = !state.getContent().empty();
+  if (positive.size() + negative.size() >= m_threshold || hasContent) {
+    // Reply right away; the data name is the interest name plus our current IBF
+    ndn::Name syncDataName(interestName);
+    m_iblt.appendToName(syncDataName);
+
+    ndn::Data syncData(syncDataName);
+    syncData.setFreshnessPeriod(m_syncReplyFreshness);
+    syncData.setContent(state.wireEncode());
+
+    m_keyChain.sign(syncData);
+    NDN_LOG_DEBUG("Sending sync data");
+    m_face.put(syncData);
+
+    return;
+  }
+
+  // Nothing to send yet: keep the interest until it expires
+  ndn::util::scheduler::ScopedEventId placeholderEvent(m_scheduler);
+  auto inserted = m_pendingEntries.emplace(interestName,
+                                           PendingEntryInfo{bf, iblt, std::move(placeholderEvent)});
+
+  inserted.first->second.expirationEvent =
+    m_scheduler.scheduleEvent(interest.getInterestLifetime(),
+                              [this, interest] {
+                                NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
+                                m_pendingEntries.erase(interest.getName());
+                              });
+}
+
+/**
+ * Walk all pending sync interests after @p prefix was updated.
+ *
+ * Entries whose IBF difference can no longer be decoded are dropped. Entries
+ * that subscribe to @p prefix, or whose difference count reaches the
+ * threshold, are answered (possibly with empty content) and dropped.
+ */
+void
+PartialProducer::satisfyPendingSyncInterests(const ndn::Name& prefix)
+{
+  NDN_LOG_TRACE("size of pending interest: " << m_pendingEntries.size());
+
+  auto it = m_pendingEntries.begin();
+  while (it != m_pendingEntries.end()) {
+    const PendingEntryInfo& pending = it->second;
+
+    IBLT diff = m_iblt - pending.iblt;
+    std::set<uint32_t> positive;
+    std::set<uint32_t> negative;
+
+    const bool peel = diff.listEntries(positive, negative);
+
+    NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
+
+    NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
+    NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
+
+    if (!peel) {
+      NDN_LOG_TRACE("Decoding of differences with stored IBF unsuccessful, deleting pending interest");
+      it = m_pendingEntries.erase(it);
+      continue;
+    }
+
+    const bool isSubscribed = pending.bf.contains(prefix.toUri());
+    const bool overThreshold = positive.size() + negative.size() >= m_threshold;
+    if (!isSubscribed && !overThreshold) {
+      ++it;
+      continue;
+    }
+
+    State state;
+    if (isSubscribed) {
+      const auto seqNo = m_prefixes[prefix];
+      state.addContent(ndn::Name(prefix).appendNumber(seqNo));
+      NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(seqNo));
+    }
+    else {
+      NDN_LOG_DEBUG("Sending with empty content to send latest IBF to consumer");
+    }
+
+    // The data name is the stored interest name followed by our current IBF
+    ndn::Name syncDataName(it->first);
+    m_iblt.appendToName(syncDataName);
+
+    ndn::Data syncData(syncDataName);
+    syncData.setFreshnessPeriod(m_syncReplyFreshness);
+    syncData.setContent(state.wireEncode());
+
+    m_keyChain.sign(syncData);
+    NDN_LOG_DEBUG("Sending sync data");
+    m_face.put(syncData);
+
+    it = m_pendingEntries.erase(it);
+  }
+}
+
+} // namespace psync
diff --git a/src/partial-producer.hpp b/src/partial-producer.hpp
new file mode 100644
index 0000000..a2db7ac
--- /dev/null
+++ b/src/partial-producer.hpp
@@ -0,0 +1,128 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#ifndef PSYNC_PARTIAL_PRODUCER_HPP
+#define PSYNC_PARTIAL_PRODUCER_HPP
+
+#include "detail/bloom-filter.hpp"
+#include "producer-base.hpp"
+
+#include <map>
+#include <unordered_set>
+
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
+#include <ndn-cxx/util/scheduler-scoped-event-id.hpp>
+#include <ndn-cxx/util/time.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+
+namespace psync {
+
+struct PendingEntryInfo
+{
+ BloomFilter bf; // bloom filter decoded from the pending sync interest (the consumer's subscription)
+ IBLT iblt; // IBF decoded from the pending sync interest
+ ndn::util::scheduler::ScopedEventId expirationEvent; // scheduled event that erases this entry after the interest lifetime
+};
+
+/**
+ * @brief Partial sync logic to publish data names
+ *
+ * Application should call publishName whenever it wants to
+ * let consumers know that new data is available.
+ * Additional userPrefix should be added via addUserNode before calling publishName
+ * Currently, publishing of data needs to be handled by the application.
+ */
+class PartialProducer : public ProducerBase
+{
+public:
+ /**
+ * @brief constructor
+ *
+ * Registers syncPrefix in NFD and sets internal filters for
+ * "sync" and "hello" under syncPrefix
+ *
+ * @param expectedNumEntries expected entries in IBF
+ * @param face application's face
+ * @param syncPrefix The prefix of the sync group
+ * @param userPrefix The prefix of the first user in the group
+ * @param helloReplyFreshness freshness of hello data
+ * @param syncReplyFreshness freshness of sync data
+ */
+ PartialProducer(size_t expectedNumEntries,
+ ndn::Face& face,
+ const ndn::Name& syncPrefix,
+ const ndn::Name& userPrefix,
+ ndn::time::milliseconds helloReplyFreshness = HELLO_REPLY_FRESHNESS,
+ ndn::time::milliseconds syncReplyFreshness = SYNC_REPLY_FRESHNESS);
+
+ ~PartialProducer();
+
+ /**
+ * @brief Publish name to let subscribed consumers know
+ *
+ * If seq is null then the seq of prefix is incremented by 1 else
+ * the supplied sequence is set in the IBF.
+ * Upon updating the sequence in the IBF satisfyPendingSyncInterests
+ * is called to let subscribed consumers know.
+ * Nothing happens if the prefix is not already known.
+ *
+ * @param prefix the prefix to be updated
+ * @param seq the sequence number of the prefix
+ */
+ void
+ publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq = ndn::nullopt);
+
+private:
+ /**
+ * @brief Satisfy any pending interest that have subscription for prefix
+ *
+ * A pending interest is also answered, with empty content, when the number of
+ * differences with its stored IBF reaches the threshold, and it is dropped
+ * when those differences cannot be decoded.
+ *
+ * @param prefix the prefix that was updated in publishName
+ */
+ void
+ satisfyPendingSyncInterests(const ndn::Name& prefix);
+
+ /**
+ * @brief Receive hello interest from consumer and respond with hello data
+ *
+ * Hello data's name format is: /\<sync-prefix\>/hello/\<current-IBF\>
+ */
+ void
+ onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest);
+
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ /**
+ * @brief Receive sync interest from consumer
+ *
+ * Either respond with sync data if consumer is behind or
+ * store sync interest in m_pendingEntries.
+ * An application nack is sent instead when the difference between our IBF
+ * and the consumer's IBF cannot be decoded.
+ *
+ * Sync data's name format is: /\<sync-prefix\>/sync/\<old-IBF\>/\<current-IBF\>
+ */
+ void
+ onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest);
+
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ std::map <ndn::Name, PendingEntryInfo> m_pendingEntries; // pending sync interests, keyed by interest name
+
+ const ndn::RegisteredPrefixId* m_registerPrefixId; // handle of the syncPrefix registration; unregistered in the destructor
+};
+
+} // namespace psync
+
+#endif // PSYNC_PARTIAL_PRODUCER_HPP
diff --git a/src/producer-base.cpp b/src/producer-base.cpp
new file mode 100644
index 0000000..e24fb5b
--- /dev/null
+++ b/src/producer-base.cpp
@@ -0,0 +1,146 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "producer-base.hpp"
+
+#include <ndn-cxx/util/logger.hpp>
+#include <boost/algorithm/string.hpp>
+
+#include <cstring>
+#include <limits>
+#include <functional>
+
+namespace psync {
+
+NDN_LOG_INIT(psync.ProducerBase);
+
+ProducerBase::ProducerBase(size_t expectedNumEntries,
+ ndn::Face& face,
+ const ndn::Name& syncPrefix,
+ const ndn::Name& userPrefix,
+ ndn::time::milliseconds syncReplyFreshness,
+ ndn::time::milliseconds helloReplyFreshness)
+ : m_iblt(expectedNumEntries)
+ , m_expectedNumEntries(expectedNumEntries)
+ , m_threshold(expectedNumEntries/2) // half the expected entries; producers compare difference counts against it
+ , m_face(face)
+ , m_scheduler(m_face.getIoService())
+ , m_syncPrefix(syncPrefix)
+ , m_userPrefix(userPrefix)
+ , m_syncReplyFreshness(syncReplyFreshness)
+ , m_helloReplyFreshness(helloReplyFreshness)
+{
+ addUserNode(userPrefix); // the first user prefix starts at sequence number 0
+}
+
+/**
+ * Start tracking @p prefix with sequence number 0.
+ *
+ * @return true if the prefix was added, false if it was already known
+ */
+bool
+ProducerBase::addUserNode(const ndn::Name& prefix)
+{
+  // emplace leaves an existing entry untouched and reports whether it inserted
+  return m_prefixes.emplace(prefix, 0).second;
+}
+
+/**
+ * Stop tracking @p prefix and remove the hash of its latest
+ * <prefix>/<seq> name from the lookup tables and the IBF.
+ * Unknown prefixes are ignored.
+ */
+void
+ProducerBase::removeUserNode(const ndn::Name& prefix)
+{
+  auto entry = m_prefixes.find(prefix);
+  if (entry == m_prefixes.end()) {
+    return;
+  }
+
+  const uint64_t lastSeq = entry->second;
+  m_prefixes.erase(entry);
+
+  const ndn::Name nameWithSeq = ndn::Name(prefix).appendNumber(lastSeq);
+  auto hashEntry = m_prefix2hash.find(nameWithSeq);
+  if (hashEntry == m_prefix2hash.end()) {
+    return;
+  }
+
+  const uint32_t hash = hashEntry->second;
+  m_prefix2hash.erase(hashEntry);
+  m_hash2prefix.erase(hash);
+  m_iblt.erase(hash);
+}
+
+/**
+ * Move a known prefix to a strictly higher sequence number: the hash of the
+ * old <prefix>/<seq> name leaves the IBF and the hash of the new one enters.
+ * Unknown prefixes and non-increasing sequence numbers are logged and ignored.
+ */
+void
+ProducerBase::updateSeqNo(const ndn::Name& prefix, uint64_t seq)
+{
+  NDN_LOG_DEBUG("UpdateSeq: " << prefix << " " << seq);
+
+  auto entry = m_prefixes.find(prefix);
+  if (entry == m_prefixes.end()) {
+    NDN_LOG_WARN("Prefix not found in m_prefixes");
+    return;
+  }
+
+  const uint64_t previousSeq = entry->second;
+  if (previousSeq >= seq) {
+    NDN_LOG_WARN("Update has lower/equal seq no for prefix, doing nothing!");
+    return;
+  }
+
+  // Sequence number zero is never inserted into the IBF, so only a non-zero
+  // previous sequence number has something to remove.
+  if (previousSeq != 0) {
+    const ndn::Name staleName = ndn::Name(prefix).appendNumber(previousSeq);
+    auto staleEntry = m_prefix2hash.find(staleName);
+    if (staleEntry != m_prefix2hash.end()) {
+      const uint32_t staleHash = staleEntry->second;
+      m_prefix2hash.erase(staleEntry);
+      m_hash2prefix.erase(staleHash);
+      m_iblt.erase(staleHash);
+    }
+  }
+
+  // Record the new sequence number and insert its hash
+  entry->second = seq;
+  const ndn::Name freshName = ndn::Name(prefix).appendNumber(seq);
+  const uint32_t freshHash = murmurHash3(N_HASHCHECK, freshName.toUri());
+  m_prefix2hash[freshName] = freshHash;
+  m_hash2prefix[freshHash] = prefix;
+  m_iblt.insert(freshHash);
+}
+
+/**
+ * Publish a signed application nack named <name>/<our current IBF>.
+ */
+void
+ProducerBase::sendApplicationNack(const ndn::Name& name)
+{
+  NDN_LOG_DEBUG("Sending application nack");
+
+  ndn::Name nackName = name;
+  m_iblt.appendToName(nackName);
+
+  ndn::Data nack;
+  nack.setName(nackName);
+  nack.setContentType(ndn::tlv::ContentType_Nack);
+  nack.setFreshnessPeriod(m_syncReplyFreshness);
+
+  m_keyChain.sign(nack);
+  m_face.put(nack);
+}
+
+/**
+ * Prefix registration failure callback: log the failure and throw.
+ *
+ * @param prefix the prefix whose registration failed
+ * @param msg    failure reason reported with the callback
+ * @throws Error always, carrying @p msg
+ */
+void
+ProducerBase::onRegisterFailed(const ndn::Name& prefix, const std::string& msg) const
+{
+  // The class name was misspelled ("ProduerBase") in this message
+  NDN_LOG_ERROR("ProducerBase::onRegisterFailed " << prefix << " " << msg);
+  BOOST_THROW_EXCEPTION(Error(msg));
+}
+
+} // namespace psync
diff --git a/src/producer-base.hpp b/src/producer-base.hpp
new file mode 100644
index 0000000..079b803
--- /dev/null
+++ b/src/producer-base.hpp
@@ -0,0 +1,188 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#ifndef PSYNC_PRODUCER_BASE_HPP
+#define PSYNC_PRODUCER_BASE_HPP
+
+#include "detail/iblt.hpp"
+#include "detail/bloom-filter.hpp"
+#include "detail/util.hpp"
+#include "detail/test-access-control.hpp"
+
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
+#include <ndn-cxx/util/time.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/security/validator-config.hpp>
+
+#include <map>
+#include <unordered_set>
+#include <random>
+
+namespace psync {
+
+using namespace ndn::literals::time_literals;
+// Defaults for ProducerBase's syncReplyFreshness / helloReplyFreshness constructor arguments
+const ndn::time::milliseconds SYNC_REPLY_FRESHNESS = 1_s;
+const ndn::time::milliseconds HELLO_REPLY_FRESHNESS = 1_s;
+
+/**
+ * @brief Base class for PartialProducer and FullProducer
+ *
+ * Contains code common to both
+ */
+class ProducerBase
+{
+public: // Error is thrown by onRegisterFailed; callers must be able to name it to catch it
+  class Error : public std::runtime_error
+  {
+  public:
+    using std::runtime_error::runtime_error;
+  };
+
+PUBLIC_WITH_TESTS_ELSE_PROTECTED:
+  /**
+   * @brief constructor
+   *
+   * @param expectedNumEntries expected number of entries in IBF
+   * @param face application's face
+   * @param syncPrefix The prefix of the sync group
+   * @param userPrefix The prefix of the first user in the group
+   * @param syncReplyFreshness freshness of sync data
+   * @param helloReplyFreshness freshness of hello data
+   */
+  ProducerBase(size_t expectedNumEntries,
+               ndn::Face& face,
+               const ndn::Name& syncPrefix,
+               const ndn::Name& userPrefix,
+               ndn::time::milliseconds syncReplyFreshness = SYNC_REPLY_FRESHNESS,
+               ndn::time::milliseconds helloReplyFreshness = HELLO_REPLY_FRESHNESS);
+public:
+  /**
+   * @brief Returns the current sequence number of the given prefix,
+   *        or ndn::nullopt if the prefix is not in m_prefixes
+   * @param prefix prefix to get the sequence number of
+   */
+  ndn::optional<uint64_t>
+  getSeqNo(const ndn::Name& prefix) const
+  {
+    auto it = m_prefixes.find(prefix);
+    if (it == m_prefixes.end()) {
+      return ndn::nullopt;
+    }
+    return it->second;
+  }
+
+  /**
+   * @brief Adds a user node for synchronization
+   *
+   * Initializes m_prefixes[prefix] to zero
+   * Does not add zero-th sequence number to IBF
+   * because if a large number of user nodes are added
+   * then decoding of the difference between own IBF and
+   * other IBF will not be possible
+   *
+   * @param prefix the user node to be added
+   */
+  bool
+  addUserNode(const ndn::Name& prefix);
+
+  /**
+   * @brief Remove the user node from synchronization
+   *
+   * Erases prefix from IBF and other maps
+   *
+   * @param prefix the user node to be removed
+   */
+  void
+  removeUserNode(const ndn::Name& prefix);
+
+PUBLIC_WITH_TESTS_ELSE_PROTECTED:
+  /**
+   * @brief Update m_prefixes and IBF with the given prefix and seq
+   *
+   * Does nothing (besides logging a warning) if prefix is not in m_prefixes
+   * or if seq is not greater than the current sequence number of prefix.
+   * Otherwise removes the existing prefix/seq from IBF (unless it is zero, which
+   * is never inserted) and updates m_prefixes, m_prefix2hash, m_hash2prefix, and IBF
+   *
+   * @param prefix prefix of the update
+   * @param seq sequence number of the update
+   */
+  void
+  updateSeqNo(const ndn::Name& prefix, uint64_t seq);
+
+  /** @brief Returns true if prefix is a user node, i.e. it is present in m_prefixes */
+  bool
+  isUserNode(const ndn::Name& prefix) const
+  {
+    return m_prefixes.find(prefix) != m_prefixes.end();
+  }
+
+  /**
+   * @brief Sends a data packet with content type nack
+   *
+   * Producer sends a nack to consumer if consumer has very old IBF
+   * whose differences with latest IBF can't be decoded successfully
+   *
+   * @param name send application nack with this name
+   */
+  void
+  sendApplicationNack(const ndn::Name& name);
+
+  /**
+   * @brief Logs an error and throws Error if setting an interest filter fails
+   *
+   * @param prefix the prefix reported with the failure (only logged)
+   * @param msg failure description; logged and used as the exception message
+   */
+  void
+  onRegisterFailed(const ndn::Name& prefix, const std::string& msg) const;
+
+PUBLIC_WITH_TESTS_ELSE_PROTECTED:
+  IBLT m_iblt;
+  uint32_t m_expectedNumEntries;
+  // Threshold is used to check if the differences are greater
+  // than it and whether we need to update the other side.
+  uint32_t m_threshold;
+
+  // prefix and sequence number
+  std::map<ndn::Name, uint64_t> m_prefixes;
+  // Just for looking up hash faster (instead of calculating it again)
+  // Only used in updateSeqNo, prefix/seqNo is the key
+  std::map<ndn::Name, uint32_t> m_prefix2hash;
+  // Value is prefix (and not prefix/seqNo)
+  std::map<uint32_t, ndn::Name> m_hash2prefix;
+
+  ndn::Face& m_face;
+  ndn::KeyChain m_keyChain;
+  ndn::Scheduler m_scheduler;
+
+  ndn::Name m_syncPrefix;
+  ndn::Name m_userPrefix;
+
+  ndn::time::milliseconds m_syncReplyFreshness;
+  ndn::time::milliseconds m_helloReplyFreshness;
+
+  std::mt19937 m_rng;
+};
+
+} // namespace psync
+
+#endif // PSYNC_PRODUCER_BASE_HPP