full-producer: segment sync data

refs: #4716

Change-Id: I6776bcfd2816c1c45548c3799c1ad3e52cb3cf18
diff --git a/src/detail/state.cpp b/src/detail/state.cpp
index d245c0b..09ee55e 100644
--- a/src/detail/state.cpp
+++ b/src/detail/state.cpp
@@ -72,9 +72,8 @@
 State::wireDecode(const ndn::Block& wire)
 {
   auto blockType = wire.type();
-
-  if (blockType != tlv::PSyncContent && blockType != ndn::tlv::Content) {
-    BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Content/PSyncContent Block, but Block is of a different type: #" +
+  if (blockType != tlv::PSyncContent) {
+    BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected PSyncContent Block, but Block is of type: #" +
                                            ndn::to_string(blockType)));
     return;
   }
@@ -83,34 +82,13 @@
 
   m_wire = wire;
 
-  auto it = m_wire.elements_begin();
-
-  if (it == m_wire.elements_end()) {
-    return;
-  }
-
-  if (blockType == tlv::PSyncContent) {
-    while(it != m_wire.elements_end()) {
-      if (it->type() == ndn::tlv::Name) {
-        m_content.emplace_back(*it);
-      }
-      else {
-        BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of a different type: #" +
-                                              ndn::to_string(it->type())));
-      }
-      ++it;
+  for (auto it = m_wire.elements_begin(); it != m_wire.elements_end(); ++it) {
+    if (it->type() == ndn::tlv::Name) {
+      m_content.emplace_back(*it);
     }
-  }
-  else if (blockType == ndn::tlv::Content) {
-    it->parse();
-    for (auto val = it->elements_begin(); val != it->elements_end(); ++val) {
-      if (val->type() == ndn::tlv::Name) {
-        m_content.emplace_back(*val);
-      }
-      else {
-        BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of a different type: #" +
-                                              ndn::to_string(val->type())));
-      }
+    else {
+      BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of type: #" +
+                                            ndn::to_string(it->type())));
     }
   }
 }
diff --git a/src/full-producer.cpp b/src/full-producer.cpp
index c00bd0a..0953f93 100644
--- a/src/full-producer.cpp
+++ b/src/full-producer.cpp
@@ -41,7 +41,6 @@
   : ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness)
   , m_syncInterestLifetime(syncInterestLifetime)
   , m_onUpdate(onUpdateCallBack)
-  , m_outstandingInterestId(nullptr)
   , m_scheduledSyncInterestId(m_scheduler)
 {
   int jitter = m_syncInterestLifetime.count() * .20;
@@ -49,7 +48,7 @@
 
   m_registerPrefixId =
     m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
-                             std::bind(&FullProducer::onInterest, this, _1, _2),
+                             std::bind(&FullProducer::onSyncInterest, this, _1, _2),
                              std::bind(&FullProducer::onRegisterFailed, this, _1, _2));
 
   // Should we do this after setInterestFilter success call back
@@ -59,8 +58,8 @@
 
 FullProducer::~FullProducer()
 {
-  if (m_outstandingInterestId != nullptr) {
-    m_face.removePendingInterest(m_outstandingInterestId);
+  if (m_fetcher) {
+    m_fetcher->stop();
   }
 
   m_face.unsetInterestFilter(m_registerPrefixId);
@@ -89,8 +88,8 @@
   // If we send two sync interest one after the other
   // since there is no new data in the network yet,
   // when data is available it may satisfy both of them
-  if (m_outstandingInterestId != nullptr) {
-    m_face.removePendingInterest(m_outstandingInterestId);
+  if (m_fetcher) {
+    m_fetcher->stop();
   }
 
   // Sync Interest format for full sync: /<sync-prefix>/<ourLatestIBF>
@@ -106,50 +105,53 @@
                               [this] { sendSyncInterest(); });
 
   ndn::Interest syncInterest(syncInterestName);
-  syncInterest.setInterestLifetime(m_syncInterestLifetime);
-  // Other side appends hash of IBF to sync data name
-  syncInterest.setCanBePrefix(true);
-  syncInterest.setMustBeFresh(true);
 
-  syncInterest.setNonce(1);
-  syncInterest.refreshNonce();
+  ndn::util::SegmentFetcher::Options options;
+  options.interestLifetime = m_syncInterestLifetime;
+  options.maxTimeout = m_syncInterestLifetime;
 
-  m_outstandingInterestId = m_face.expressInterest(syncInterest,
-                              std::bind(&FullProducer::onSyncData, this, _1, _2),
-                              [] (const ndn::Interest& interest, const ndn::lp::Nack& nack) {
-                                NDN_LOG_TRACE("received Nack with reason " << nack.getReason() <<
-                                              " for Interest with Nonce: " << interest.getNonce());
-                              },
-                              [] (const ndn::Interest& interest) {
-                                NDN_LOG_DEBUG("On full sync timeout " << interest.getNonce());
-                              });
+  m_fetcher = ndn::util::SegmentFetcher::start(m_face,
+                                               syncInterest,
+                                               ndn::security::v2::getAcceptAllValidator(),
+                                               options);
+
+  m_fetcher->onComplete.connect([this, syncInterest] (ndn::ConstBufferPtr bufferPtr) {
+                                  onSyncData(syncInterest, bufferPtr);
+                                });
+
+  m_fetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
+                               NDN_LOG_ERROR("Cannot fetch sync data, error: " <<
+                                              errorCode << " message: " << msg);
+                             });
 
   NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
                 ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
 }
 
 void
-FullProducer::onInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
+FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
 {
-  ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
-  if (nameWithoutSyncPrefix.size() == 2 &&
-      nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 1) == RECOVERY_PREFIX.get(0)) {
-      onRecoveryInterest(interest);
+  if (m_segmentPublisher.replyFromStore(interest.getName())) {
+    return;
   }
-  // interest for recovery segment
-  else if (nameWithoutSyncPrefix.size() == 3 &&
-           nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 2) == RECOVERY_PREFIX.get(0)) {
-    onRecoveryInterest(interest);
-  }
-  else if (nameWithoutSyncPrefix.size() == 1) {
-    onSyncInterest(interest);
-  }
-}
 
-void
-FullProducer::onSyncInterest(const ndn::Interest& interest)
-{
-  ndn::Name interestName = interest.getName();
+  ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
+  ndn::Name interestName;
+  uint64_t interestSeq = 0;
+
+  if (nameWithoutSyncPrefix.size() == 1) {
+    // Get /IBF from /IBF
+    interestName = interest.getName();
+  }
+  else if (nameWithoutSyncPrefix.size() == 2) {
+    // Get /IBF from /IBF/<seq-no>
+    interestName = interest.getName().getPrefix(-1);
+    interestSeq = interest.getName().get(-1).toSegment();
+  }
+  else {
+    return;
+  }
+
   ndn::name::Component ibltName = interestName.get(interestName.size()-1);
 
   NDN_LOG_DEBUG("Full Sync Interest Received, nonce: " << interest.getNonce() <<
@@ -174,23 +176,22 @@
                    << " negative: " << negative.size() << " m_threshold: "
                    << m_threshold);
 
-    // Send nack if greater then threshold, else send positive below as usual
+    // Send all data if greater then threshold, else send positive below as usual
     // Or send if we can't get neither positive nor negative differences
     if (positive.size() + negative.size() >= m_threshold ||
         (positive.size() == 0 && negative.size() == 0)) {
-
-      // If we don't have anything to offer means that
-      // we are behind and should not mislead other nodes.
-      bool haveSomethingToOffer = false;
+      State state;
       for (const auto& content : m_prefixes) {
         if (content.second != 0) {
-          haveSomethingToOffer = true;
+          state.addContent(ndn::Name(content.first).appendNumber(content.second));
         }
       }
 
-      if (haveSomethingToOffer) {
-        sendApplicationNack(interestName);
+      if (!state.getContent().empty()) {
+        m_segmentPublisher.publish(interest.getName(), interest.getName(),
+                                   state.wireEncode(), m_syncReplyFreshness);
       }
+
       return;
     }
   }
@@ -206,6 +207,9 @@
 
   if (!state.getContent().empty()) {
     NDN_LOG_DEBUG("Sending sync content: " << state);
+    if (interestSeq != 0) {
+      interestName.appendSegment(interestSeq);
+    }
     sendSyncData(interestName, state.wireEncode());
     return;
   }
@@ -223,26 +227,6 @@
 }
 
 void
-FullProducer::onRecoveryInterest(const ndn::Interest& interest)
-{
-  NDN_LOG_DEBUG("Recovery interest received");
-
-  if (m_segmentPublisher.replyFromStore(interest.getName())) {
-    return;
-  }
-
-  State state;
-  for (const auto& content : m_prefixes) {
-    if (content.second != 0) {
-      state.addContent(ndn::Name(content.first).appendNumber(content.second));
-    }
-  }
-
-  m_segmentPublisher.publish(interest.getName(), interest.getName(),
-                             state.wireEncode(), m_syncReplyFreshness);
-}
-
-void
 FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
 {
   NDN_LOG_DEBUG("Checking if data will satisfy our own pending interest");
@@ -251,57 +235,40 @@
   m_iblt.appendToName(nameWithIblt);
 
   // Append hash of our IBF so that data name maybe different for each node answering
-  ndn::Data data(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
-  data.setFreshnessPeriod(m_syncReplyFreshness);
-  data.setContent(block);
-  m_keyChain.sign(data);
+  ndn::Name dataName(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
 
   // checking if our own interest got satisfied
   if (m_outstandingInterestName == name) {
     NDN_LOG_DEBUG("Satisfied our own pending interest");
     // remove outstanding interest
-    if (m_outstandingInterestId != nullptr) {
-      NDN_LOG_DEBUG("Removing our pending interest from face");
-      m_face.removePendingInterest(m_outstandingInterestId);
-      m_outstandingInterestId = nullptr;
+    if (m_fetcher) {
+      NDN_LOG_DEBUG("Removing our pending interest from face (stop fetcher)");
+      m_fetcher->stop();
       m_outstandingInterestName = ndn::Name("");
     }
 
     NDN_LOG_DEBUG("Sending Sync Data");
 
     // Send data after removing pending sync interest on face
-    m_face.put(data);
+    m_segmentPublisher.publish(name, dataName, block, m_syncReplyFreshness);
 
     NDN_LOG_TRACE("Renewing sync interest");
     sendSyncInterest();
   }
   else {
     NDN_LOG_DEBUG("Sending Sync Data");
-    m_face.put(data);
+    m_segmentPublisher.publish(name, dataName, block, m_syncReplyFreshness);
   }
 }
 
 void
-FullProducer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
+FullProducer::onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPtr& bufferPtr)
 {
-  ndn::Name interestName = interest.getName();
   deletePendingInterests(interest.getName());
 
-  if (data.getContentType() == ndn::tlv::ContentType_Nack) {
-    NDN_LOG_DEBUG("Got application nack, sending recovery interest");
-    sendRecoveryInterest(interest);
-    return;
-  }
-
-  State state(data.getContent());
+  State state(ndn::Block(std::move(bufferPtr)));
   std::vector<MissingDataInfo> updates;
 
-  if (interestName.get(interestName.size()-1) == RECOVERY_PREFIX.get(0) &&
-      state.getContent().empty()) {
-    NDN_LOG_TRACE("Recovery data is empty, other side is behind");
-    return;
-  }
-
   NDN_LOG_DEBUG("Sync Data Received:  " << state);
 
   for (const auto& content : state.getContent()) {
@@ -325,8 +292,8 @@
     sendSyncInterest();
   }
   else {
-    NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce()  <<
-                  " , hash: " << std::hash<ndn::Name>{}(interestName));
+    NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() <<
+                  " , hash: " << std::hash<ndn::Name>{}(interest.getName()));
   }
 }
 
@@ -398,39 +365,4 @@
   }
 }
 
-void
-FullProducer::sendRecoveryInterest(const ndn::Interest& interest)
-{
-  if (m_outstandingInterestId != nullptr) {
-    m_face.removePendingInterest(m_outstandingInterestId);
-    m_outstandingInterestId = nullptr;
-  }
-
-  ndn::Name ibltName;
-  m_iblt.appendToName(ibltName);
-
-  ndn::Name recoveryInterestName(m_syncPrefix);
-  recoveryInterestName.appendNumber(std::hash<ndn::Name>{}(ibltName));
-  recoveryInterestName.append(RECOVERY_PREFIX);
-
-  ndn::Interest recoveryInterest(recoveryInterestName);
-  recoveryInterest.setInterestLifetime(m_syncInterestLifetime);
-
-  auto fetcher = ndn::util::SegmentFetcher::start(m_face,
-                                                  recoveryInterest,
-                                                  ndn::security::v2::getAcceptAllValidator());
-
-  fetcher->onComplete.connect([this, recoveryInterest] (ndn::ConstBufferPtr bufferPtr) {
-                                NDN_LOG_TRACE("Segment fetcher got data");
-                                ndn::Data data;
-                                data.setContent(std::move(bufferPtr));
-                                onSyncData(recoveryInterest, data);
-                              });
-
-  fetcher->onError.connect([] (uint32_t errorCode, const std::string& msg) {
-                             NDN_LOG_ERROR("Cannot recover, error: " << errorCode <<
-                                           " message: " << msg);
-                           });
-}
-
 } // namespace psync
\ No newline at end of file
diff --git a/src/full-producer.hpp b/src/full-producer.hpp
index bcf699e..21756d0 100644
--- a/src/full-producer.hpp
+++ b/src/full-producer.hpp
@@ -32,6 +32,7 @@
 #include <ndn-cxx/util/scheduler-scoped-event-id.hpp>
 #include <ndn-cxx/util/time.hpp>
 #include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/util/segment-fetcher.hpp>
 
 namespace psync {
 
@@ -47,7 +48,6 @@
 typedef std::function<void(const std::vector<MissingDataInfo>&)> UpdateCallback;
 
 const ndn::time::milliseconds SYNC_INTEREST_LIFTIME = 1_s;
-const ndn::Name RECOVERY_PREFIX("recovery");
 
 /**
  * @brief Full sync logic to synchronize with other nodes
@@ -111,19 +111,6 @@
 
 PUBLIC_WITH_TESTS_ELSE_PRIVATE:
   /**
-   * @brief Process interest from other parties
-   *
-   * Determine whether this is a sync interest or recovery interest
-   * and dispatch to onSyncInterest or onRecoveryInterest respectively.
-   *
-   * @param prefixName prefix for sync group which we registered
-   * @param interest the interest we got
-   */
-  void
-  onInterest(const ndn::Name& prefixName, const ndn::Interest& interest);
-
-private:
-  /**
    * @brief Process sync interest from other parties
    *
    * Get differences b/w our IBF and IBF in the sync interest.
@@ -134,19 +121,13 @@
    * Otherwise add the sync interest into a map with interest name as key and PendingEntryInfoFull
    * as value.
    *
-   * @param interest the sync interest we got
+   * @param prefixName prefix for sync group which we registered
+   * @param interest the interest we got
    */
   void
-  onSyncInterest(const ndn::Interest& interest);
+  onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest);
 
-   /**
-   * @brief Publish our entire state so that requester can catch up.
-   *
-   * @param interest the recovery interest we got
-   */
-  void
-  onRecoveryInterest(const ndn::Interest& interest);
-
+private:
   /**
    * @brief Send sync data
    *
@@ -173,10 +154,10 @@
    * sendSyncInterest because the last one was satisfied by the incoming data
    *
    * @param interest interest for which we got the data
-   * @param data the data packet we got
+   * @param bufferPtr sync data content
    */
   void
-  onSyncData(const ndn::Interest& interest, const ndn::Data& data);
+  onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPtr& bufferPtr);
 
   /**
    * @brief Satisfy pending sync interests
@@ -204,15 +185,6 @@
   bool
   isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative);
 
-  /**
-   * @brief Send recovery interest using segment fetcher
-   *
-   * Recovery data is expected go over max packet size
-   * Appends the RECOVERY_PREFIX to the given interest
-   */
-  void
-  sendRecoveryInterest(const ndn::Interest& interest);
-
 private:
   std::map <ndn::Name, PendingEntryInfoFull> m_pendingEntries;
 
@@ -220,8 +192,6 @@
 
   UpdateCallback m_onUpdate;
 
-  const ndn::PendingInterestId* m_outstandingInterestId;
-
   ndn::util::scheduler::ScopedEventId m_scheduledSyncInterestId;
 
   std::uniform_int_distribution<> m_jitter;
@@ -229,6 +199,8 @@
   ndn::Name m_outstandingInterestName;
 
   const ndn::RegisteredPrefixId* m_registerPrefixId;
+
+  std::shared_ptr<ndn::util::SegmentFetcher> m_fetcher;
 };
 
 } // namespace psync
diff --git a/tests/test-full-producer.cpp b/tests/test-full-producer.cpp
index ee6764e..ce23cf8 100644
--- a/tests/test-full-producer.cpp
+++ b/tests/test-full-producer.cpp
@@ -47,7 +47,7 @@
   Name syncInterestName(syncPrefix);
   syncInterestName.append("malicious-IBF");
 
-  BOOST_REQUIRE_NO_THROW(node.onInterest(syncPrefix, Interest(syncInterestName)));
+  BOOST_REQUIRE_NO_THROW(node.onSyncInterest(syncPrefix, Interest(syncInterestName)));
 }
 
 BOOST_AUTO_TEST_SUITE_END()
diff --git a/tests/test-state.cpp b/tests/test-state.cpp
index 0a815fd..956bbbd 100644
--- a/tests/test-state.cpp
+++ b/tests/test-state.cpp
@@ -35,35 +35,43 @@
   state.addContent(ndn::Name("test1"));
   state.addContent(ndn::Name("test2"));
 
-  ndn::Data data;
-  data.setContent(state.wireEncode());
-  State rcvdState(data.getContent());
-
-  BOOST_CHECK(state.getContent() == rcvdState.getContent());
-
   // Simulate getting buffer content from segment fetcher
+  Data data;
+  data.setContent(state.wireEncode());
   ndn::Buffer buffer(data.getContent().value_size());
   std::copy(data.getContent().value_begin(),
             data.getContent().value_end(),
             buffer.begin());
 
-  ndn::ConstBufferPtr buffer2 = make_shared<ndn::Buffer>(buffer);
+  ndn::ConstBufferPtr bufferPtr = make_shared<ndn::Buffer>(buffer);
 
-  ndn::Block block(std::move(buffer2));
+  ndn::Block block(std::move(bufferPtr));
 
-  State state2;
-  state2.wireDecode(block);
+  State rcvdState;
+  rcvdState.wireDecode(block);
 
-  BOOST_CHECK(state.getContent() == state2.getContent());
+  BOOST_CHECK(state.getContent() == rcvdState.getContent());
 }
 
 BOOST_AUTO_TEST_CASE(EmptyContent)
 {
-  ndn::Data data;
-  BOOST_CHECK_NO_THROW(State state(data.getContent()));
+  State state;
 
-  State state(data.getContent());
-  BOOST_CHECK_EQUAL(state.getContent().size(), 0);
+  // Simulate getting buffer content from segment fetcher
+  Data data;
+  data.setContent(state.wireEncode());
+  ndn::Buffer buffer(data.getContent().value_size());
+  std::copy(data.getContent().value_begin(),
+            data.getContent().value_end(),
+            buffer.begin());
+  ndn::ConstBufferPtr bufferPtr = make_shared<ndn::Buffer>(buffer);
+
+  ndn::Block block(std::move(bufferPtr));
+
+  BOOST_CHECK_NO_THROW(State state2(block));
+
+  State state2(block);
+  BOOST_CHECK_EQUAL(state2.getContent().size(), 0);
 }
 
 BOOST_AUTO_TEST_SUITE_END()