partial sync: segment hello and sync data

add segment publisher

refs: #4662

Change-Id: I62e7a2247bac58aeec364cd2a4e4d34259eae4af
diff --git a/tests/test-consumer.cpp b/tests/test-consumer.cpp
index 97adde3..9a745e6 100644
--- a/tests/test-consumer.cpp
+++ b/tests/test-consumer.cpp
@@ -23,8 +23,6 @@
 #include <ndn-cxx/name.hpp>
 #include <ndn-cxx/util/dummy-client-face.hpp>
 
-#include <iostream>
-
 namespace psync {
 
 using namespace ndn;
diff --git a/tests/test-full-producer.cpp b/tests/test-full-producer.cpp
index 23d2ab5..ee6764e 100644
--- a/tests/test-full-producer.cpp
+++ b/tests/test-full-producer.cpp
@@ -24,8 +24,6 @@
 #include <ndn-cxx/util/dummy-client-face.hpp>
 #include <ndn-cxx/mgmt/nfd/control-parameters.hpp>
 
-#include <iostream>
-
 namespace psync {
 
 using namespace ndn;
diff --git a/tests/test-full-sync.cpp b/tests/test-full-sync.cpp
index 3553c86..01a5375 100644
--- a/tests/test-full-sync.cpp
+++ b/tests/test-full-sync.cpp
@@ -26,8 +26,6 @@
 #include <ndn-cxx/name.hpp>
 #include <ndn-cxx/util/dummy-client-face.hpp>
 
-#include <iostream>
-
 namespace psync {
 
 using namespace ndn;
@@ -332,35 +330,7 @@
 {
   addNode(0);
   addNode(1);
-
-  // Simple content store
-  faces[0]->onSendInterest.connect([this] (const Interest& interest) {
-                                    for (const auto& data : faces[1]->sentData) {
-                                      if (data.getName() == interest.getName()) {
-                                        faces[0]->receive(data);
-                                        return;
-                                      }
-                                    }
-                                    faces[1]->receive(interest);
-                                  });
-
-  faces[0]->onSendData.connect([this] (const Data& data) {
-                                faces[1]->receive(data);
-                              });
-
-  faces[1]->onSendInterest.connect([this] (const Interest& interest) {
-                                    for (const auto& data : faces[0]->sentData) {
-                                      if (data.getName() == interest.getName()) {
-                                        faces[1]->receive(data);
-                                        return;
-                                      }
-                                    }
-                                    faces[0]->receive(interest);
-                                  });
-
-  faces[1]->onSendData.connect([this] (const Data& data) {
-                                faces[0]->receive(data);
-                              });
+  faces[0]->linkTo(*faces[1]);
 
   advanceClocks(ndn::time::milliseconds(10));
 
@@ -416,7 +386,6 @@
   nodes[0]->publishName(Name("userNode0-" + to_string(totalUpdates)));
   advanceClocks(ndn::time::milliseconds(10), 100);
 
-  // No mechanism to recover yet
   for (int i = 0; i <= totalUpdates; i++) {
     Name userPrefix("userNode0-" + to_string(i));
     for (int j = 0; j < 4; j++) {
diff --git a/tests/test-partial-producer.cpp b/tests/test-partial-producer.cpp
index 3bed1ac..2ff1a5d 100644
--- a/tests/test-partial-producer.cpp
+++ b/tests/test-partial-producer.cpp
@@ -24,8 +24,6 @@
 #include <ndn-cxx/util/dummy-client-face.hpp>
 #include <ndn-cxx/mgmt/nfd/control-parameters.hpp>
 
-#include <iostream>
-
 namespace psync {
 
 using namespace ndn;
diff --git a/tests/test-partial-sync.cpp b/tests/test-partial-sync.cpp
index 8061194..6d07644 100644
--- a/tests/test-partial-sync.cpp
+++ b/tests/test-partial-sync.cpp
@@ -25,8 +25,6 @@
 #include <ndn-cxx/name.hpp>
 #include <ndn-cxx/util/dummy-client-face.hpp>
 
-#include <iostream>
-
 namespace psync {
 
 using namespace ndn;
@@ -46,12 +44,23 @@
     addUserNodes("testUser", 10);
   }
 
+  ~PartialSyncFixture()
+  {
+    for (auto consumer : consumers) {
+      if (consumer) {
+        consumer->stop();
+      }
+    }
+  }
+
   void
-  addConsumer(int id, const vector<string>& subscribeTo)
+  addConsumer(int id, const vector<string>& subscribeTo, bool linkToProducer = true)
   {
     consumerFaces[id] = make_shared<util::DummyClientFace>(io, util::DummyClientFace::Options{true, true});
 
-    face.linkTo(*consumerFaces[id]);
+    if (linkToProducer) {
+      face.linkTo(*consumerFaces[id]);
+    }
 
     consumers[id] = make_shared<Consumer>(syncPrefix, *consumerFaces[id],
                       [&, id] (const vector<Name>& availableSubs)
@@ -174,7 +183,7 @@
   BOOST_CHECK_EQUAL(numSyncDataRcvd, 0);
 
   // Next sync interest will bring back the sync data
-  advanceClocks(ndn::time::milliseconds(1000));
+  advanceClocks(ndn::time::milliseconds(1500));
   BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
 }
 
@@ -343,6 +352,48 @@
   BOOST_CHECK_EQUAL(numSyncDataRcvd, 3);
 }
 
+BOOST_AUTO_TEST_CASE(SegmentedHello)
+{
+  vector<string> subscribeTo{"testUser-2", "testUser-4", "testUser-6"};
+  addConsumer(0, subscribeTo);
+
+  addUserNodes("testUser", 400);
+
+  consumers[0]->sendHelloInterest();
+  advanceClocks(ndn::time::milliseconds(10));
+  BOOST_CHECK_EQUAL(numHelloDataRcvd, 1);
+}
+
+BOOST_AUTO_TEST_CASE(SegmentedSync)
+{
+  ndn::Name longNameToExceedDataSize;
+  for (int i = 0; i < 100; i++) {
+    longNameToExceedDataSize.append("test-" + std::to_string(i));
+  }
+  addUserNodes(longNameToExceedDataSize.toUri(), 10);
+
+  vector<string> subscribeTo;
+  for (int i = 1; i < 10; i++) {
+    subscribeTo.push_back(longNameToExceedDataSize.toUri() + "-" + to_string(i));
+  }
+  addConsumer(0, subscribeTo);
+
+  consumers[0]->sendHelloInterest();
+  advanceClocks(ndn::time::milliseconds(10));
+  BOOST_CHECK_EQUAL(numHelloDataRcvd, 1);
+
+  oldSeqMap = producer->m_prefixes;
+  for (int i = 1; i < 10; i++) {
+    producer->updateSeqNo(longNameToExceedDataSize.toUri() + "-" + to_string(i), 1);
+  }
+
+  advanceClocks(ndn::time::milliseconds(1000));
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 0);
+
+  advanceClocks(ndn::time::milliseconds(1500));
+  BOOST_CHECK_EQUAL(numSyncDataRcvd, 1);
+}
+
 BOOST_AUTO_TEST_SUITE_END()
 
 } // namespace psync
\ No newline at end of file
diff --git a/tests/test-segment-publisher.cpp b/tests/test-segment-publisher.cpp
new file mode 100644
index 0000000..39881d8
--- /dev/null
+++ b/tests/test-segment-publisher.cpp
@@ -0,0 +1,162 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018,  The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "segment-publisher.hpp"
+#include "detail/state.hpp"
+#include "unit-test-time-fixture.hpp"
+
+#include <boost/test/unit_test.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/data.hpp>
+#include <ndn-cxx/interest.hpp>
+#include <ndn-cxx/util/dummy-client-face.hpp>
+#include <ndn-cxx/util/segment-fetcher.hpp>
+#include <ndn-cxx/security/validator-null.hpp>
+
+namespace psync {
+
+using namespace ndn;
+
+class SegmentPublisherFixture : public tests::UnitTestTimeFixture
+{
+public:
+  SegmentPublisherFixture()
+  : face(io, util::DummyClientFace::Options{true, true})
+  , publisher(face, keyChain)
+  , freshness(1000)
+  , numComplete(0)
+  , numRepliesFromStore(0)
+  {
+    face.setInterestFilter(InterestFilter("/hello/world"),
+                           bind(&SegmentPublisherFixture::onInterest, this, _1, _2),
+                           [] (const ndn::Name& prefix, const std::string& msg) {
+                             BOOST_CHECK(false);
+                           });
+    advanceClocks(ndn::time::milliseconds(10));
+
+    for (int i = 0; i < 1000; ++i) {
+      state.addContent(Name("/test").appendNumber(i));
+    }
+  }
+
+  ~SegmentPublisherFixture() {
+    fetcher->stop();
+  }
+
+  void
+  expressInterest(const Interest& interest) {
+    fetcher = util::SegmentFetcher::start(face, interest, ndn::security::v2::getAcceptAllValidator());
+    fetcher->onComplete.connect([this] (ConstBufferPtr data) {
+                                 numComplete++;
+                               });
+    fetcher->onError.connect([] (uint32_t errorCode, const std::string& msg) {
+                              BOOST_CHECK(false);
+                            });
+
+    advanceClocks(ndn::time::milliseconds(10));
+  }
+
+  void
+  onInterest(const Name& prefix, const Interest& interest) {
+    if (publisher.replyFromStore(interest.getName())) {
+      numRepliesFromStore++;
+      return;
+    }
+
+    // If dataName is same as interest name
+    if (dataName.empty()) {
+      publisher.publish(interest.getName(), interest.getName(), state.wireEncode(), freshness);
+    }
+    else {
+      publisher.publish(interest.getName(), dataName, state.wireEncode(), freshness);
+    }
+  }
+
+  util::DummyClientFace face;
+  KeyChain keyChain;
+  SegmentPublisher publisher;
+  shared_ptr<util::SegmentFetcher> fetcher;
+  Name dataName;
+  time::milliseconds freshness;
+  State state;
+
+  int numComplete;
+  int numRepliesFromStore;
+};
+
+BOOST_FIXTURE_TEST_SUITE(TestSegmentPublisher, SegmentPublisherFixture)
+
+BOOST_AUTO_TEST_CASE(Basic)
+{
+  BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);
+  expressInterest(Interest("/hello/world"));
+  BOOST_CHECK_EQUAL(numComplete, 1);
+  // First segment is answered directly in publish,
+  // Rest two are satisfied by the store
+  BOOST_CHECK_EQUAL(numRepliesFromStore, 2);
+  BOOST_CHECK_EQUAL(publisher.m_ims.size(), 3);
+
+  numRepliesFromStore = 0;
+  expressInterest(Interest("/hello/world"));
+  BOOST_CHECK_EQUAL(numComplete, 2);
+  BOOST_CHECK_EQUAL(numRepliesFromStore, 3);
+
+  advanceClocks(ndn::time::milliseconds(freshness));
+  BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);
+
+  numRepliesFromStore = 0;
+  expressInterest(Interest("/hello/world"));
+  BOOST_CHECK_EQUAL(numComplete, 3);
+  BOOST_CHECK_EQUAL(numRepliesFromStore, 2);
+
+  numRepliesFromStore = 0;
+  face.expressInterest(Interest(Name("/hello/world/").appendSegment(0)),
+                       [this] (const Interest& interest, const Data& data) {
+                         numComplete++;
+                       },
+                       [] (const Interest& interest, const lp::Nack& nack) {
+                         BOOST_CHECK(false);
+                       },
+                       [] (const Interest& interest) {
+                         BOOST_CHECK(false);
+                       });
+  advanceClocks(ndn::time::milliseconds(10));
+  BOOST_CHECK_EQUAL(numComplete, 4);
+  BOOST_CHECK_EQUAL(numRepliesFromStore, 1);
+}
+
+BOOST_AUTO_TEST_CASE(LargerDataName)
+{
+  BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);
+  dataName = Name("/hello/world/IBF");
+
+  expressInterest(Interest("/hello/world"));
+  BOOST_CHECK_EQUAL(numComplete, 1);
+  // First segment is answered directly in publish,
+  // Rest two are satisfied by the store
+  BOOST_CHECK_EQUAL(numRepliesFromStore, 2);
+  BOOST_CHECK_EQUAL(publisher.m_ims.size(), 3);
+
+  advanceClocks(ndn::time::milliseconds(freshness));
+  BOOST_CHECK_EQUAL(publisher.m_ims.size(), 0);
+}
+
+BOOST_AUTO_TEST_SUITE_END()
+
+} // namespace psync
\ No newline at end of file
diff --git a/tests/test-state.cpp b/tests/test-state.cpp
index 47c1471..0a815fd 100644
--- a/tests/test-state.cpp
+++ b/tests/test-state.cpp
@@ -29,7 +29,7 @@
 
 BOOST_AUTO_TEST_SUITE(TestState)
 
-BOOST_AUTO_TEST_CASE(EncodeDeode)
+BOOST_AUTO_TEST_CASE(EncodeDecode)
 {
   State state;
   state.addContent(ndn::Name("test1"));
@@ -40,6 +40,30 @@
   State rcvdState(data.getContent());
 
   BOOST_CHECK(state.getContent() == rcvdState.getContent());
+
+  // Simulate getting buffer content from segment fetcher
+  ndn::Buffer buffer(data.getContent().value_size());
+  std::copy(data.getContent().value_begin(),
+            data.getContent().value_end(),
+            buffer.begin());
+
+  ndn::ConstBufferPtr buffer2 = make_shared<ndn::Buffer>(buffer);
+
+  ndn::Block block(std::move(buffer2));
+
+  State state2;
+  state2.wireDecode(block);
+
+  BOOST_CHECK(state.getContent() == state2.getContent());
+}
+
+BOOST_AUTO_TEST_CASE(EmptyContent)
+{
+  ndn::Data data;
+  BOOST_CHECK_NO_THROW(State state(data.getContent()));
+
+  State state(data.getContent());
+  BOOST_CHECK_EQUAL(state.getContent().size(), 0);
 }
 
 BOOST_AUTO_TEST_SUITE_END()