partial sync: segment hello and sync data

add segment publisher

refs: #4662

Change-Id: I62e7a2247bac58aeec364cd2a4e4d34259eae4af
diff --git a/src/consumer.cpp b/src/consumer.cpp
index f27f355..04f8b4a 100644
--- a/src/consumer.cpp
+++ b/src/consumer.cpp
@@ -21,6 +21,7 @@
 #include "detail/state.hpp"
 
 #include <ndn-cxx/util/logger.hpp>
+#include <ndn-cxx/security/validator-null.hpp>
 
 #include <boost/algorithm/string.hpp>
 
@@ -41,6 +42,7 @@
  , m_syncPrefix(syncPrefix)
  , m_helloInterestPrefix(ndn::Name(m_syncPrefix).append("hello"))
  , m_syncInterestPrefix(ndn::Name(m_syncPrefix).append("sync"))
+ , m_syncDataContentType(ndn::tlv::ContentType_Blob)
  , m_onReceiveHelloData(onReceiveHelloData)
  , m_onUpdate(onUpdate)
  , m_bloomFilter(count, false_positive)
@@ -64,34 +66,70 @@
 }
 
 void
-Consumer::sendHelloInterest()
+Consumer::stop() // stops both segment fetchers, if present, and resets the members holding them
 {
-  ndn::Interest helloInterest(m_helloInterestPrefix);
-  helloInterest.setInterestLifetime(m_helloInterestLifetime);
-  helloInterest.setCanBePrefix(true);
-  helloInterest.setMustBeFresh(true);
+  if (m_syncFetcher) { // sync-data fetcher: stop it, then reset the member
+    m_syncFetcher->stop();
+    m_syncFetcher.reset();
+  }
 
-  NDN_LOG_DEBUG("Send Hello Interest " << helloInterest);
-
-  m_face.expressInterest(helloInterest,
-                         std::bind(&Consumer::onHelloData, this, _1, _2),
-                         std::bind(&Consumer::onNackForHello, this, _1, _2),
-                         std::bind(&Consumer::onHelloTimeout, this, _1));
+  if (m_helloFetcher) { // hello-data fetcher: same treatment
+    m_helloFetcher->stop();
+    m_helloFetcher.reset();
+  }
 }
 
 void
-Consumer::onHelloData(const ndn::Interest& interest, const ndn::Data& data)
+Consumer::sendHelloInterest() // starts a SegmentFetcher for an interest built from m_helloInterestPrefix
 {
-  ndn::Name helloDataName = data.getName();
+  ndn::Interest helloInterest(m_helloInterestPrefix);
 
+  NDN_LOG_DEBUG("Send Hello Interest " << helloInterest);
+  // A fetcher left over from an earlier call is stopped before it is replaced below.
+  if (m_helloFetcher) {
+    m_helloFetcher->stop();
+  }
+  // interestLifetime and maxTimeout are both set to m_helloInterestLifetime.
+  ndn::util::SegmentFetcher::Options options;
+  options.interestLifetime = m_helloInterestLifetime;
+  options.maxTimeout = m_helloInterestLifetime;
+  // The fetch uses the validator returned by getAcceptAllValidator().
+  m_helloFetcher = ndn::util::SegmentFetcher::start(m_face,
+                                                    helloInterest,
+                                                    ndn::security::v2::getAcceptAllValidator(),
+                                                    options);
+  // When a validated segment has a final-block field, keep its name minus the last component.
+  m_helloFetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
+                                                  if (data.getFinalBlock()) {
+                                                    m_helloDataName = data.getName().getPrefix(-1);
+                                                  }
+                                                });
+  // Pass the buffer delivered by onComplete to onHelloData().
+  m_helloFetcher->onComplete.connect([this] (ndn::ConstBufferPtr bufferPtr) {
+                                       onHelloData(bufferPtr);
+                                     });
+  // On error: log it, then schedule sendHelloInterest() after a delay from m_rangeUniformRandom.
+  m_helloFetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
+                                    NDN_LOG_TRACE("Cannot fetch hello data, error: " <<
+                                                   errorCode << " message: " << msg);
+                                    ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
+                                    NDN_LOG_TRACE("Scheduling after " << after);
+                                    m_scheduler.scheduleEvent(after, [this] { sendHelloInterest(); });
+                                  });
+}
+
+void
+Consumer::onHelloData(const ndn::ConstBufferPtr& bufferPtr)
+{
   NDN_LOG_DEBUG("On Hello Data");
 
   // Extract IBF from name which is the last element in hello data's name
-  m_iblt = helloDataName.getSubName(helloDataName.size()-1, 1);
+  m_iblt = m_helloDataName.getSubName(m_helloDataName.size()-1, 1);
 
   NDN_LOG_TRACE("m_iblt: " << std::hash<std::string>{}(m_iblt.toUri()));
 
-  State state(data.getContent());
+  State state(ndn::Block(std::move(bufferPtr)));
+
   std::vector<MissingDataInfo> updates;
   std::vector<ndn::Name> availableSubscriptions;
 
@@ -132,40 +170,64 @@
   syncInterestName.append(m_iblt);
 
   ndn::Interest syncInterest(syncInterestName);
-  syncInterest.setInterestLifetime(m_syncInterestLifetime);
-  syncInterest.setCanBePrefix(true);
-  syncInterest.setMustBeFresh(true);
 
   NDN_LOG_DEBUG("sendSyncInterest, nonce: " << syncInterest.getNonce() <<
                 " hash: " << std::hash<std::string>{}(syncInterest.getName().toUri()));
 
-  // Remove last pending interest before sending a new one
-  if (m_outstandingInterestId != nullptr) {
-      m_face.removePendingInterest(m_outstandingInterestId);
-      m_outstandingInterestId = nullptr;
+  ndn::util::SegmentFetcher::Options options;
+  options.interestLifetime = m_syncInterestLifetime;
+  options.maxTimeout = m_syncInterestLifetime;
+  // Stop the previous sync fetcher, if any, before it is replaced below.
+  if (m_syncFetcher) {
+    m_syncFetcher->stop();
   }
 
-  m_outstandingInterestId = m_face.expressInterest(syncInterest,
-                              std::bind(&Consumer::onSyncData, this, _1, _2),
-                              std::bind(&Consumer::onNackForSync, this, _1, _2),
-                              std::bind(&Consumer::onSyncTimeout, this, _1));
+  m_syncFetcher = ndn::util::SegmentFetcher::start(m_face,
+                                                   syncInterest,
+                                                   ndn::security::v2::getAcceptAllValidator(),
+                                                   options);
+  // When a validated segment has a final-block field, keep its name (minus last component) and type.
+  m_syncFetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
+                                                 if (data.getFinalBlock()) {
+                                                   m_syncDataName = data.getName().getPrefix(-1);
+                                                   m_syncDataContentType = data.getContentType();
+                                                 }
+                                                 // NOTE(review): runs for every validated segment
+                                                 if (m_syncDataContentType == ndn::tlv::ContentType_Nack)
+                                                 {
+                                                   NDN_LOG_DEBUG("Received application"
+                                                                  << " Nack from producer,"
+                                                                  << " sending hello again");
+                                                   sendHelloInterest();
+                                                 }
+                                               });
+  // If the recorded type is Nack, reset it to Blob and skip onSyncData(); the hello was sent above.
+  m_syncFetcher->onComplete.connect([this] (ndn::ConstBufferPtr bufferPtr) {
+                                      if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
+                                        m_syncDataContentType = ndn::tlv::ContentType_Blob;
+                                        return;
+                                      }
+                                      NDN_LOG_TRACE("Segment fetcher got sync data");
+                                      onSyncData(bufferPtr);
+                                    });
+  // On error: log it, then schedule sendSyncInterest() after a delay from m_rangeUniformRandom.
+  m_syncFetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
+                                   NDN_LOG_TRACE("Cannot fetch sync data, error: "
+                                                 << errorCode << " message: " << msg);
+                                   ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
+                                   NDN_LOG_TRACE("Scheduling after " << after);
+                                   m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
+                                 });
 }
 
 void
-Consumer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
+Consumer::onSyncData(const ndn::ConstBufferPtr& bufferPtr) // buffer from m_syncFetcher's onComplete
 {
-  ndn::Name syncDataName = data.getName();
-
   // Extract IBF from sync data name which is the last component
-  m_iblt = syncDataName.getSubName(syncDataName.size()-1, 1);
+  m_iblt = m_syncDataName.getSubName(m_syncDataName.size()-1, 1);
 
-  if (data.getContentType() == ndn::tlv::ContentType_Nack) {
-    NDN_LOG_DEBUG("Received application Nack from producer, send hello again");
-    sendHelloInterest();
-    return;
-  }
+  State state{ndn::Block(bufferPtr)}; // braces: parentheses here would declare a function
 
-  State state(data.getContent());
   std::vector <MissingDataInfo> updates;
 
   for (const auto& content : state.getContent()) {
@@ -190,40 +252,4 @@
   sendSyncInterest();
 }
 
-void
-Consumer::onHelloTimeout(const ndn::Interest& interest)
-{
-  NDN_LOG_DEBUG("on hello timeout");
-  this->sendHelloInterest();
-}
-
-void
-Consumer::onSyncTimeout(const ndn::Interest& interest)
-{
-  NDN_LOG_DEBUG("on sync timeout " << interest.getNonce());
-
-  ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
-  m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
-}
-
-void
-Consumer::onNackForHello(const ndn::Interest& interest, const ndn::lp::Nack& nack)
-{
-  NDN_LOG_DEBUG("received Nack with reason " << nack.getReason() <<
-                " for interest " << interest << std::endl);
-
-  ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
-  m_scheduler.scheduleEvent(after, [this] { sendHelloInterest(); });
-}
-
-void
-Consumer::onNackForSync(const ndn::Interest& interest, const ndn::lp::Nack& nack)
-{
-  NDN_LOG_DEBUG("received Nack with reason " << nack.getReason() <<
-                " for interest " << interest << std::endl);
-
-  ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
-  m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
-}
-
-} // namespace psync
+} // namespace psync
\ No newline at end of file