**Breaking change** Use bzip2 compression of sync data payload
Change-Id: I0a322e3268a5adc9d221c23c43fc6899c9dbf836
Refs: #4140
diff --git a/src/bzip2-helper.cpp b/src/bzip2-helper.cpp
new file mode 100644
index 0000000..527c4f4
--- /dev/null
+++ b/src/bzip2-helper.cpp
@@ -0,0 +1,59 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012-2018 University of California, Los Angeles
+ *
+ * This file is part of ChronoSync, synchronization library for distributed realtime
+ * applications for NDN.
+ *
+ * ChronoSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "bzip2-helper.hpp"
+
+#include <boost/iostreams/filtering_stream.hpp>
+#include <boost/iostreams/detail/iostream.hpp>
+#include <boost/iostreams/filter/bzip2.hpp>
+#include <boost/iostreams/copy.hpp>
+
+#include <ndn-cxx/encoding/buffer-stream.hpp>
+
+namespace chronosync {
+namespace bzip2 {
+
+namespace bio = boost::iostreams;
+
+/**
+ * Run the @p bufferSize bytes starting at @p buffer through a bzip2 compressor
+ * and return everything the compressor produced as a single ndn::Buffer.
+ */
+std::shared_ptr<ndn::Buffer>
+compress(const char* buffer, size_t bufferSize)
+{
+  ndn::OBufferStream compressed;
+
+  // Output chain: bytes written to `pipeline` go through the compressor and
+  // end up in `compressed`.
+  bio::filtering_stream<bio::output> pipeline;
+  pipeline.push(bio::bzip2_compressor());
+  pipeline.push(compressed);
+
+  // `buffer` already is a const char*, so it is handed to the array source as is.
+  bio::stream<bio::array_source> source(buffer, bufferSize);
+  bio::copy(source, pipeline);
+
+  return compressed.buf();
+}
+
+/**
+ * Counterpart of compress(): run the @p bufferSize bytes starting at @p buffer
+ * through a bzip2 decompressor and return the recovered bytes as one ndn::Buffer.
+ *
+ * NOTE(review): nothing in this function bounds the size of the decompressed
+ * output or handles malformed input -- confirm that callers are prepared for both.
+ */
+std::shared_ptr<ndn::Buffer>
+decompress(const char* buffer, size_t bufferSize)
+{
+  // Read side: a stream view over the caller's bytes (no cast needed, the
+  // pointer already has the type the array source expects).
+  bio::stream<bio::array_source> compressedInput(buffer, bufferSize);
+
+  // Write side: decompressor filter in front of the collecting buffer stream.
+  ndn::OBufferStream plain;
+  bio::filtering_stream<bio::output> pipeline;
+  pipeline.push(bio::bzip2_decompressor());
+  pipeline.push(plain);
+
+  bio::copy(compressedInput, pipeline);
+
+  return plain.buf();
+}
+
+} // namespace bzip2
+} // namespace chronosync
diff --git a/src/bzip2-helper.hpp b/src/bzip2-helper.hpp
new file mode 100644
index 0000000..60485c4
--- /dev/null
+++ b/src/bzip2-helper.hpp
@@ -0,0 +1,43 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012-2018 University of California, Los Angeles
+ *
+ * This file is part of ChronoSync, synchronization library for distributed realtime
+ * applications for NDN.
+ *
+ * ChronoSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef CHRONOSYNC_BZIP2_HELPER_HPP
+#define CHRONOSYNC_BZIP2_HELPER_HPP
+
+#include <ndn-cxx/encoding/buffer.hpp>
+
+namespace chronosync {
+namespace bzip2 {
+
+/**
+ * @brief Compress @p buffer of size @p bufferSize with bzip2
+ *
+ * @param buffer     start of the bytes to compress
+ * @param bufferSize number of bytes to read from @p buffer
+ * @return a buffer holding the compressed output
+ */
+std::shared_ptr<ndn::Buffer>
+compress(const char* buffer, size_t bufferSize);
+
+/**
+ * @brief Decompress buffer @p buffer of size @p bufferSize with bzip2
+ *
+ * @param buffer     start of the bzip2-compressed bytes
+ * @param bufferSize number of bytes to read from @p buffer
+ * @return a buffer holding the decompressed output
+ */
+std::shared_ptr<ndn::Buffer>
+decompress(const char* buffer, size_t bufferSize);
+
+} // namespace bzip2
+} // namespace chronosync
+
+#endif // CHRONOSYNC_BZIP2_HELPER_HPP
diff --git a/src/logic.cpp b/src/logic.cpp
index 5e87153..9d0f3fa 100644
--- a/src/logic.cpp
+++ b/src/logic.cpp
@@ -25,6 +25,7 @@
#include "logic.hpp"
#include "logger.hpp"
+#include "bzip2-helper.hpp"
#include <ndn-cxx/util/backports.hpp>
#include <ndn-cxx/util/string-helper.hpp>
@@ -413,7 +414,9 @@
Name name = data.getName();
ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
- processSyncData(name, digest, data.getContent().blockFromValue(), firstData);
+ auto contentBuffer = bzip2::decompress(reinterpret_cast<const char*>(data.getContent().value()),
+ data.getContent().value_size());
+ processSyncData(name, digest, Block(contentBuffer), firstData);
}
void
@@ -669,10 +672,10 @@
}
void
-Logic::trimState(State& partialState, const State& state, size_t maxSize)
+Logic::trimState(State& partialState, const State& state, size_t nExcludedStates)
{
partialState.reset();
- State tmp;
+
std::vector<ConstLeafPtr> leaves;
for (const ConstLeafPtr& leaf : state.getLeaves()) {
leaves.push_back(leaf);
@@ -680,15 +683,52 @@
std::shuffle(leaves.begin(), leaves.end(), m_rng);
+ size_t statesToEncode = leaves.size() - std::min(leaves.size() - 1, nExcludedStates);
for (const auto& constLeafPtr : leaves) {
- tmp.update(constLeafPtr->getSessionName(), constLeafPtr->getSeq());
- if (tmp.wireEncode().size() >= maxSize) {
+ if (statesToEncode == 0) {
break;
}
partialState.update(constLeafPtr->getSessionName(), constLeafPtr->getSeq());
+ --statesToEncode;
}
}
+/**
+ * Build the signed sync reply for @p name.
+ *
+ * The content is the bzip2-compressed wire encoding of @p state.  If the encoded
+ * Data is larger than getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD, the content
+ * is rebuilt from subsets produced by trimState(), excluding 1, 2, 4, ... states,
+ * until the reply fits or no smaller subset is left to try.
+ *
+ * @param nodePrefix node whose signingId (if non-empty) selects the signing identity
+ * @param name       name of the reply Data
+ * @param state      state to encode
+ * @return the signed reply; it can still exceed the limit when even the smallest
+ *         subset trimState() produces does not fit
+ */
+Data
+Logic::encodeSyncReply(const Name& nodePrefix, const Name& name, const State& state)
+{
+  Data syncReply(name);
+  syncReply.setFreshnessPeriod(m_syncReplyFreshness);
+
+  // (Re)build the content from the given state and (re)sign the packet.
+  // The parameter is deliberately not named `state`, so it does not shadow the
+  // function argument.
+  auto finalizeReply = [this, &nodePrefix, &syncReply] (const State& replyState) {
+    const auto& wire = replyState.wireEncode();
+    auto contentBuffer = bzip2::compress(reinterpret_cast<const char*>(wire.wire()), wire.size());
+    syncReply.setContent(contentBuffer);
+
+    if (m_nodeList[nodePrefix].signingId.empty())
+      m_keyChain.sign(syncReply);
+    else
+      m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
+  };
+
+  finalizeReply(state);
+
+  const auto sizeLimit = getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD;
+  const size_t nLeaves = state.getLeaves().size();
+  size_t nExcludedStates = 1;
+  while (syncReply.wireEncode().size() > sizeLimit) {
+    if (nExcludedStates == 1) {
+      // To show this debug message only once
+      _LOG_DEBUG("Sync reply size exceeded maximum packet limit (" << sizeLimit << ")");
+    }
+    BOOST_ASSERT(nLeaves != 0);
+
+    State partialState;
+    trimState(partialState, state, nExcludedStates);
+    finalizeReply(partialState);
+
+    // trimState() never goes below one state, so once nExcludedStates reaches
+    // nLeaves - 1 another round cannot make the reply any smaller.  Stop here
+    // instead of doubling forever: the counter would eventually wrap to 0 and
+    // the loop would keep re-encoding the full state.
+    if (nLeaves <= 1 || nExcludedStates >= nLeaves - 1)
+      break;
+    nExcludedStates *= 2;
+  }
+
+  return syncReply;
+}
+
void
Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
{
@@ -696,31 +736,7 @@
if (m_nodeList.find(nodePrefix) == m_nodeList.end())
return;
- Data syncReply(name);
- syncReply.setContent(state.wireEncode());
- syncReply.setFreshnessPeriod(m_syncReplyFreshness);
-
- if (m_nodeList[nodePrefix].signingId.empty())
- m_keyChain.sign(syncReply);
- else
- m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
-
- if (syncReply.wireEncode().size() > getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD) {
- _LOG_DEBUG("Sync reply size exceeded maximum packet limit (" << getMaxPacketLimit() << ")");
- auto maxContentSize = getMaxPacketLimit() - (syncReply.wireEncode().size() - syncReply.getContent().size());
- maxContentSize -= NDNLP_EXPECTED_OVERHEAD;
-
- State partialState;
- trimState(partialState, state, maxContentSize);
- syncReply.setContent(partialState.wireEncode());
-
- if (m_nodeList[nodePrefix].signingId.empty())
- m_keyChain.sign(syncReply);
- else
- m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
- }
-
- m_face.put(syncReply);
+ m_face.put(encodeSyncReply(nodePrefix, name, state));
// checking if our own interest got satisfied
if (m_outstandingInterestName == name) {
diff --git a/src/logic.hpp b/src/logic.hpp
index d33ed39..1d4fa8e 100644
--- a/src/logic.hpp
+++ b/src/logic.hpp
@@ -230,9 +230,12 @@
return m_state;
}
- /// @brief Trim @p state to a subset @p partialState whose encoding does not exceed @p maxSize
+ /// Create a subset @p partialState of @p state that leaves out up to @p excludedStates randomly chosen states, keeping at least one when @p state is not empty
void
- trimState(State& partialState, const State& state, size_t maxSize);
+ trimState(State& partialState, const State& state, size_t excludedStates);
+ /**
+  * Build the signed sync reply Data named @p name for node @p nodePrefix.
+  *
+  * The content is the bzip2-compressed wire encoding of @p state. While the
+  * encoded Data exceeds getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD, the
+  * content is rebuilt from a subset obtained via trimState(), doubling the
+  * number of excluded states each round.
+  */
+ Data
+ encodeSyncReply(const Name& nodePrefix, const Name& name, const State& state);
private:
/**