PSync: initial commit

refs: #4641

Change-Id: Iabed3ad7632544d97559e6798547b7972b416784
diff --git a/tests/test-partial-sync.cpp b/tests/test-partial-sync.cpp
new file mode 100644
index 0000000..0b2800a
--- /dev/null
+++ b/tests/test-partial-sync.cpp
@@ -0,0 +1,349 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018,  The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "partial-producer.hpp"
+#include "consumer.hpp"
+#include "unit-test-time-fixture.hpp"
+
+#include <boost/test/unit_test.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/util/dummy-client-face.hpp>
+
+#include <iostream>
+
+namespace psync {
+
+using namespace ndn;
+using namespace std;
+
+class PartialSyncFixture : public tests::UnitTestTimeFixture
+{
+public:
+  PartialSyncFixture()
+   : face(io, {true, true})
+   , syncPrefix("psync")
+   , userPrefix("testUser-0")
+   , numHelloDataRcvd(0)
+   , numSyncDataRcvd(0)
+  {
+    producer = make_shared<PartialProducer>(40, face, syncPrefix, userPrefix);
+    addUserNodes("testUser", 10);
+  }
+
+  void
+  addConsumer(int id, const vector<string>& subscribeTo)
+  {
+    consumerFaces[id] = make_shared<util::DummyClientFace>(io, util::DummyClientFace::Options{true, true});
+
+    face.linkTo(*consumerFaces[id]);
+
+    consumers[id] = make_shared<Consumer>(syncPrefix, *consumerFaces[id],
+                      [&, id] (const vector<Name>& availableSubs)
+                      {
+                        numHelloDataRcvd++;
+                        checkSubList(availableSubs);
+
+                        checkIBFUpdated(id);
+
+                        for (const auto& sub : subscribeTo) {
+                          consumers[id]->addSubscription(sub);
+                        }
+                        consumers[id]->sendSyncInterest();
+                      },
+                      [&, id] (const std::vector<MissingDataInfo>& updates) {
+                        numSyncDataRcvd++;
+
+                        checkIBFUpdated(id);
+
+                        for (const auto& update : updates) {
+                          BOOST_CHECK(consumers[id]->isSubscribed(update.prefix));
+                          BOOST_CHECK_EQUAL(oldSeqMap.at(update.prefix) + 1, update.lowSeq);
+                          BOOST_CHECK_EQUAL(producer->m_prefixes.at(update.prefix), update.highSeq);
+                          BOOST_CHECK_EQUAL(consumers[id]->getSeqNo(update.prefix).value(), update.highSeq);
+                        }
+                      }, 40, 0.001);
+
+    advanceClocks(ndn::time::milliseconds(10));
+  }
+
+  void
+  checkIBFUpdated(int id)
+  {
+    Name emptyName;
+    producer->m_iblt.appendToName(emptyName);
+    BOOST_CHECK_EQUAL(consumers[id]->m_iblt, emptyName);
+  }
+
+  bool
+  checkSubList(const vector<Name>& availableSubs)
+  {
+    for (const auto& prefix : producer->m_prefixes ) {
+      for (const auto& sub : availableSubs) {
+        if (prefix.first != sub) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  void
+  addUserNodes(const std::string& prefix, int numOfUserNodes)
+  {
+    // zeroth is added through constructor
+    for (int i = 1; i < numOfUserNodes; i++) {
+      producer->addUserNode(prefix + "-" + to_string(i));
+    }
+  }
+
+  void
+  publishUpdateFor(const std::string& prefix)
+  {
+    oldSeqMap = producer->m_prefixes;
+    producer->publishName(prefix);
+    advanceClocks(ndn::time::milliseconds(10));
+  }
+
+  void
+  updateSeqFor(const std::string& prefix, uint64_t seq)
+  {
+    oldSeqMap = producer->m_prefixes;
+    producer->updateSeqNo(prefix, seq);
+  }
+
+  util::DummyClientFace face;
+  Name syncPrefix;
+  Name userPrefix;
+
+  shared_ptr<PartialProducer> producer;
+  std::map <ndn::Name, uint64_t> oldSeqMap;
+
+  shared_ptr<Consumer> consumers[3];
+  shared_ptr<util::DummyClientFace> consumerFaces[3];
+  int numHelloDataRcvd;
+  int numSyncDataRcvd;
+};
+
+BOOST_FIXTURE_TEST_SUITE(PartialSync, PartialSyncFixture)
+
+BOOST_AUTO_TEST_CASE(Simple)
+{
+  vector<string> subscribeTo{"testUser-2", "testUser-4", "testUser-6"};
+  addConsumer(0, subscribeTo);
+
+  consumers[0]->sendHelloInterest();
+  advanceClocks(ndn::time::milliseconds(10));
+  BOOST_CHECK_EQUAL(numHelloDataRcvd, 1);
+
+  publishUpdateFor("testUser-2");
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
+  publishUpdateFor("testUser-3");
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
+  publishUpdateFor("testUser-2");
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 2);
+}
+
+BOOST_AUTO_TEST_CASE(MissedUpdate)
+{
+  vector<string> subscribeTo{"testUser-2", "testUser-4", "testUser-6"};
+  addConsumer(0, subscribeTo);
+
+  consumers[0]->sendHelloInterest();
+  advanceClocks(ndn::time::milliseconds(10));
+  BOOST_CHECK_EQUAL(numHelloDataRcvd, 1);
+
+  updateSeqFor("testUser-2", 3);
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 0);
+
+  // The sync interest sent after hello will timeout
+  advanceClocks(ndn::time::milliseconds(1000));
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 0);
+
+  // Next sync interest will bring back the sync data
+  advanceClocks(ndn::time::milliseconds(1000));
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
+}
+
+BOOST_AUTO_TEST_CASE(LateSubscription)
+{
+  vector<string> subscribeTo{"testUser-2", "testUser-4", "testUser-6"};
+  addConsumer(0, subscribeTo);
+
+  consumers[0]->sendHelloInterest();
+  advanceClocks(ndn::time::milliseconds(10));
+
+  BOOST_CHECK_EQUAL(numHelloDataRcvd, 1);
+  publishUpdateFor("testUser-2");
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
+
+  consumers[0]->addSubscription("testUser-3");
+  consumers[0]->sendSyncInterest();
+  publishUpdateFor("testUser-3");
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 2);
+}
+
+BOOST_AUTO_TEST_CASE(ConsumerSyncTimeout)
+{
+  vector<string> subscribeTo{"testUser-2", "testUser-4", "testUser-6"};
+  addConsumer(0, subscribeTo);
+
+  consumers[0]->sendHelloInterest();
+  BOOST_CHECK_EQUAL(producer->m_pendingEntries.size(), 0);
+  advanceClocks(ndn::time::milliseconds(10));
+  BOOST_CHECK_EQUAL(producer->m_pendingEntries.size(), 1);
+  advanceClocks(ndn::time::milliseconds(10), 100);
+  BOOST_CHECK_EQUAL(producer->m_pendingEntries.size(), 0);
+  advanceClocks(ndn::time::milliseconds(10), 100);
+
+  int numSyncInterests = 0;
+  for (const auto& interest : consumerFaces[0]->sentInterests) {
+    if (interest.getName().getSubName(0, 2) == Name("/psync/sync")) {
+      numSyncInterests++;
+    }
+  }
+  BOOST_CHECK_EQUAL(numSyncInterests, 2);
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 0);
+}
+
+BOOST_AUTO_TEST_CASE(MultipleConsumersWithSameSubList)
+{
+  vector<string> subscribeTo{"testUser-2", "testUser-4", "testUser-6"};
+  addConsumer(0, subscribeTo);
+  addConsumer(1, subscribeTo);
+  addConsumer(2, subscribeTo);
+
+  consumers[0]->sendHelloInterest();
+  consumers[1]->sendHelloInterest();
+  consumers[2]->sendHelloInterest();
+  advanceClocks(ndn::time::milliseconds(10));
+
+  BOOST_CHECK_EQUAL(numHelloDataRcvd, 3);
+
+  publishUpdateFor("testUser-2");
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 3);
+
+  publishUpdateFor("testUser-3");
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 3);
+}
+
+BOOST_AUTO_TEST_CASE(MultipleConsumersWithDifferentSubList)
+{
+  vector<string> subscribeTo{"testUser-2", "testUser-4", "testUser-6"};
+  addConsumer(0, subscribeTo);
+
+  vector<string> subscribeTo1{"testUser-1", "testUser-3", "testUser-5"};
+  addConsumer(1, subscribeTo1);
+
+  vector<string> subscribeTo2{"testUser-2", "testUser-3"};
+  addConsumer(2, subscribeTo2);
+
+  consumers[0]->sendHelloInterest();
+  consumers[1]->sendHelloInterest();
+  consumers[2]->sendHelloInterest();
+  advanceClocks(ndn::time::milliseconds(10));
+
+  BOOST_CHECK_EQUAL(numHelloDataRcvd, 3);
+
+  publishUpdateFor("testUser-2");
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 2);
+
+  numSyncDataRcvd = 0;
+  publishUpdateFor("testUser-3");
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 2);
+}
+
+BOOST_AUTO_TEST_CASE(ReplicatedProducer)
+{
+  vector<string> subscribeTo{"testUser-2", "testUser-4", "testUser-6"};
+  addConsumer(0, subscribeTo);
+
+  consumers[0]->sendHelloInterest();
+  advanceClocks(ndn::time::milliseconds(10));
+  BOOST_CHECK_EQUAL(numHelloDataRcvd, 1);
+
+  publishUpdateFor("testUser-2");
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
+
+  // Link to first producer goes down
+  face.unlink();
+
+  util::DummyClientFace face2(io, {true, true});
+  PartialProducer replicatedProducer(40, face2, syncPrefix, userPrefix);
+  for (int i = 1; i < 10; i++) {
+      replicatedProducer.addUserNode("testUser-" + to_string(i));
+  }
+  advanceClocks(ndn::time::milliseconds(10));
+  replicatedProducer.publishName("testUser-2");
+  // Link to a replicated producer comes up
+  face2.linkTo(*consumerFaces[0]);
+
+  BOOST_CHECK_EQUAL(face2.sentData.size(), 0);
+
+  // Update in first producer as well so consumer on sync data
+  // callback checks still pass
+  publishUpdateFor("testUser-2");
+  replicatedProducer.publishName("testUser-2");
+  advanceClocks(ndn::time::milliseconds(15), 100);
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 2);
+  BOOST_CHECK_EQUAL(face2.sentData.size(), 1);
+}
+
+BOOST_AUTO_TEST_CASE(ApplicationNack)
+{
+  // 50 is more than expected number of entries of 40 in the producer's IBF
+  addUserNodes("testUser", 50);
+
+  vector<string> subscribeTo{"testUser-2", "testUser-4", "testUser-6"};
+  addConsumer(0, subscribeTo);
+
+  consumers[0]->sendHelloInterest();
+  advanceClocks(ndn::time::milliseconds(10));
+  BOOST_CHECK_EQUAL(numHelloDataRcvd, 1);
+
+  publishUpdateFor("testUser-2");
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
+
+  oldSeqMap = producer->m_prefixes;
+  for (int i = 0; i < 50; i++) {
+    ndn::Name prefix("testUser-" + to_string(i));
+    producer->updateSeqNo(prefix, producer->getSeqNo(prefix).value() + 1);
+  }
+  // Next sync interest should trigger the nack
+  advanceClocks(ndn::time::milliseconds(15), 100);
+
+  // Nack does not contain any content so still should be 1
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
+
+  bool nackRcvd = false;
+  for (const auto& data : face.sentData) {
+    if (data.getContentType() == ndn::tlv::ContentType_Nack) {
+      nackRcvd = true;
+      break;
+    }
+  }
+  BOOST_CHECK(nackRcvd);
+
+  producer->publishName("testUser-4");
+  advanceClocks(ndn::time::milliseconds(10));
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 2);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+} // namespace psync
\ No newline at end of file