partial sync: segment hello and sync data

add segment publisher

refs: #4662

Change-Id: I62e7a2247bac58aeec364cd2a4e4d34259eae4af
diff --git a/src/consumer.cpp b/src/consumer.cpp
index f27f355..04f8b4a 100644
--- a/src/consumer.cpp
+++ b/src/consumer.cpp
@@ -21,6 +21,7 @@
 #include "detail/state.hpp"
 
 #include <ndn-cxx/util/logger.hpp>
+#include <ndn-cxx/security/validator-null.hpp>
 
 #include <boost/algorithm/string.hpp>
 
@@ -41,6 +42,7 @@
  , m_syncPrefix(syncPrefix)
  , m_helloInterestPrefix(ndn::Name(m_syncPrefix).append("hello"))
  , m_syncInterestPrefix(ndn::Name(m_syncPrefix).append("sync"))
+ , m_syncDataContentType(ndn::tlv::ContentType_Blob)
  , m_onReceiveHelloData(onReceiveHelloData)
  , m_onUpdate(onUpdate)
  , m_bloomFilter(count, false_positive)
@@ -64,34 +66,70 @@
 }
 
 void
-Consumer::sendHelloInterest()
+Consumer::stop()
 {
-  ndn::Interest helloInterest(m_helloInterestPrefix);
-  helloInterest.setInterestLifetime(m_helloInterestLifetime);
-  helloInterest.setCanBePrefix(true);
-  helloInterest.setMustBeFresh(true);
+  if (m_syncFetcher) {
+    m_syncFetcher->stop();
+    m_syncFetcher.reset();
+  }
 
-  NDN_LOG_DEBUG("Send Hello Interest " << helloInterest);
-
-  m_face.expressInterest(helloInterest,
-                         std::bind(&Consumer::onHelloData, this, _1, _2),
-                         std::bind(&Consumer::onNackForHello, this, _1, _2),
-                         std::bind(&Consumer::onHelloTimeout, this, _1));
+  if (m_helloFetcher) {
+    m_helloFetcher->stop();
+    m_helloFetcher.reset();
+  }
 }
 
 void
-Consumer::onHelloData(const ndn::Interest& interest, const ndn::Data& data)
+Consumer::sendHelloInterest()
 {
-  ndn::Name helloDataName = data.getName();
+  ndn::Interest helloInterest(m_helloInterestPrefix);
 
+  NDN_LOG_DEBUG("Send Hello Interest " << helloInterest);
+
+  if (m_helloFetcher) {
+    m_helloFetcher->stop();
+  }
+
+  ndn::util::SegmentFetcher::Options options;
+  options.interestLifetime = m_helloInterestLifetime;
+  options.maxTimeout = m_helloInterestLifetime;
+
+  m_helloFetcher = ndn::util::SegmentFetcher::start(m_face,
+                                                    helloInterest,
+                                                    ndn::security::v2::getAcceptAllValidator(),
+                                                    options);
+
+  m_helloFetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
+                                                  if (data.getFinalBlock()) {
+                                                    m_helloDataName = data.getName().getPrefix(-1);
+                                                  }
+                                                });
+
+  m_helloFetcher->onComplete.connect([this] (ndn::ConstBufferPtr bufferPtr) {
+                                       onHelloData(bufferPtr);
+                                     });
+
+  m_helloFetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
+                                    NDN_LOG_TRACE("Cannot fetch hello data, error: " <<
+                                                   errorCode << " message: " << msg);
+                                    ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
+                                    NDN_LOG_TRACE("Scheduling after " << after);
+                                    m_scheduler.scheduleEvent(after, [this] { sendHelloInterest(); });
+                                  });
+}
+
+void
+Consumer::onHelloData(const ndn::ConstBufferPtr& bufferPtr)
+{
   NDN_LOG_DEBUG("On Hello Data");
 
   // Extract IBF from name which is the last element in hello data's name
-  m_iblt = helloDataName.getSubName(helloDataName.size()-1, 1);
+  m_iblt = m_helloDataName.getSubName(m_helloDataName.size()-1, 1);
 
   NDN_LOG_TRACE("m_iblt: " << std::hash<std::string>{}(m_iblt.toUri()));
 
-  State state(data.getContent());
+  State state(ndn::Block(std::move(bufferPtr)));
+
   std::vector<MissingDataInfo> updates;
   std::vector<ndn::Name> availableSubscriptions;
 
@@ -132,40 +170,64 @@
   syncInterestName.append(m_iblt);
 
   ndn::Interest syncInterest(syncInterestName);
-  syncInterest.setInterestLifetime(m_syncInterestLifetime);
-  syncInterest.setCanBePrefix(true);
-  syncInterest.setMustBeFresh(true);
 
   NDN_LOG_DEBUG("sendSyncInterest, nonce: " << syncInterest.getNonce() <<
                 " hash: " << std::hash<std::string>{}(syncInterest.getName().toUri()));
 
-  // Remove last pending interest before sending a new one
-  if (m_outstandingInterestId != nullptr) {
-      m_face.removePendingInterest(m_outstandingInterestId);
-      m_outstandingInterestId = nullptr;
+  ndn::util::SegmentFetcher::Options options;
+  options.interestLifetime = m_syncInterestLifetime;
+  options.maxTimeout = m_syncInterestLifetime;;
+
+  if (m_syncFetcher) {
+    m_syncFetcher->stop();
   }
 
-  m_outstandingInterestId = m_face.expressInterest(syncInterest,
-                              std::bind(&Consumer::onSyncData, this, _1, _2),
-                              std::bind(&Consumer::onNackForSync, this, _1, _2),
-                              std::bind(&Consumer::onSyncTimeout, this, _1));
+  m_syncFetcher = ndn::util::SegmentFetcher::start(m_face,
+                                                   syncInterest,
+                                                   ndn::security::v2::getAcceptAllValidator(),
+                                                   options);
+
+  m_syncFetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
+                                                 if (data.getFinalBlock()) {
+                                                   m_syncDataName = data.getName().getPrefix(-1);
+                                                   m_syncDataContentType = data.getContentType();
+                                                 }
+
+                                                 if (m_syncDataContentType == ndn::tlv::ContentType_Nack)
+                                                 {
+                                                   NDN_LOG_DEBUG("Received application"
+                                                                  << " Nack from producer,"
+                                                                  << " sending hello again");
+                                                   sendHelloInterest();
+                                                 }
+                                               });
+
+  m_syncFetcher->onComplete.connect([this] (ndn::ConstBufferPtr bufferPtr) {
+                                      if (m_syncDataContentType == ndn::tlv::ContentType_Nack) {
+                                        m_syncDataContentType = ndn::tlv::ContentType_Blob;
+                                        return;
+                                      }
+                                      NDN_LOG_TRACE("Segment fetcher got sync data");
+                                      onSyncData(bufferPtr);
+                                    });
+
+  m_syncFetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
+                                   NDN_LOG_TRACE("Cannot fetch sync data, error: "
+                                                 << errorCode << " message: " << msg);
+                                   ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
+                                   NDN_LOG_TRACE("Scheduling after " << after);
+                                   m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
+                                 });
 }
 
 void
-Consumer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
+Consumer::onSyncData(const ndn::ConstBufferPtr& bufferPtr)
 {
-  ndn::Name syncDataName = data.getName();
-
   // Extract IBF from sync data name which is the last component
-  m_iblt = syncDataName.getSubName(syncDataName.size()-1, 1);
+  m_iblt = m_syncDataName.getSubName(m_syncDataName.size()-1, 1);
 
-  if (data.getContentType() == ndn::tlv::ContentType_Nack) {
-    NDN_LOG_DEBUG("Received application Nack from producer, send hello again");
-    sendHelloInterest();
-    return;
-  }
+  State state(ndn::Block(std::move(bufferPtr)));
 
-  State state(data.getContent());
   std::vector <MissingDataInfo> updates;
 
   for (const auto& content : state.getContent()) {
@@ -190,40 +252,4 @@
   sendSyncInterest();
 }
 
-void
-Consumer::onHelloTimeout(const ndn::Interest& interest)
-{
-  NDN_LOG_DEBUG("on hello timeout");
-  this->sendHelloInterest();
-}
-
-void
-Consumer::onSyncTimeout(const ndn::Interest& interest)
-{
-  NDN_LOG_DEBUG("on sync timeout " << interest.getNonce());
-
-  ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
-  m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
-}
-
-void
-Consumer::onNackForHello(const ndn::Interest& interest, const ndn::lp::Nack& nack)
-{
-  NDN_LOG_DEBUG("received Nack with reason " << nack.getReason() <<
-                " for interest " << interest << std::endl);
-
-  ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
-  m_scheduler.scheduleEvent(after, [this] { sendHelloInterest(); });
-}
-
-void
-Consumer::onNackForSync(const ndn::Interest& interest, const ndn::lp::Nack& nack)
-{
-  NDN_LOG_DEBUG("received Nack with reason " << nack.getReason() <<
-                " for interest " << interest << std::endl);
-
-  ndn::time::milliseconds after(m_rangeUniformRandom(m_rng));
-  m_scheduler.scheduleEvent(after, [this] { sendSyncInterest(); });
-}
-
-} // namespace psync
+} // namespace psync
diff --git a/src/consumer.hpp b/src/consumer.hpp
index 9b30d8d..6460bc5 100644
--- a/src/consumer.hpp
+++ b/src/consumer.hpp
@@ -26,6 +26,7 @@
 
 #include <ndn-cxx/face.hpp>
 #include <ndn-cxx/util/scheduler.hpp>
+#include <ndn-cxx/util/segment-fetcher.hpp>
 #include <ndn-cxx/util/time.hpp>
 
 #include <random>
@@ -130,6 +131,12 @@
     return it->second;
   }
 
+  /**
+   * @brief Stop segment fetcher to stop the sync and free resources
+   */
+  void
+  stop();
+
 private:
   /**
    * @brief Get hello data from the producer
@@ -141,11 +148,10 @@
    * m_onReceiveHelloData is called to let the application know
    * so that it can set the subscription list using addSubscription
    *
-   * @param interest hello interest
-   * @param data hello data
+   * @param bufferPtr hello data content
    */
   void
-  onHelloData(const ndn::Interest& interest, const ndn::Data& data);
+  onHelloData(const ndn::ConstBufferPtr& bufferPtr);
 
   /**
    * @brief Get hello data from the producer
@@ -155,23 +161,10 @@
    * have the latest update for. We update our copy of producer's IBF with the latest one.
    * Then we send another sync interest after a random jitter.
    *
-   * @param interest sync interest
-   * @param data sync data
+   * @param bufferPtr sync data content
    */
   void
-  onSyncData(const ndn::Interest& interest, const ndn::Data& data);
-
-  void
-  onHelloTimeout(const ndn::Interest& interest);
-
-  void
-  onSyncTimeout(const ndn::Interest& interest);
-
-  void
-  onNackForHello(const ndn::Interest& interest, const ndn::lp::Nack& nack);
-
-  void
-  onNackForSync(const ndn::Interest& interest, const ndn::lp::Nack& nack);
+  onSyncData(const ndn::ConstBufferPtr& bufferPtr);
 
 PUBLIC_WITH_TESTS_ELSE_PRIVATE:
   ndn::Face& m_face;
@@ -181,6 +174,9 @@
   ndn::Name m_helloInterestPrefix;
   ndn::Name m_syncInterestPrefix;
   ndn::Name m_iblt;
+  ndn::Name m_helloDataName;
+  ndn::Name m_syncDataName;
+  uint32_t m_syncDataContentType;
 
   ReceiveHelloCallback m_onReceiveHelloData;
 
@@ -197,10 +193,10 @@
   std::map<ndn::Name, uint64_t> m_prefixes;
   std::set<ndn::Name> m_subscriptionList;
 
-  const ndn::PendingInterestId* m_outstandingInterestId;
-
   std::mt19937 m_rng;
   std::uniform_int_distribution<> m_rangeUniformRandom;
+  std::shared_ptr<ndn::util::SegmentFetcher> m_helloFetcher;
+  std::shared_ptr<ndn::util::SegmentFetcher> m_syncFetcher;
 };
 
 } // namespace psync
diff --git a/src/detail/state.cpp b/src/detail/state.cpp
index e5c6c47..d245c0b 100644
--- a/src/detail/state.cpp
+++ b/src/detail/state.cpp
@@ -71,29 +71,46 @@
 void
 State::wireDecode(const ndn::Block& wire)
 {
-  if (!wire.hasWire()) {
-    BOOST_THROW_EXCEPTION(ndn::tlv::Error("The supplied block does not contain wire format"));
+  auto blockType = wire.type();
+
+  if (blockType != tlv::PSyncContent && blockType != ndn::tlv::Content) {
+    BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Content/PSyncContent Block, but Block is of a different type: #" +
+                                           ndn::to_string(blockType)));
+    return;
   }
 
   wire.parse();
+
   m_wire = wire;
 
   auto it = m_wire.elements_begin();
 
-  if (it->type() != tlv::PSyncContent) {
-    BOOST_THROW_EXCEPTION(ndn::tlv::Error("Unexpected TLV type when decoding Content: " +
-                                          ndn::to_string(wire.type())));
+  if (it == m_wire.elements_end()) {
+    return;
   }
 
-  it->parse();
-
-  for (auto val = it->elements_begin(); val != it->elements_end(); ++val) {
-    if (val->type() == ndn::tlv::Name) {
-      m_content.emplace_back(*val);
+  if (blockType == tlv::PSyncContent) {
+    while(it != m_wire.elements_end()) {
+      if (it->type() == ndn::tlv::Name) {
+        m_content.emplace_back(*it);
+      }
+      else {
+        BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of a different type: #" +
+                                              ndn::to_string(it->type())));
+      }
+      ++it;
     }
-    else {
-      BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of a different type: #" +
-                                            ndn::to_string(m_wire.type())));
+  }
+  else if (blockType == ndn::tlv::Content) {
+    it->parse();
+    for (auto val = it->elements_begin(); val != it->elements_end(); ++val) {
+      if (val->type() == ndn::tlv::Name) {
+        m_content.emplace_back(*val);
+      }
+      else {
+        BOOST_THROW_EXCEPTION(ndn::tlv::Error("Expected Name Block, but Block is of a different type: #" +
+                                              ndn::to_string(val->type())));
+      }
     }
   }
 }
diff --git a/src/full-producer.cpp b/src/full-producer.cpp
index dd4d452..c00bd0a 100644
--- a/src/full-producer.cpp
+++ b/src/full-producer.cpp
@@ -23,7 +23,6 @@
 #include <ndn-cxx/util/segment-fetcher.hpp>
 #include <ndn-cxx/security/validator-null.hpp>
 
-#include <iostream>
 #include <cstring>
 #include <limits>
 #include <functional>
@@ -137,6 +136,11 @@
       nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 1) == RECOVERY_PREFIX.get(0)) {
       onRecoveryInterest(interest);
   }
+  // interest for recovery segment
+  else if (nameWithoutSyncPrefix.size() == 3 &&
+           nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 2) == RECOVERY_PREFIX.get(0)) {
+    onRecoveryInterest(interest);
+  }
   else if (nameWithoutSyncPrefix.size() == 1) {
     onSyncInterest(interest);
   }
@@ -223,6 +227,10 @@
 {
   NDN_LOG_DEBUG("Recovery interest received");
 
+  if (m_segmentPublisher.replyFromStore(interest.getName())) {
+    return;
+  }
+
   State state;
   for (const auto& content : m_prefixes) {
     if (content.second != 0) {
@@ -230,8 +238,8 @@
     }
   }
 
-  // Send even if state is empty to let other side know that we are behind
-  sendRecoveryData(interest.getName(), state);
+  m_segmentPublisher.publish(interest.getName(), interest.getName(),
+                             state.wireEncode(), m_syncReplyFreshness);
 }
 
 void
@@ -391,44 +399,6 @@
 }
 
 void
-FullProducer::sendRecoveryData(const ndn::Name& prefix, const State& state)
-{
-  ndn::EncodingBuffer buffer;
-  buffer.prependBlock(state.wireEncode());
-
-  const uint8_t* rawBuffer = buffer.buf();
-  const uint8_t* segmentBegin = rawBuffer;
-  const uint8_t* end = rawBuffer + buffer.size();
-
-  uint64_t segmentNo = 0;
-  do {
-    const uint8_t* segmentEnd = segmentBegin + (ndn::MAX_NDN_PACKET_SIZE >> 1);
-    if (segmentEnd > end) {
-      segmentEnd = end;
-    }
-
-    ndn::Name segmentName(prefix);
-    segmentName.appendSegment(segmentNo);
-
-    std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
-    data->setContent(segmentBegin, segmentEnd - segmentBegin);
-    data->setFreshnessPeriod(m_syncReplyFreshness);
-
-    segmentBegin = segmentEnd;
-    if (segmentBegin >= end) {
-      data->setFinalBlock(segmentName[-1]);
-    }
-
-    m_keyChain.sign(*data);
-    m_face.put(*data);
-
-    NDN_LOG_DEBUG("Sending recovery data, seq: " << segmentNo);
-
-    ++segmentNo;
-  } while (segmentBegin < end);
-}
-
-void
 FullProducer::sendRecoveryInterest(const ndn::Interest& interest)
 {
   if (m_outstandingInterestId != nullptr) {
diff --git a/src/full-producer.hpp b/src/full-producer.hpp
index 2f97522..bcf699e 100644
--- a/src/full-producer.hpp
+++ b/src/full-producer.hpp
@@ -205,13 +205,6 @@
   isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative);
 
   /**
-   * @brief Segment and send state with the given data name
-   *
-   */
-  void
-  sendRecoveryData(const ndn::Name& prefix, const State& state);
-
-  /**
    * @brief Send recovery interest using segment fetcher
    *
    * Recovery data is expected go over max packet size
diff --git a/src/partial-producer.cpp b/src/partial-producer.cpp
index d8d98a5..b0d29f1 100644
--- a/src/partial-producer.cpp
+++ b/src/partial-producer.cpp
@@ -22,7 +22,6 @@
 
 #include <ndn-cxx/util/logger.hpp>
 
-#include <iostream>
 #include <cstring>
 #include <limits>
 
@@ -75,7 +74,18 @@
 void
 PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest)
 {
-  NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest.getNonce());
+  // Last component or third last component (in case of interest with IBF and segment)
+  // needs to be hello
+  if (interest.getName().get(interest.getName().size()-1).toUri() != "hello" &&
+      interest.getName().get(interest.getName().size()-3).toUri() != "hello") {
+    return;
+  }
+
+  if (m_segmentPublisher.replyFromStore(interest.getName())) {
+    return;
+  }
+
+  NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest);
 
   State state;
 
@@ -87,13 +97,8 @@
   ndn::Name helloDataName = prefix;
   m_iblt.appendToName(helloDataName);
 
-  ndn::Data data;
-  data.setName(helloDataName);
-  data.setFreshnessPeriod(m_helloReplyFreshness);
-  data.setContent(state.wireEncode());
-
-  m_keyChain.sign(data);
-  m_face.put(data);
+  m_segmentPublisher.publish(interest.getName(), helloDataName,
+                             state.wireEncode(), m_helloReplyFreshness);
 }
 
 void
@@ -104,6 +109,15 @@
 
   ndn::Name interestName = interest.getName();
 
+  if (interestName.get(interestName.size() - 5).toUri() != "sync" &&
+      interestName.get(interestName.size() - 7).toUri() != "sync") {
+    return;
+  }
+
+  if (m_segmentPublisher.replyFromStore(interest.getName())) {
+    return;
+  }
+
   ndn::name::Component bfName, ibltName;
   unsigned int projectedCount;
   double falsePositiveProb;
@@ -171,15 +185,9 @@
     // send back data
     ndn::Name syncDataName = interestName;
     m_iblt.appendToName(syncDataName);
-    ndn::Data data;
-    data.setName(syncDataName);
-    data.setFreshnessPeriod(m_syncReplyFreshness);
-    data.setContent(state.wireEncode());
 
-    m_keyChain.sign(data);
-    NDN_LOG_DEBUG("Sending sync data");
-    m_face.put(data);
-
+    m_segmentPublisher.publish(interest.getName(), syncDataName,
+                               state.wireEncode(), m_syncReplyFreshness);
     return;
   }
 
@@ -222,8 +230,8 @@
     State state;
     if (entry.bf.contains(prefix.toUri()) || positive.size() + negative.size() >= m_threshold) {
       if (entry.bf.contains(prefix.toUri())) {
-         state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
-         NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
+        state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
+        NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
       }
       else {
         NDN_LOG_DEBUG("Sending with empty content to send latest IBF to consumer");
@@ -232,14 +240,9 @@
       // generate sync data and cancel the event
       ndn::Name syncDataName = it->first;
       m_iblt.appendToName(syncDataName);
-      ndn::Data data;
-      data.setName(syncDataName);
-      data.setFreshnessPeriod(m_syncReplyFreshness);
-      data.setContent(state.wireEncode());
 
-      m_keyChain.sign(data);
-      NDN_LOG_DEBUG("Sending sync data");
-      m_face.put(data);
+      m_segmentPublisher.publish(it->first, syncDataName,
+                                 state.wireEncode(), m_syncReplyFreshness);
 
       m_pendingEntries.erase(it++);
     }
diff --git a/src/partial-producer.hpp b/src/partial-producer.hpp
index a2db7ac..cb014fc 100644
--- a/src/partial-producer.hpp
+++ b/src/partial-producer.hpp
@@ -112,7 +112,8 @@
    * Either respond with sync data if consumer is behind or
    * store sync interest in m_pendingEntries
    *
-   * Sync data's name format is: /\<sync-prefix\>/sync/\<old-IBF\>/\<current-IBF\>
+   * Sync data's name format is: /\<syncPrefix\>/sync/\<BF\>/\<old-IBF\>/\<current-IBF\>
+   * (BF has 3 components).
    */
   void
   onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest);
diff --git a/src/producer-base.cpp b/src/producer-base.cpp
index e24fb5b..e731168 100644
--- a/src/producer-base.cpp
+++ b/src/producer-base.cpp
@@ -45,6 +45,7 @@
   , m_userPrefix(userPrefix)
   , m_syncReplyFreshness(syncReplyFreshness)
   , m_helloReplyFreshness(helloReplyFreshness)
+  , m_segmentPublisher(m_face, m_keyChain)
 {
   addUserNode(userPrefix);
 }
@@ -129,9 +130,11 @@
   ndn::Name dataName(name);
   m_iblt.appendToName(dataName);
 
+  dataName.appendSegment(0);
   ndn::Data data(dataName);
   data.setFreshnessPeriod(m_syncReplyFreshness);
   data.setContentType(ndn::tlv::ContentType_Nack);
+  data.setFinalBlock(dataName[-1]);
   m_keyChain.sign(data);
   m_face.put(data);
 }
diff --git a/src/producer-base.hpp b/src/producer-base.hpp
index 079b803..4aa0ef7 100644
--- a/src/producer-base.hpp
+++ b/src/producer-base.hpp
@@ -24,6 +24,7 @@
 #include "detail/bloom-filter.hpp"
 #include "detail/util.hpp"
 #include "detail/test-access-control.hpp"
+#include "segment-publisher.hpp"
 
 #include <ndn-cxx/face.hpp>
 #include <ndn-cxx/util/scheduler.hpp>
@@ -180,6 +181,8 @@
   ndn::time::milliseconds m_syncReplyFreshness;
   ndn::time::milliseconds m_helloReplyFreshness;
 
+  SegmentPublisher m_segmentPublisher;
+
   std::mt19937 m_rng;
 };
 
diff --git a/src/segment-publisher.cpp b/src/segment-publisher.cpp
new file mode 100644
index 0000000..d78d47f
--- /dev/null
+++ b/src/segment-publisher.cpp
@@ -0,0 +1,103 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018,  The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "segment-publisher.hpp"
+
+#include <ndn-cxx/name-component.hpp>
+
+namespace psync {
+
+SegmentPublisher::SegmentPublisher(ndn::Face& face, ndn::KeyChain& keyChain,
+                                   size_t imsLimit)
+  : m_face(face)
+  , m_scheduler(m_face.getIoService()) // scheduler is driven by the face's io service
+  , m_keyChain(keyChain)
+  , m_ims(imsLimit) // NOTE(review): presumably the max number of stored packets - confirm
+{
+}
+
+void
+SegmentPublisher::publish(const ndn::Name& interestName, const ndn::Name& dataName,
+                          const ndn::Block& block, ndn::time::milliseconds freshness)
+{
+  uint64_t interestSegment = 0;
+  if (interestName[-1].isSegment()) {
+    interestSegment = interestName[-1].toSegment();
+  }
+
+  ndn::EncodingBuffer buffer;
+  buffer.prependBlock(block);
+
+  const uint8_t* rawBuffer = buffer.buf();
+  const uint8_t* segmentBegin = rawBuffer;
+  const uint8_t* end = rawBuffer + buffer.size();
+
+  size_t maxPacketSize = (ndn::MAX_NDN_PACKET_SIZE >> 1);
+  // Zero-based number of the last segment; size / max overshoots by one on exact multiples
+  uint64_t lastSegmentNo = buffer.size() == 0 ? 0 : (buffer.size() - 1) / maxPacketSize;
+
+  uint64_t segmentNo = 0;
+  do {
+    const uint8_t* segmentEnd = segmentBegin + maxPacketSize;
+    if (segmentEnd > end) {
+      segmentEnd = end;
+    }
+
+    ndn::Name segmentName(dataName);
+    segmentName.appendSegment(segmentNo);
+
+    // We get a std::exception: bad_weak_ptr from m_ims if we don't use shared_ptr for data
+    std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
+    data->setContent(segmentBegin, segmentEnd - segmentBegin);
+    data->setFreshnessPeriod(freshness);
+    data->setFinalBlock(ndn::name::Component::fromSegment(lastSegmentNo));
+
+    segmentBegin = segmentEnd;
+
+    m_keyChain.sign(*data);
+
+    // Put on face only the segment which has a pending interest
+    // otherwise the segment is unsolicited
+    if (interestSegment == segmentNo) {
+      m_face.put(*data);
+    }
+    // Keep every segment for replyFromStore and erase it once freshness has elapsed
+    m_ims.insert(*data, freshness);
+    m_scheduler.scheduleEvent(freshness,
+                              [this, segmentName] {
+                                m_ims.erase(segmentName);
+                              });
+
+    ++segmentNo;
+  } while (segmentBegin < end);
+}
+
+bool
+SegmentPublisher::replyFromStore(const ndn::Name& interestName)
+{
+  // Look in the in-memory store for data answering this interest name
+  const auto storedData = m_ims.find(interestName);
+  if (storedData == nullptr) {
+    return false;
+  }
+  m_face.put(*storedData);
+  return true;
+}
+
+} // namespace psync
diff --git a/src/segment-publisher.hpp b/src/segment-publisher.hpp
new file mode 100644
index 0000000..b23ac52
--- /dev/null
+++ b/src/segment-publisher.hpp
@@ -0,0 +1,78 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018,  The University of Memphis
+ *
+ * This file is part of PSync.
+ * See AUTHORS.md for complete list of PSync authors and contributors.
+ *
+ * PSync is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * PSync, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#ifndef PSYNC_SEGMENT_PUBLISHER_HPP
+#define PSYNC_SEGMENT_PUBLISHER_HPP
+
+#include "detail/test-access-control.hpp"
+
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/util/scheduler-scoped-event-id.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
+#include <ndn-cxx/util/time.hpp>
+#include <ndn-cxx/ims/in-memory-storage-fifo.hpp>
+
+namespace psync {
+
+const int MAX_SEGMENTS_STORED = 100;
+
+/**
+ * @brief Segment Publisher to publish segmented data
+ * Segments are signed, kept in an in-memory store and served from it on later interests.
+ */
+class SegmentPublisher
+{
+public:
+  SegmentPublisher(ndn::Face& face, ndn::KeyChain& keyChain,
+                   size_t imsLimit = MAX_SEGMENTS_STORED);
+
+  /**
+   * @brief Segment the block, sign every segment and put all the segments in memory.
+   * Only the segment asked for by the interest is put on the face right away.
+   * @param interestName the interest name, to determine the segment to be answered immediately
+   * @param dataName the data name, has components after interest name; segment number is appended
+   * @param block the content of the data
+   * @param freshness freshness of the segments; stored segments are erased after this period
+   */
+  void
+  publish(const ndn::Name& interestName, const ndn::Name& dataName,
+          const ndn::Block& block, ndn::time::milliseconds freshness);
+
+  /**
+   * @brief Try to reply from memory by putting the stored segment on the face.
+   * @return true if a segment was found and sent, false if we cannot find the segment.
+   * The caller is then expected to use publish if this returns false.
+   */
+  bool
+  replyFromStore(const ndn::Name& interestName);
+
+private:
+  ndn::Face& m_face;
+  ndn::util::Scheduler m_scheduler; // schedules removal of stored segments
+  ndn::KeyChain& m_keyChain; // signs every published segment
+
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+  ndn::InMemoryStorageFifo m_ims; // segments available to replyFromStore
+};
+
+} // namespace psync
+
+#endif // PSYNC_SEGMENT_PUBLISHER_HPP