support various compression schemes
refs: #5061, #4917
Change-Id: Icba04b8693e40c4f065293b8d688ba32c63bd7bb
diff --git a/PSync/detail/iblt.cpp b/PSync/detail/iblt.cpp
index a571ca3..5315f2a 100644
--- a/PSync/detail/iblt.cpp
+++ b/PSync/detail/iblt.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -44,17 +44,9 @@
*/
#include "PSync/detail/iblt.hpp"
-#include "PSync/detail/util.hpp"
-
-#include <boost/iostreams/device/array.hpp>
-#include <boost/iostreams/filtering_stream.hpp>
-#include <boost/iostreams/filter/zlib.hpp>
-#include <boost/iostreams/copy.hpp>
namespace psync {
-namespace bio = boost::iostreams;
-
const size_t N_HASH(3);
const size_t N_HASHCHECK(11);
@@ -75,7 +67,8 @@
return count == 0 && keySum == 0 && keyCheck == 0;
}
-IBLT::IBLT(size_t expectedNumEntries)
+IBLT::IBLT(size_t expectedNumEntries, CompressionScheme scheme)
+ : m_compressionScheme(scheme)
{
// 1.5x expectedNumEntries gives very low probability of decoding failure
size_t nEntries = expectedNumEntries + expectedNumEntries / 2;
@@ -228,13 +221,14 @@
void
IBLT::appendToName(ndn::Name& name) const
{
- size_t n = m_hashTable.size();
- size_t unitSize = (32 * 3) / 8; // hard coding
- size_t tableSize = unitSize * n;
+ constexpr size_t unitSize = (sizeof(m_hashTable[0].count) +
+ sizeof(m_hashTable[0].keySum) +
+ sizeof(m_hashTable[0].keyCheck));
- std::vector<char> table(tableSize);
+ size_t tableSize = unitSize * m_hashTable.size();
+ std::vector<uint8_t> table(tableSize);
- for (size_t i = 0; i < n; i++) {
+ for (size_t i = 0; i < m_hashTable.size(); i++) {
// table[i*12], table[i*12+1], table[i*12+2], table[i*12+3] --> hashTable[i].count
table[(i * unitSize)] = 0xFF & m_hashTable[i].count;
@@ -257,40 +251,24 @@
table[(i * unitSize) + 11] = 0xFF & (m_hashTable[i].keyCheck >> 24);
}
- bio::filtering_streambuf<bio::input> in;
- in.push(bio::zlib_compressor());
- in.push(bio::array_source(table.data(), table.size()));
-
- std::stringstream sstream;
- bio::copy(in, sstream);
-
- std::string compressedIBF = sstream.str();
- name.append(compressedIBF.begin(), compressedIBF.end());
+ auto compressed = compress(m_compressionScheme, table.data(), table.size());
+ name.append(ndn::name::Component(std::move(compressed)));
}
std::vector<uint32_t>
IBLT::extractValueFromName(const ndn::name::Component& ibltName) const
{
- std::string compressed(ibltName.value_begin(), ibltName.value_end());
+ auto decompressedBuf = decompress(m_compressionScheme, ibltName.value(), ibltName.value_size());
- bio::filtering_streambuf<bio::input> in;
- in.push(bio::zlib_decompressor());
- in.push(bio::array_source(compressed.data(), compressed.size()));
-
- std::stringstream sstream;
- bio::copy(in, sstream);
- std::string ibltStr = sstream.str();
-
- std::vector<uint8_t> ibltValues(ibltStr.begin(), ibltStr.end());
- size_t n = ibltValues.size() / 4;
+ size_t n = decompressedBuf->size() / 4;
std::vector<uint32_t> values(n, 0);
for (size_t i = 0; i < 4 * n; i += 4) {
- uint32_t t = (ibltValues[i + 3] << 24) +
- (ibltValues[i + 2] << 16) +
- (ibltValues[i + 1] << 8) +
- ibltValues[i];
+ uint32_t t = ((*decompressedBuf)[i + 3] << 24) +
+ ((*decompressedBuf)[i + 2] << 16) +
+ ((*decompressedBuf)[i + 1] << 8) +
+ (*decompressedBuf)[i];
values[i / 4] = t;
}
diff --git a/PSync/detail/iblt.hpp b/PSync/detail/iblt.hpp
index 09f53b8..d51612c 100644
--- a/PSync/detail/iblt.hpp
+++ b/PSync/detail/iblt.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -46,6 +46,8 @@
#ifndef PSYNC_IBLT_HPP
#define PSYNC_IBLT_HPP
+#include "PSync/detail/util.hpp"
+
#include <ndn-cxx/name.hpp>
#include <inttypes.h>
@@ -90,8 +92,10 @@
* @brief constructor
*
* @param expectedNumEntries the expected number of entries in the IBLT
+ * @param scheme compression scheme to be used for the IBLT
*/
- explicit IBLT(size_t expectedNumEntries);
+ explicit
+ IBLT(size_t expectedNumEntries, CompressionScheme scheme = CompressionScheme::ZLIB);
/**
* @brief Populate the hash table using the vector representation of IBLT
@@ -125,7 +129,7 @@
IBLT
operator-(const IBLT& other) const;
- std::vector<HashTableEntry>
+ const std::vector<HashTableEntry>&
getHashTable() const
{
return m_hashTable;
@@ -165,6 +169,7 @@
std::vector<HashTableEntry> m_hashTable;
static const int INSERT = 1;
static const int ERASE = -1;
+ CompressionScheme m_compressionScheme;
};
bool
@@ -178,4 +183,4 @@
} // namespace psync
-#endif // PSYNC_IBLT_HPP
\ No newline at end of file
+#endif // PSYNC_IBLT_HPP
diff --git a/PSync/detail/state.hpp b/PSync/detail/state.hpp
index 32e9bfe..14b1eb0 100644
--- a/PSync/detail/state.hpp
+++ b/PSync/detail/state.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -42,7 +42,7 @@
void
addContent(const ndn::Name& prefix);
- std::vector<ndn::Name>
+ const std::vector<ndn::Name>&
getContent() const
{
return m_content;
diff --git a/PSync/detail/util.cpp b/PSync/detail/util.cpp
index 013dfe8..7e546fc 100644
--- a/PSync/detail/util.cpp
+++ b/PSync/detail/util.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -22,11 +22,35 @@
**/
#include "PSync/detail/util.hpp"
+#include "PSync/detail/config.hpp"
+#include <ndn-cxx/encoding/buffer-stream.hpp>
#include <ndn-cxx/util/backports.hpp>
+#include <ndn-cxx/util/exception.hpp>
+
+#include <boost/iostreams/device/array.hpp>
+#include <boost/iostreams/filtering_stream.hpp>
+#ifdef PSYNC_HAVE_ZLIB
+ #include <boost/iostreams/filter/zlib.hpp>
+#endif
+#ifdef PSYNC_HAVE_GZIP
+ #include <boost/iostreams/filter/gzip.hpp>
+#endif
+#ifdef PSYNC_HAVE_BZIP2
+ #include <boost/iostreams/filter/bzip2.hpp>
+#endif
+#ifdef PSYNC_HAVE_LZMA
+ #include <boost/iostreams/filter/lzma.hpp>
+#endif
+#ifdef PSYNC_HAVE_ZSTD
+ #include <boost/iostreams/filter/zstd.hpp>
+#endif
+#include <boost/iostreams/copy.hpp>
namespace psync {
+namespace bio = boost::iostreams;
+
static uint32_t
ROTL32 ( uint32_t x, int8_t r )
{
@@ -104,4 +128,114 @@
(unsigned char*)&value + sizeof(uint32_t)));
}
+std::shared_ptr<ndn::Buffer>
+compress(CompressionScheme scheme, const uint8_t* buffer, size_t bufferSize)
+{
+ ndn::OBufferStream out;
+ bio::filtering_streambuf<bio::input> in;
+ switch (scheme) {
+ case CompressionScheme::ZLIB:
+#ifdef PSYNC_HAVE_ZLIB
+ in.push(bio::zlib_compressor(bio::zlib::best_compression));
+#else
+ NDN_THROW(Error("ZLIB compression not supported!"));
+#endif
+ break;
+
+ case CompressionScheme::GZIP:
+#ifdef PSYNC_HAVE_GZIP
+ in.push(bio::gzip_compressor(bio::gzip::best_compression));
+#else
+ NDN_THROW(Error("GZIP compression not supported!"));
+#endif
+ break;
+
+ case CompressionScheme::BZIP2:
+#ifdef PSYNC_HAVE_BZIP2
+ in.push(bio::bzip2_compressor());
+#else
+ NDN_THROW(Error("BZIP2 compression not supported!"));
+#endif
+ break;
+
+ case CompressionScheme::LZMA:
+#ifdef PSYNC_HAVE_LZMA
+ in.push(bio::lzma_compressor(bio::lzma::best_compression));
+#else
+ NDN_THROW(Error("LZMA compression not supported!"));
+#endif
+ break;
+
+ case CompressionScheme::ZSTD:
+#ifdef PSYNC_HAVE_ZSTD
+ in.push(bio::zstd_compressor(bio::zstd::best_compression));
+#else
+ NDN_THROW(Error("ZSTD compression not supported!"));
+#endif
+ break;
+
+ case CompressionScheme::NONE:
+ break;
+ }
+ in.push(bio::array_source(reinterpret_cast<const char*>(buffer), bufferSize));
+ bio::copy(in, out);
+
+ return out.buf();
+}
+
+std::shared_ptr<ndn::Buffer>
+decompress(CompressionScheme scheme, const uint8_t* buffer, size_t bufferSize)
+{
+ ndn::OBufferStream out;
+ bio::filtering_streambuf<bio::input> in;
+ switch (scheme) {
+ case CompressionScheme::ZLIB:
+#ifdef PSYNC_HAVE_ZLIB
+ in.push(bio::zlib_decompressor());
+#else
+ NDN_THROW(Error("ZLIB decompression not supported!"));
+#endif
+ break;
+
+ case CompressionScheme::GZIP:
+#ifdef PSYNC_HAVE_GZIP
+ in.push(bio::gzip_decompressor());
+#else
+ NDN_THROW(Error("GZIP compression not supported!"));
+#endif
+ break;
+
+ case CompressionScheme::BZIP2:
+#ifdef PSYNC_HAVE_BZIP2
+ in.push(bio::bzip2_decompressor());
+#else
+ NDN_THROW(Error("BZIP2 compression not supported!"));
+#endif
+ break;
+
+ case CompressionScheme::LZMA:
+#ifdef PSYNC_HAVE_LZMA
+ in.push(bio::lzma_decompressor());
+#else
+ NDN_THROW(Error("LZMA compression not supported!"));
+#endif
+ break;
+
+ case CompressionScheme::ZSTD:
+#ifdef PSYNC_HAVE_ZSTD
+ in.push(bio::zstd_decompressor());
+#else
+ NDN_THROW(Error("ZSTD compression not supported!"));
+#endif
+ break;
+
+ case CompressionScheme::NONE:
+ break;
+ }
+ in.push(bio::array_source(reinterpret_cast<const char*>(buffer), bufferSize));
+ bio::copy(in, out);
+
+ return out.buf();
+}
+
} // namespace psync
diff --git a/PSync/detail/util.hpp b/PSync/detail/util.hpp
index 4eaede1..4da380b 100644
--- a/PSync/detail/util.hpp
+++ b/PSync/detail/util.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -48,6 +48,27 @@
uint64_t highSeq;
};
+enum class CompressionScheme {
+ NONE,
+ ZLIB,
+ GZIP,
+ BZIP2,
+ LZMA,
+ ZSTD
+};
+
+std::shared_ptr<ndn::Buffer>
+compress(CompressionScheme scheme, const uint8_t* buffer, size_t bufferSize);
+
+std::shared_ptr<ndn::Buffer>
+decompress(CompressionScheme scheme, const uint8_t* buffer, size_t bufferSize);
+
+class Error : public std::runtime_error
+{
+public:
+ using std::runtime_error::runtime_error;
+};
+
} // namespace psync
#endif // PSYNC_UTIL_HPP
diff --git a/PSync/full-producer.cpp b/PSync/full-producer.cpp
index 205f819..583f88e 100644
--- a/PSync/full-producer.cpp
+++ b/PSync/full-producer.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -37,8 +37,11 @@
const ndn::Name& userPrefix,
const UpdateCallback& onUpdateCallBack,
ndn::time::milliseconds syncInterestLifetime,
- ndn::time::milliseconds syncReplyFreshness)
- : ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness)
+ ndn::time::milliseconds syncReplyFreshness,
+ CompressionScheme ibltCompression,
+ CompressionScheme contentCompression)
+ : ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness,
+ ibltCompression, contentCompression)
, m_syncInterestLifetime(syncInterestLifetime)
, m_onUpdate(onUpdateCallBack)
, m_jitter(100, 500)
@@ -130,6 +133,7 @@
void
FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
{
+ // TODO: answer only segments from store.
if (m_segmentPublisher.replyFromStore(interest.getName())) {
return;
}
@@ -185,8 +189,7 @@
}
if (!state.getContent().empty()) {
- m_segmentPublisher.publish(interest.getName(), interest.getName(),
- state.wireEncode(), m_syncReplyFreshness);
+ sendSyncData(interest.getName(), state.wireEncode());
}
return;
@@ -224,9 +227,11 @@
ndn::Name nameWithIblt;
m_iblt.appendToName(nameWithIblt);
- // Append hash of our IBF so that data name maybe different for each node answering
+ // TODO: Remove appending of hash - serves no purpose to the receiver
ndn::Name dataName(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
+ auto content = compress(m_contentCompression, block.wire(), block.size());
+
// checking if our own interest got satisfied
if (m_outstandingInterestName == name) {
NDN_LOG_DEBUG("Satisfied our own pending interest");
@@ -240,14 +245,14 @@
NDN_LOG_DEBUG("Sending Sync Data");
// Send data after removing pending sync interest on face
- m_segmentPublisher.publish(name, dataName, block, m_syncReplyFreshness);
+ m_segmentPublisher.publish(name, dataName, content, m_syncReplyFreshness);
NDN_LOG_TRACE("Renewing sync interest");
sendSyncInterest();
}
else {
NDN_LOG_DEBUG("Sending Sync Data");
- m_segmentPublisher.publish(name, dataName, block, m_syncReplyFreshness);
+ m_segmentPublisher.publish(name, dataName, content, m_syncReplyFreshness);
}
}
@@ -256,7 +261,16 @@
{
deletePendingInterests(interest.getName());
- State state{ndn::Block{bufferPtr}};
+ State state;
+ try {
+ auto decompressed = decompress(m_contentCompression, bufferPtr->data(), bufferPtr->size());
+ state.wireDecode(ndn::Block(std::move(decompressed)));
+ }
+ catch (const std::exception& e) {
+ NDN_LOG_ERROR("Cannot parse received Sync Data: " << e.what());
+ return;
+ }
+
std::vector<MissingDataInfo> updates;
NDN_LOG_DEBUG("Sync Data Received: " << state);
diff --git a/PSync/full-producer.hpp b/PSync/full-producer.hpp
index 82b7be6..720e256 100644
--- a/PSync/full-producer.hpp
+++ b/PSync/full-producer.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -72,6 +72,8 @@
* @param onUpdateCallBack The call back to be called when there is new data
* @param syncInterestLifetime lifetime of the sync interest
* @param syncReplyFreshness freshness of sync data
+ * @param ibltCompression Compression scheme to use for IBF
+ * @param contentCompression Compression scheme to use for Data content
*/
FullProducer(size_t expectedNumEntries,
ndn::Face& face,
@@ -79,7 +81,9 @@
const ndn::Name& userPrefix,
const UpdateCallback& onUpdateCallBack,
ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFTIME,
- ndn::time::milliseconds syncReplyFreshness = SYNC_REPLY_FRESHNESS);
+ ndn::time::milliseconds syncReplyFreshness = SYNC_REPLY_FRESHNESS,
+ CompressionScheme ibltCompression = CompressionScheme::ZLIB,
+ CompressionScheme contentCompression = CompressionScheme::ZLIB);
~FullProducer();
@@ -125,7 +129,6 @@
void
onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest);
-private:
/**
* @brief Send sync data
*
@@ -157,6 +160,7 @@
void
onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPtr& bufferPtr);
+private:
/**
* @brief Satisfy pending sync interests
*
diff --git a/PSync/partial-producer.cpp b/PSync/partial-producer.cpp
index e75bc38..513ed34 100644
--- a/PSync/partial-producer.cpp
+++ b/PSync/partial-producer.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -33,10 +33,12 @@
ndn::Face& face,
const ndn::Name& syncPrefix,
const ndn::Name& userPrefix,
+ ndn::time::milliseconds helloReplyFreshness,
ndn::time::milliseconds syncReplyFreshness,
- ndn::time::milliseconds helloReplyFreshness)
+ CompressionScheme ibltCompression)
: ProducerBase(expectedNumEntries, face, syncPrefix,
- userPrefix, syncReplyFreshness, helloReplyFreshness)
+ userPrefix, syncReplyFreshness, ibltCompression)
+ , m_helloReplyFreshness(helloReplyFreshness)
{
m_registeredPrefix = m_face.registerPrefix(m_syncPrefix,
[this] (const ndn::Name& syncPrefix) {
@@ -136,7 +138,7 @@
}
BloomFilter bf;
- IBLT iblt(m_expectedNumEntries);
+ IBLT iblt(m_expectedNumEntries, m_ibltCompression);
try {
bf = BloomFilter(projectedCount, falsePositiveProb, bfName);
diff --git a/PSync/partial-producer.hpp b/PSync/partial-producer.hpp
index 782078b..64a5a21 100644
--- a/PSync/partial-producer.hpp
+++ b/PSync/partial-producer.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -39,6 +39,8 @@
ndn::scheduler::ScopedEventId expirationEvent;
};
+const ndn::time::milliseconds HELLO_REPLY_FRESHNESS = 1_s;
+
/**
* @brief Partial sync logic to publish data names
*
@@ -62,13 +64,15 @@
* @param userPrefix The prefix of the first user in the group
* @param syncReplyFreshness freshness of sync data
* @param helloReplyFreshness freshness of hello data
+ * @param ibltCompression Compression scheme to use for IBF
*/
PartialProducer(size_t expectedNumEntries,
ndn::Face& face,
const ndn::Name& syncPrefix,
const ndn::Name& userPrefix,
ndn::time::milliseconds helloReplyFreshness = HELLO_REPLY_FRESHNESS,
- ndn::time::milliseconds syncReplyFreshness = SYNC_REPLY_FRESHNESS);
+ ndn::time::milliseconds syncReplyFreshness = SYNC_REPLY_FRESHNESS,
+ CompressionScheme ibltCompression = CompressionScheme::NONE);
/**
* @brief Publish name to let subscribed consumers know
@@ -120,6 +124,7 @@
PSYNC_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
std::map<ndn::Name, PendingEntryInfo> m_pendingEntries;
ndn::ScopedRegisteredPrefixHandle m_registeredPrefix;
+ ndn::time::milliseconds m_helloReplyFreshness;
};
} // namespace psync
diff --git a/PSync/producer-base.cpp b/PSync/producer-base.cpp
index d00fac0..48e1a89 100644
--- a/PSync/producer-base.cpp
+++ b/PSync/producer-base.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -35,8 +35,9 @@
const ndn::Name& syncPrefix,
const ndn::Name& userPrefix,
ndn::time::milliseconds syncReplyFreshness,
- ndn::time::milliseconds helloReplyFreshness)
- : m_iblt(expectedNumEntries)
+ CompressionScheme ibltCompression,
+ CompressionScheme contentCompression)
+ : m_iblt(expectedNumEntries, ibltCompression)
, m_expectedNumEntries(expectedNumEntries)
, m_threshold(expectedNumEntries/2)
, m_face(face)
@@ -44,9 +45,10 @@
, m_syncPrefix(syncPrefix)
, m_userPrefix(userPrefix)
, m_syncReplyFreshness(syncReplyFreshness)
- , m_helloReplyFreshness(helloReplyFreshness)
, m_segmentPublisher(m_face, m_keyChain)
, m_rng(ndn::random::getRandomNumberEngine())
+ , m_ibltCompression(ibltCompression)
+ , m_contentCompression(contentCompression)
{
addUserNode(userPrefix);
}
diff --git a/PSync/producer-base.hpp b/PSync/producer-base.hpp
index 6463ea2..9583259 100644
--- a/PSync/producer-base.hpp
+++ b/PSync/producer-base.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -41,7 +41,6 @@
using namespace ndn::time_literals;
const ndn::time::milliseconds SYNC_REPLY_FRESHNESS = 1_s;
-const ndn::time::milliseconds HELLO_REPLY_FRESHNESS = 1_s;
/**
* @brief Base class for PartialProducer and FullProducer
@@ -65,14 +64,16 @@
* @param syncPrefix The prefix of the sync group
* @param userPrefix The prefix of the first user in the group
* @param syncReplyFreshness freshness of sync data
- * @param helloReplyFreshness freshness of hello data
+ * @param ibltCompression Compression scheme to use for IBF
+ * @param contentCompression Compression scheme to use for Data content
*/
ProducerBase(size_t expectedNumEntries,
ndn::Face& face,
const ndn::Name& syncPrefix,
const ndn::Name& userPrefix,
ndn::time::milliseconds syncReplyFreshness = SYNC_REPLY_FRESHNESS,
- ndn::time::milliseconds helloReplyFreshness = HELLO_REPLY_FRESHNESS);
+ CompressionScheme ibltCompression = CompressionScheme::ZLIB,
+ CompressionScheme contentCompression = CompressionScheme::ZLIB);
public:
/**
* @brief Returns the current sequence number of the given prefix
@@ -177,11 +178,12 @@
ndn::Name m_userPrefix;
ndn::time::milliseconds m_syncReplyFreshness;
- ndn::time::milliseconds m_helloReplyFreshness;
SegmentPublisher m_segmentPublisher;
ndn::random::RandomNumberEngine& m_rng;
+ CompressionScheme m_ibltCompression;
+ CompressionScheme m_contentCompression;
};
} // namespace psync
diff --git a/PSync/segment-publisher.cpp b/PSync/segment-publisher.cpp
index 6229949..8ab5de3 100644
--- a/PSync/segment-publisher.cpp
+++ b/PSync/segment-publisher.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -36,21 +36,28 @@
const ndn::Block& block, ndn::time::milliseconds freshness,
const ndn::security::SigningInfo& signingInfo)
{
+ auto buf = std::make_shared<const ndn::Buffer>(block.wire(), block.size());
+ publish(interestName, dataName, buf, freshness, signingInfo);
+}
+
+void
+SegmentPublisher::publish(const ndn::Name& interestName, const ndn::Name& dataName,
+ const std::shared_ptr<const ndn::Buffer>& buffer,
+ ndn::time::milliseconds freshness,
+ const ndn::security::SigningInfo& signingInfo)
+{
uint64_t interestSegment = 0;
if (interestName[-1].isSegment()) {
interestSegment = interestName[-1].toSegment();
}
- ndn::EncodingBuffer buffer;
- buffer.prependBlock(block);
-
- const uint8_t* rawBuffer = buffer.buf();
+ const uint8_t* rawBuffer = buffer->data();
const uint8_t* segmentBegin = rawBuffer;
- const uint8_t* end = rawBuffer + buffer.size();
+ const uint8_t* end = rawBuffer + buffer->size();
size_t maxPacketSize = (ndn::MAX_NDN_PACKET_SIZE >> 1);
- uint64_t totalSegments = buffer.size() / maxPacketSize;
+ uint64_t totalSegments = buffer->size() / maxPacketSize;
ndn::Name segmentPrefix(dataName);
segmentPrefix.appendVersion();
diff --git a/PSync/segment-publisher.hpp b/PSync/segment-publisher.hpp
index e96e07c..419e32f 100644
--- a/PSync/segment-publisher.hpp
+++ b/PSync/segment-publisher.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -59,6 +59,21 @@
ndn::security::v2::KeyChain::getDefaultSigningInfo());
/**
+ * @brief Put all the segments in memory.
+ *
+ * @param interestName the interest name, to determine the sequence to be answered immediately
+ * @param dataName the data name, has components after interest name
+ * @param buffer the content of the data
+ * @param freshness freshness of the segments
+ * @param signingInfo signing info to sign the data with
+ */
+ void
+ publish(const ndn::Name& interestName, const ndn::Name& dataName,
+ const std::shared_ptr<const ndn::Buffer>& buffer, ndn::time::milliseconds freshness,
+ const ndn::security::SigningInfo& signingInfo =
+ ndn::security::v2::KeyChain::getDefaultSigningInfo());
+
+ /**
* @brief Try to reply from memory, return false if we cannot find the segment.
*
* The caller is then expected to use publish if this returns false.
diff --git a/tests/test-full-producer.cpp b/tests/test-full-producer.cpp
index 2bd7db4..fc56a0c 100644
--- a/tests/test-full-producer.cpp
+++ b/tests/test-full-producer.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -65,6 +65,26 @@
BOOST_CHECK_EQUAL(face.sentInterests.size(), 1);
}
+BOOST_AUTO_TEST_CASE(OnSyncDataDecodeFailure)
+{
+ Name syncPrefix("/psync"), userNode("/testUser");
+ util::DummyClientFace face({true, true});
+
+ FullProducer node(40, face, syncPrefix, userNode, nullptr);
+
+ ndn::Name syncInterestName(syncPrefix);
+ node.m_iblt.appendToName(syncInterestName);
+ ndn::Interest syncInterest(syncInterestName);
+
+ auto badCompress = std::make_shared<const ndn::Buffer>(5);
+
+ BOOST_REQUIRE_NO_THROW(node.onSyncData(syncInterest, badCompress));
+
+ const uint8_t test[] = {'t', 'e', 's', 't'};
+ auto goodCompressBadBlock = compress(node.m_contentCompression, &test[0], sizeof(test));
+ BOOST_REQUIRE_NO_THROW(node.onSyncData(syncInterest, goodCompressBadBlock));
+}
+
BOOST_AUTO_TEST_SUITE_END()
} // namespace psync
\ No newline at end of file
diff --git a/tests/test-full-sync.cpp b/tests/test-full-sync.cpp
index af7c238..d79e012 100644
--- a/tests/test-full-sync.cpp
+++ b/tests/test-full-sync.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
- * Copyright (c) 2014-2019, The University of Memphis
+ * Copyright (c) 2014-2020, The University of Memphis
*
* This file is part of PSync.
* See AUTHORS.md for complete list of PSync authors and contributors.
@@ -398,7 +398,7 @@
{
addNode(0);
- for (int i = 0; i < 300; i++) {
+ for (int i = 0; i < 2000; i++) {
Name prefixToPublish("userNode0-" + to_string(i));
nodes[0]->addUserNode(prefixToPublish);
nodes[0]->publishName(prefixToPublish);
@@ -421,7 +421,7 @@
// Get data name from face and increase segment number to form next interest
Name dataName = faces[0]->sentData.front().getName();
- Name interestName = dataName.getSubName(0, dataName.size() - 1);
+ Name interestName = dataName.getSubName(0, dataName.size() - 2);
interestName.appendSegment(1);
faces[0]->sentData.clear();
@@ -436,4 +436,4 @@
BOOST_AUTO_TEST_SUITE_END()
-} // namespace psync
\ No newline at end of file
+} // namespace psync
diff --git a/tests/test-util.cpp b/tests/test-util.cpp
new file mode 100644
index 0000000..2ec1de8
--- /dev/null
+++ b/tests/test-util.cpp
@@ -0,0 +1,79 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2020, The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU Lesser General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License along with
+ * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "PSync/detail/util.hpp"
+#include "PSync/detail/config.hpp"
+#include "unit-test-time-fixture.hpp"
+
+#include <boost/test/unit_test.hpp>
+#include <iostream>
+
+namespace psync {
+
+using namespace ndn;
+using namespace std;
+
+BOOST_AUTO_TEST_SUITE(TestUtil)
+
+BOOST_AUTO_TEST_CASE(Basic)
+{
+ std::vector<CompressionScheme> available = {CompressionScheme::ZLIB, CompressionScheme::GZIP,
+ CompressionScheme::BZIP2, CompressionScheme::LZMA,
+ CompressionScheme::ZSTD};
+ std::vector<CompressionScheme> supported;
+ std::vector<CompressionScheme> notSupported;
+
+#ifdef PSYNC_HAVE_ZLIB
+ supported.push_back(CompressionScheme::ZLIB);
+#endif
+#ifdef PSYNC_HAVE_GZIP
+ supported.push_back(CompressionScheme::GZIP);
+#endif
+#ifdef PSYNC_HAVE_BZIP2
+ supported.push_back(CompressionScheme::BZIP2);
+#endif
+#ifdef PSYNC_HAVE_LZMA
+ supported.push_back(CompressionScheme::LZMA);
+#endif
+#ifdef PSYNC_HAVE_ZSTD
+ supported.push_back(CompressionScheme::ZSTD);
+#endif
+
+ const uint8_t uncompressed[] = {'t', 'e', 's', 't'};
+
+ for (const auto& s : supported) {
+ BOOST_CHECK_NO_THROW(compress(s, uncompressed, sizeof(uncompressed)));
+ auto compressed = compress(s, uncompressed, sizeof(uncompressed));
+ BOOST_CHECK_NO_THROW(decompress(s, compressed->data(), compressed->size()));
+ }
+
+ std::set_difference(available.begin(), available.end(), supported.begin(), supported.end(),
+ std::inserter(notSupported, notSupported.begin()));
+
+ for (const auto& s : notSupported) {
+ BOOST_CHECK_THROW(compress(s, uncompressed, sizeof(uncompressed)),
+ std::runtime_error);
+ BOOST_CHECK_THROW(decompress(s, uncompressed, sizeof(uncompressed)),
+ std::runtime_error);
+ }
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+} // namespace psync
\ No newline at end of file
diff --git a/wscript b/wscript
index 8c89ab5..ee0e754 100644
--- a/wscript
+++ b/wscript
@@ -7,6 +7,13 @@
APPNAME = 'PSync'
GIT_TAG_PREFIX = ''
+BOOST_COMPRESSION_CODE = '''
+#include <boost/iostreams/filter/{0}.hpp>
+int main() {{ boost::iostreams::{0}_compressor test; }}
+'''
+
+COMPRESSION_SCHEMES = ['zlib', 'gzip', 'bzip2', 'lzma', 'zstd']
+
def options(opt):
opt.load(['compiler_c', 'compiler_cxx', 'gnu_dirs'])
opt.load(['default-compiler-flags', 'coverage', 'sanitizers',
@@ -19,6 +26,10 @@
optgrp.add_option('--with-tests', action='store_true', default=False,
help='Build unit tests')
+ for scheme in COMPRESSION_SCHEMES:
+ optgrp.add_option('--without-{}'.format(scheme), action='store_true', default=False,
+ help='Build without {}'.format(scheme))
+
def configure(conf):
conf.load(['compiler_c', 'compiler_cxx', 'gnu_dirs',
'default-compiler-flags', 'boost',
@@ -36,6 +47,14 @@
conf.check_boost(lib=boost_libs, mt=True)
+ for scheme in COMPRESSION_SCHEMES:
+ if getattr(conf.options, 'without_{}'.format(scheme)):
+ continue
+ conf.check_cxx(fragment=BOOST_COMPRESSION_CODE.format(scheme),
+ use='BOOST', execute=False, mandatory=False,
+ msg='Checking for {} support in boost iostreams'.format(scheme),
+ define_name='HAVE_{}'.format(scheme.upper()))
+
conf.check_compiler_flags()
# Loading "late" to prevent tests from being compiled with profiling flags