**Breaking change** Use bzip2 compression of sync data payload

Change-Id: I0a322e3268a5adc9d221c23c43fc6899c9dbf836
Refs: #4140
diff --git a/src/bzip2-helper.cpp b/src/bzip2-helper.cpp
new file mode 100644
index 0000000..527c4f4
--- /dev/null
+++ b/src/bzip2-helper.cpp
@@ -0,0 +1,59 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012-2018 University of California, Los Angeles
+ *
+ * This file is part of ChronoSync, synchronization library for distributed realtime
+ * applications for NDN.
+ *
+ * ChronoSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ChronoSync, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "bzip2-helper.hpp"
+
+#include <boost/iostreams/filtering_stream.hpp>
+#include <boost/iostreams/detail/iostream.hpp>
+#include <boost/iostreams/filter/bzip2.hpp>
+#include <boost/iostreams/copy.hpp>
+
+#include <ndn-cxx/encoding/buffer-stream.hpp>
+
+namespace chronosync {
+namespace bzip2 {
+
+namespace bio = boost::iostreams;
+
+std::shared_ptr<ndn::Buffer>
+compress(const char* buffer, size_t bufferSize)
+{
+  ndn::OBufferStream os;
+  bio::filtering_stream<bio::output> out;
+  out.push(bio::bzip2_compressor());
+  out.push(os);
+  bio::stream<bio::array_source> in(reinterpret_cast<const char*>(buffer), bufferSize);
+  bio::copy(in, out);
+  return os.buf();
+}
+
+std::shared_ptr<ndn::Buffer>
+decompress(const char* buffer, size_t bufferSize)
+{
+  ndn::OBufferStream os;
+  bio::filtering_stream<bio::output> out;
+  out.push(bio::bzip2_decompressor());
+  out.push(os);
+  bio::stream<bio::array_source> in(reinterpret_cast<const char*>(buffer), bufferSize);
+  bio::copy(in, out);
+  return os.buf();
+}
+
+} // namespace bzip2
+} // namespace chronosync
diff --git a/src/bzip2-helper.hpp b/src/bzip2-helper.hpp
new file mode 100644
index 0000000..60485c4
--- /dev/null
+++ b/src/bzip2-helper.hpp
@@ -0,0 +1,43 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012-2018 University of California, Los Angeles
+ *
+ * This file is part of ChronoSync, synchronization library for distributed realtime
+ * applications for NDN.
+ *
+ * ChronoSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ChronoSync, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef CHRONOSYNC_BZIP2_HELPER_HPP
+#define CHRONOSYNC_BZIP2_HELPER_HPP
+
+#include <ndn-cxx/encoding/buffer.hpp>
+
+namespace chronosync {
+namespace bzip2 {
+
+/**
+ * @brief Compress @p buffer of size @p bufferSize with bzip2
+ */
+std::shared_ptr<ndn::Buffer>
+compress(const char* buffer, size_t bufferSize);
+
+/**
+ * @brief Decompress buffer @p buffer of size @p bufferSize with bzip2
+ */
+std::shared_ptr<ndn::Buffer>
+decompress(const char* buffer, size_t bufferSize);
+
+} // namespace bzip2
+} // namespace chronosync
+
+#endif // CHRONOSYNC_BZIP2_HELPER_HPP
diff --git a/src/logic.cpp b/src/logic.cpp
index 5e87153..9d0f3fa 100644
--- a/src/logic.cpp
+++ b/src/logic.cpp
@@ -25,6 +25,7 @@
 
 #include "logic.hpp"
 #include "logger.hpp"
+#include "bzip2-helper.hpp"
 
 #include <ndn-cxx/util/backports.hpp>
 #include <ndn-cxx/util/string-helper.hpp>
@@ -413,7 +414,9 @@
   Name name = data.getName();
   ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
 
-  processSyncData(name, digest, data.getContent().blockFromValue(), firstData);
+  auto contentBuffer = bzip2::decompress(reinterpret_cast<const char*>(data.getContent().value()),
+                                         data.getContent().value_size());
+  processSyncData(name, digest, Block(contentBuffer), firstData);
 }
 
 void
@@ -669,10 +672,10 @@
 }
 
 void
-Logic::trimState(State& partialState, const State& state, size_t maxSize)
+Logic::trimState(State& partialState, const State& state, size_t nExcludedStates)
 {
   partialState.reset();
-  State tmp;
+
   std::vector<ConstLeafPtr> leaves;
   for (const ConstLeafPtr& leaf : state.getLeaves()) {
     leaves.push_back(leaf);
@@ -680,15 +683,52 @@
 
   std::shuffle(leaves.begin(), leaves.end(), m_rng);
 
+  size_t statesToEncode = leaves.size() - std::min(leaves.size() - 1, nExcludedStates);
   for (const auto& constLeafPtr : leaves) {
-    tmp.update(constLeafPtr->getSessionName(), constLeafPtr->getSeq());
-    if (tmp.wireEncode().size() >= maxSize) {
+    if (statesToEncode == 0) {
       break;
     }
     partialState.update(constLeafPtr->getSessionName(), constLeafPtr->getSeq());
+    --statesToEncode;
   }
 }
 
+Data
+Logic::encodeSyncReply(const Name& nodePrefix, const Name& name, const State& state)
+{
+  Data syncReply(name);
+  syncReply.setFreshnessPeriod(m_syncReplyFreshness);
+
+  auto finalizeReply = [this, &nodePrefix, &syncReply] (const State& state) {
+    auto contentBuffer = bzip2::compress(reinterpret_cast<const char*>(state.wireEncode().wire()),
+                                         state.wireEncode().size());
+    syncReply.setContent(contentBuffer);
+
+    if (m_nodeList[nodePrefix].signingId.empty())
+      m_keyChain.sign(syncReply);
+    else
+      m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
+  };
+
+  finalizeReply(state);
+
+  size_t nExcludedStates = 1;
+  while (syncReply.wireEncode().size() > getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD) {
+    if (nExcludedStates == 1) {
+      // To show this debug message only once
+      _LOG_DEBUG("Sync reply size exceeded maximum packet limit (" << (getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD) << ")");
+    }
+    State partialState;
+    trimState(partialState, state, nExcludedStates);
+    finalizeReply(partialState);
+
+    BOOST_ASSERT(state.getLeaves().size() != 0);
+    nExcludedStates *= 2;
+  }
+
+  return syncReply;
+}
+
 void
 Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
 {
@@ -696,31 +736,7 @@
   if (m_nodeList.find(nodePrefix) == m_nodeList.end())
     return;
 
-  Data syncReply(name);
-  syncReply.setContent(state.wireEncode());
-  syncReply.setFreshnessPeriod(m_syncReplyFreshness);
-
-  if (m_nodeList[nodePrefix].signingId.empty())
-    m_keyChain.sign(syncReply);
-  else
-    m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
-
-  if (syncReply.wireEncode().size() > getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD) {
-    _LOG_DEBUG("Sync reply size exceeded maximum packet limit (" << getMaxPacketLimit() << ")");
-    auto maxContentSize = getMaxPacketLimit() - (syncReply.wireEncode().size() - syncReply.getContent().size());
-    maxContentSize -= NDNLP_EXPECTED_OVERHEAD;
-
-    State partialState;
-    trimState(partialState, state, maxContentSize);
-    syncReply.setContent(partialState.wireEncode());
-
-    if (m_nodeList[nodePrefix].signingId.empty())
-      m_keyChain.sign(syncReply);
-    else
-      m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
-  }
-
-  m_face.put(syncReply);
+  m_face.put(encodeSyncReply(nodePrefix, name, state));
 
   // checking if our own interest got satisfied
   if (m_outstandingInterestName == name) {
diff --git a/src/logic.hpp b/src/logic.hpp
index d33ed39..1d4fa8e 100644
--- a/src/logic.hpp
+++ b/src/logic.hpp
@@ -230,9 +230,12 @@
     return m_state;
   }
 
-  /// @brief Trim @p state to a subset @p partialState whose encoding does not exceed @p maxSize
+  /// Create a subset @p partialState excluding @p nExcludedStates from @p state
   void
-  trimState(State& partialState, const State& state, size_t maxSize);
+  trimState(State& partialState, const State& state, size_t excludedStates);
+
+  Data
+  encodeSyncReply(const Name& nodePrefix, const Name& name, const State& state);
 
 private:
   /**
diff --git a/tests/unit-tests/bzip2-helper.t.cpp b/tests/unit-tests/bzip2-helper.t.cpp
new file mode 100644
index 0000000..330f66b
--- /dev/null
+++ b/tests/unit-tests/bzip2-helper.t.cpp
@@ -0,0 +1,52 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012-2018 University of California, Los Angeles
+ *
+ * This file is part of ChronoSync, synchronization library for distributed realtime
+ * applications for NDN.
+ *
+ * ChronoSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation, either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ChronoSync, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "bzip2-helper.hpp"
+
+#include "boost-test.hpp"
+
+namespace chronosync {
+namespace test {
+
+using std::tuple;
+
+BOOST_AUTO_TEST_SUITE(TestBzip2Helper)
+
+BOOST_AUTO_TEST_CASE(Basic)
+{
+  std::string message = "Lorem ipsum dolor sit amet, consectetur adipiscing elit, "
+    "sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. "
+    "Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris "
+    "nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in "
+    "reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. "
+    "Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt "
+    "mollit anim id est laborum.";
+
+  auto compressed = bzip2::compress(message.data(), message.size());
+  BOOST_CHECK_LT(compressed->size(), message.size());
+
+  auto decompressed = bzip2::decompress(reinterpret_cast<const char*>(compressed->data()), compressed->size());
+  BOOST_CHECK_EQUAL(message.size(), decompressed->size());
+  BOOST_CHECK_EQUAL(message, std::string(reinterpret_cast<const char*>(decompressed->data()), decompressed->size()));
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+} // namespace test
+} // namespace chronosync
diff --git a/tests/unit-tests/test-logic.cpp b/tests/unit-tests/test-logic.cpp
index fb0422a..83e8708 100644
--- a/tests/unit-tests/test-logic.cpp
+++ b/tests/unit-tests/test-logic.cpp
@@ -18,12 +18,15 @@
  */
 
 #include "logic.hpp"
+#include "bzip2-helper.hpp"
 
 #include "boost-test.hpp"
 #include "../identity-management-fixture.hpp"
 
 #include "dummy-forwarder.hpp"
 
+#include <ndn-cxx/util/random.hpp>
+
 namespace chronosync {
 namespace test {
 
@@ -319,7 +322,7 @@
   BOOST_CHECK_EQUAL(handler[1]->logic.getSessionNames().size(), 2);
 }
 
-BOOST_FIXTURE_TEST_CASE(ExplodeData, ndn::tests::IdentityManagementTimeFixture)
+BOOST_FIXTURE_TEST_CASE(TrimState, ndn::tests::IdentityManagementTimeFixture)
 {
   Name syncPrefix("/ndn/broadcast/sync");
   Name userPrefix("/user");
@@ -327,25 +330,47 @@
   Logic logic(face, syncPrefix, userPrefix, bind(onUpdate, _1));
 
   State state;
-  int i = 0;
-  while (state.wireEncode().size() < ndn::MAX_NDN_PACKET_SIZE) {
-    Name name("/test1");
-    name.append(std::to_string(i));
-    state.update(name, i++);
+  for (size_t i = 0; i != 100; ++i) {
+    state.update(Name("/to/trim").appendNumber(i), 42);
   }
 
-  Data syncReply(syncPrefix);
-  syncReply.setContent(state.wireEncode());
-  m_keyChain.sign(syncReply);
+  State partial;
+  logic.trimState(partial, state, 1);
+  BOOST_CHECK_EQUAL(partial.getLeaves().size(), 99);
 
-  BOOST_REQUIRE(syncReply.wireEncode().size() > ndn::MAX_NDN_PACKET_SIZE);
+  logic.trimState(partial, state, 100);
+  BOOST_CHECK_EQUAL(partial.getLeaves().size(), 1);
 
-  State partialState;
-  auto maxSize = ndn::MAX_NDN_PACKET_SIZE - (syncReply.wireEncode().size() - state.wireEncode().size());
-  logic.trimState(partialState, state, maxSize);
+  logic.trimState(partial, state, 101);
+  BOOST_CHECK_EQUAL(partial.getLeaves().size(), 1);
 
-  syncReply.setContent(partialState.wireEncode());
-  BOOST_REQUIRE(syncReply.wireEncode().size() < ndn::MAX_NDN_PACKET_SIZE);
+  logic.trimState(partial, state, 42);
+  BOOST_CHECK_EQUAL(partial.getLeaves().size(), 58);
+}
+
+BOOST_FIXTURE_TEST_CASE(VeryLargeState, ndn::tests::IdentityManagementTimeFixture)
+{
+  addIdentity("/bla");
+  Name syncPrefix("/ndn/broadcast/sync");
+  Name userPrefix("/user");
+  ndn::util::DummyClientFace face;
+  Logic logic(face, syncPrefix, userPrefix, bind(onUpdate, _1));
+
+  State state;
+  for (size_t i = 0; i < 50000 && bzip2::compress(reinterpret_cast<const char*>(state.wireEncode().wire()),
+                                                  state.wireEncode().size())->size() < ndn::MAX_NDN_PACKET_SIZE;
+       i += 10) {
+    Name prefix("/to/trim");
+    prefix.appendNumber(i);
+    for (size_t j = 0; j != 20; ++j) {
+      prefix.appendNumber(ndn::random::generateWord32());
+    }
+    state.update(prefix, ndn::random::generateWord32());
+  }
+  BOOST_TEST_MESSAGE("Got state with " << state.getLeaves().size() << " leaves");
+
+  auto data = logic.encodeSyncReply(userPrefix, "/fake/prefix/of/interest", state);
+  BOOST_CHECK_LE(data.wireEncode().size(), ndn::MAX_NDN_PACKET_SIZE);
 }
 
 class MaxPacketCustomizationFixture
@@ -361,6 +386,7 @@
 
   ~MaxPacketCustomizationFixture()
   {
+    unsetenv("CHRONOSYNC_MAX_PACKET_SIZE");
     if (oldSize) {
       setenv("CHRONOSYNC_MAX_PACKET_SIZE", oldSize->c_str(), 1);
     }