core: use markers in StatusDataset and NotificationStream

This commit also refactors FaceMonitor into a subclass of the new generic
NotificationSubscriber, and refactors AutoregServer to use FaceMonitor.

refs #1837 #1838

Change-Id: I8b40dfae118853d1224c8290cf92e7cc0daa116f
diff --git a/core/event-emitter.hpp b/core/event-emitter.hpp
index 35925f9..e63e111 100644
--- a/core/event-emitter.hpp
+++ b/core/event-emitter.hpp
@@ -1,11 +1,12 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /**
- * Copyright (c) 2014  Regents of the University of California,
- *                     Arizona Board of Regents,
- *                     Colorado State University,
- *                     University Pierre & Marie Curie, Sorbonne University,
- *                     Washington University in St. Louis,
- *                     Beijing Institute of Technology
+ * Copyright (c) 2014,  Regents of the University of California,
+ *                      Arizona Board of Regents,
+ *                      Colorado State University,
+ *                      University Pierre & Marie Curie, Sorbonne University,
+ *                      Washington University in St. Louis,
+ *                      Beijing Institute of Technology,
+ *                      The University of Memphis
  *
  * This file is part of NFD (Named Data Networking Forwarding Daemon).
  * See AUTHORS.md for complete list of NFD authors and contributors.
@@ -20,7 +21,7 @@
  *
  * You should have received a copy of the GNU General Public License along with
  * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
 
 #ifndef NFD_CORE_EVENT_EMITTER_HPP
 #define NFD_CORE_EVENT_EMITTER_HPP
@@ -29,25 +30,27 @@
 
 namespace nfd {
 
-struct empty {};
+struct empty
+{
+};
 
 /** \class EventEmitter
  *  \brief provides a lightweight event system
  *
  *  To declare an event:
- *    EventEmitter<TArgs> m_eventName;
+ *    EventEmitter<TArgs> onEventName;
  *  To subscribe to an event:
- *    eventSource->m_eventName += eventHandler;
+ *    eventSource->onEventName += eventHandler;
  *    Multiple functions can subscribe to the same event.
  *  To trigger an event:
- *    m_eventName(args);
+ *    onEventName(args);
  *  To clear event subscriptions:
- *    m_eventName.clear();
+ *    onEventName.clear();
  */
 
 // four arguments
 template<typename T1 = empty, typename T2 = empty,
-    typename T3 = empty, typename T4 = empty>
+         typename T3 = empty, typename T4 = empty>
 class EventEmitter : noncopyable
 {
 public:
@@ -199,6 +202,8 @@
   std::vector<Handler>::iterator it;
   for (it = m_handlers.begin(); it != m_handlers.end(); ++it) {
     (*it)();
+    if (m_handlers.empty()) // .clear has been called
+      return;
   }
 }
 
@@ -232,6 +237,8 @@
   typename std::vector<Handler>::iterator it;
   for (it = m_handlers.begin(); it != m_handlers.end(); ++it) {
     (*it)(a1);
+    if (m_handlers.empty()) // .clear has been called
+      return;
   }
 }
 
@@ -266,6 +273,8 @@
   typename std::vector<Handler>::iterator it;
   for (it = m_handlers.begin(); it != m_handlers.end(); ++it) {
     (*it)(a1, a2);
+    if (m_handlers.empty()) // .clear has been called
+      return;
   }
 }
 
@@ -300,6 +309,8 @@
   typename std::vector<Handler>::iterator it;
   for (it = m_handlers.begin(); it != m_handlers.end(); ++it) {
     (*it)(a1, a2, a3);
+    if (m_handlers.empty()) // .clear has been called
+      return;
   }
 }
 
@@ -334,6 +345,8 @@
   typename std::vector<Handler>::iterator it;
   for (it = m_handlers.begin(); it != m_handlers.end(); ++it) {
     (*it)(a1, a2, a3, a4);
+    if (m_handlers.empty()) // .clear has been called
+      return;
   }
 }
 
diff --git a/core/face-monitor.cpp b/core/face-monitor.cpp
deleted file mode 100644
index 914e3df..0000000
--- a/core/face-monitor.cpp
+++ /dev/null
@@ -1,165 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014  Regents of the University of California,
- *                     Arizona Board of Regents,
- *                     Colorado State University,
- *                     University Pierre & Marie Curie, Sorbonne University,
- *                     Washington University in St. Louis,
- *                     Beijing Institute of Technology,
- *                     The University of Memphis
- *
- * This file is part of NFD (Named Data Networking Forwarding Daemon).
- * See AUTHORS.md for complete list of NFD authors and contributors.
- *
- * NFD is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE.  See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
- **/
-
-#include "face-monitor.hpp"
-#include "core/logger.hpp"
-
-#include <ndn-cxx/face.hpp>
-
-namespace nfd {
-
-NFD_LOG_INIT("FaceMonitor");
-
-FaceMonitor::FaceMonitor(ndn::Face& face)
-  : m_face(face)
-  , m_isStopped(true)
-{
-}
-
-FaceMonitor::~FaceMonitor()
-{
-}
-
-void
-FaceMonitor::removeAllSubscribers()
-{
-  m_notificationCallbacks.clear();
-  m_timeoutCallbacks.clear();
-  stopNotifications();
-}
-
-void
-FaceMonitor::stopNotifications()
-{
-  if (static_cast<bool>(m_lastInterestId))
-    m_face.removePendingInterest(m_lastInterestId);
-  m_isStopped = true;
-}
-
-void
-FaceMonitor::startNotifications()
-{
-  if (m_isStopped == false)
-    return; // Notifications cycle has been started.
-  m_isStopped = false;
-
-  Interest interest("/localhost/nfd/faces/events");
-  interest
-    .setMustBeFresh(true)
-    .setChildSelector(1)
-    .setInterestLifetime(time::seconds(60))
-  ;
-
-  NFD_LOG_DEBUG("startNotification: Interest Sent: " << interest);
-
-  m_lastInterestId = m_face.expressInterest(interest,
-                                            bind(&FaceMonitor::onNotification, this, _2),
-                                            bind(&FaceMonitor::onTimeout, this));
-}
-
-void
-FaceMonitor::addSubscriber(const NotificationCallback& notificationCallback)
-{
-  addSubscriber(notificationCallback, TimeoutCallback());
-}
-
-void
-FaceMonitor::addSubscriber(const NotificationCallback& notificationCallback,
-                           const TimeoutCallback&  timeoutCallback)
-{
-  if (static_cast<bool>(notificationCallback))
-    m_notificationCallbacks.push_back(notificationCallback);
-
-  if (static_cast<bool>(timeoutCallback))
-    m_timeoutCallbacks.push_back(timeoutCallback);
-}
-
-void
-FaceMonitor::onTimeout()
-{
-  if (m_isStopped)
-    return;
-
-  std::vector<TimeoutCallback>::iterator it;
-  for (it = m_timeoutCallbacks.begin();
-       it != m_timeoutCallbacks.end(); ++it) {
-    (*it)();
-    //One of the registered callbacks has cleared the vector,
-    //return now as the iterator has been invalidated and
-    //the vector is empty.
-    if (m_timeoutCallbacks.empty()) {
-       return;
-    }
-  }
-
-  Interest newInterest("/localhost/nfd/faces/events");
-  newInterest
-    .setMustBeFresh(true)
-    .setChildSelector(1)
-    .setInterestLifetime(time::seconds(60))
-    ;
-
-  NFD_LOG_DEBUG("In onTimeout, sending interest: " << newInterest);
-
-  m_lastInterestId = m_face.expressInterest(newInterest,
-                                            bind(&FaceMonitor::onNotification, this, _2),
-                                            bind(&FaceMonitor::onTimeout, this));
-}
-
-void
-FaceMonitor::onNotification(const Data& data)
-{
-  if (m_isStopped)
-    return;
-
-  m_lastSequence = data.getName().get(-1).toSegment();
-  ndn::nfd::FaceEventNotification notification(data.getContent().blockFromValue());
-
-  std::vector<NotificationCallback>::iterator it;
-  for (it = m_notificationCallbacks.begin();
-       it != m_notificationCallbacks.end(); ++it) {
-    (*it)(notification);
-    if (m_notificationCallbacks.empty()) {
-      //One of the registered callbacks has cleared the vector.
-      //return back, as no one is interested in notifications anymore.
-      return;
-    }
-  }
-
-  //Setting up next notification
-  Name nextNotification("/localhost/nfd/faces/events");
-  nextNotification.appendSegment(m_lastSequence + 1);
-
-  Interest interest(nextNotification);
-  interest.setInterestLifetime(time::seconds(60));
-
-  NFD_LOG_DEBUG("onNotification: Interest sent: " <<  interest);
-
-  m_lastInterestId = m_face.expressInterest(interest,
-                                            bind(&FaceMonitor::onNotification, this, _2),
-                                            bind(&FaceMonitor::onTimeout, this));
-}
-
-} // namespace nfd
diff --git a/core/face-monitor.hpp b/core/face-monitor.hpp
index 459b181..521fc23 100644
--- a/core/face-monitor.hpp
+++ b/core/face-monitor.hpp
@@ -1,12 +1,12 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /**
- * Copyright (c) 2014  Regents of the University of California,
- *                     Arizona Board of Regents,
- *                     Colorado State University,
- *                     University Pierre & Marie Curie, Sorbonne University,
- *                     Washington University in St. Louis,
- *                     Beijing Institute of Technology,
- *                     The University of Memphis
+ * Copyright (c) 2014,  Regents of the University of California,
+ *                      Arizona Board of Regents,
+ *                      Colorado State University,
+ *                      University Pierre & Marie Curie, Sorbonne University,
+ *                      Washington University in St. Louis,
+ *                      Beijing Institute of Technology,
+ *                      The University of Memphis
  *
  * This file is part of NFD (Named Data Networking Forwarding Daemon).
  * See AUTHORS.md for complete list of NFD authors and contributors.
@@ -21,83 +21,26 @@
  *
  * You should have received a copy of the GNU General Public License along with
  * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
 
 #ifndef NFD_CORE_FACE_MONITOR_HPP
 #define NFD_CORE_FACE_MONITOR_HPP
 
-#include "common.hpp"
+#include "notification-subscriber.hpp"
 
 #include <ndn-cxx/management/nfd-face-event-notification.hpp>
 
-namespace ndn {
-class Face;
-class PendingInterestId;
-}
-
 namespace nfd {
 
 using ndn::nfd::FaceEventNotification;
 
-/**
- * \brief Helper class to subscribe to face notification events
- *
- * \todo Write test cases
- */
-class FaceMonitor : noncopyable
+class FaceMonitor : public NotificationSubscriber<FaceEventNotification>
 {
 public:
-  typedef function<void(const ndn::nfd::FaceEventNotification&)> NotificationCallback;
-  typedef function<void()> TimeoutCallback;
-
-  typedef std::vector<NotificationCallback> NotificationCallbacks;
-  typedef std::vector<TimeoutCallback> TimeoutCallbacks;
-
-  explicit
-  FaceMonitor(ndn::Face& face);
-
-  ~FaceMonitor();
-
-  /** \brief Stops all notifications. This method  doesn't remove registered callbacks.
-   */
-  void
-  stopNotifications();
-
-  /** \brief Resumes notifications for added subscribers.
-   */
-  void
-  startNotifications();
-
-  /** \brief Removes all notification subscribers.
-   */
-  void
-  removeAllSubscribers();
-
-  /** \brief Adds a notification subscriber. This method doesn't return on timeouts.
-   */
-  void
-  addSubscriber(const NotificationCallback& notificationCallback);
-
-  /** \brief Adds a notification subscriber.
-   */
-  void
-  addSubscriber(const NotificationCallback& notificationCallback,
-                const TimeoutCallback&  timeoutCallback);
-
-private:
-  void
-  onTimeout();
-
-  void
-  onNotification(const Data& data);
-
-private:
-  ndn::Face& m_face;
-  uint64_t m_lastSequence;
-  bool m_isStopped;
-  NotificationCallbacks m_notificationCallbacks;
-  TimeoutCallbacks m_timeoutCallbacks;
-  const ndn::PendingInterestId* m_lastInterestId;
+  explicit FaceMonitor(ndn::Face& face) // explicit: no implicit ndn::Face -> FaceMonitor conversion
+    : NotificationSubscriber<FaceEventNotification>(face, "ndn:/localhost/nfd/faces/events")
+  {
+  }
 };
 
 } // namespace nfd
diff --git a/core/notification-stream.hpp b/core/notification-stream.hpp
new file mode 100644
index 0000000..9a89b4c
--- /dev/null
+++ b/core/notification-stream.hpp
@@ -0,0 +1,81 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014,  Regents of the University of California,
+ *                      Arizona Board of Regents,
+ *                      Colorado State University,
+ *                      University Pierre & Marie Curie, Sorbonne University,
+ *                      Washington University in St. Louis,
+ *                      Beijing Institute of Technology,
+ *                      The University of Memphis
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef NFD_CORE_NOTIFICATION_STREAM_HPP
+#define NFD_CORE_NOTIFICATION_STREAM_HPP
+
+#include "common.hpp"
+
+#include <ndn-cxx/encoding/encoding-buffer.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+
+namespace nfd {
+
+/** \brief provides a publisher of Notification Stream
+ *  \sa http://redmine.named-data.net/projects/nfd/wiki/Notification
+ */
+template <class FaceBase>
+class NotificationStream : noncopyable
+{
+public:
+  NotificationStream(FaceBase& face, const Name& prefix, ndn::KeyChain& keyChain)
+    : m_face(face)
+    , m_prefix(prefix)
+    , m_keyChain(keyChain)
+    , m_sequenceNo(0) // the first notification is posted with sequence number 0
+  {
+  }
+
+  virtual
+  ~NotificationStream()
+  {
+  }
+  /** \brief post a notification as a signed Data named prefix + sequence number component */
+  template<typename T> void
+  postNotification(const T& notification)
+  {
+    Name dataName = m_prefix;
+    dataName.appendSequenceNumber(m_sequenceNo);
+    // the payload is the notification's wire encoding
+    shared_ptr<Data> data = make_shared<Data>(dataName);
+    data->setContent(notification.wireEncode());
+    data->setFreshnessPeriod(time::seconds(1));
+    // sign with the supplied KeyChain, then hand the Data to the face
+    m_keyChain.sign(*data);
+    m_face.put(*data);
+    // consecutive notifications get consecutive sequence numbers
+    ++m_sequenceNo;
+  }
+
+private:
+  FaceBase& m_face; // held by reference, not owned
+  const Name m_prefix;
+  ndn::KeyChain& m_keyChain; // held by reference, not owned
+  uint64_t m_sequenceNo; // sequence number the next posted notification will carry
+};
+
+} // namespace nfd
+
+#endif // NFD_CORE_NOTIFICATION_STREAM_HPP
diff --git a/core/notification-subscriber.hpp b/core/notification-subscriber.hpp
new file mode 100644
index 0000000..c359c82
--- /dev/null
+++ b/core/notification-subscriber.hpp
@@ -0,0 +1,214 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014,  Regents of the University of California,
+ *                      Arizona Board of Regents,
+ *                      Colorado State University,
+ *                      University Pierre & Marie Curie, Sorbonne University,
+ *                      Washington University in St. Louis,
+ *                      Beijing Institute of Technology,
+ *                      The University of Memphis
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef NFD_CORE_NOTIFICATION_SUBSCRIBER_HPP
+#define NFD_CORE_NOTIFICATION_SUBSCRIBER_HPP
+
+#include "event-emitter.hpp"
+
+#include <ndn-cxx/face.hpp>
+
+namespace nfd {
+
+/** \brief provides a subscriber of Notification Stream
+ *  \sa http://redmine.named-data.net/projects/nfd/wiki/Notification
+ *  \tparam T type of Notification item, appears in payload of Data packets
+ */
+template<typename T>
+class NotificationSubscriber : noncopyable
+{
+public:
+  /** \brief construct a NotificationSubscriber
+   *  \note The subscriber is not started after construction.
+   *        User should add one or more handlers to onNotification, and invoke .start().
+   */
+  NotificationSubscriber(ndn::Face& face, const Name& prefix)
+    : m_face(face)
+    , m_prefix(prefix)
+    , m_isRunning(false)
+    , m_lastSequenceNo(std::numeric_limits<uint64_t>::max()) // no notification received yet
+    , m_lastInterestId(0)
+  {
+  }
+  // NOTE(review): the destructor does not stop(); Interest callbacks are bound to this -- confirm
+  virtual
+  ~NotificationSubscriber()
+  {
+  }
+
+  /** \return InterestLifetime of Interests to retrieve notifications
+   *  \details This must be greater than FreshnessPeriod of Notification Data packets,
+   *           to ensure correct operation of this subscriber implementation.
+   */
+  static time::milliseconds
+  getInterestLifetime()
+  {
+    return time::milliseconds(60000);
+  }
+  /** \return true while the subscriber is running */
+  bool
+  isRunning() const
+  {
+    return m_isRunning;
+  }
+
+  /** \brief start or resume receiving notifications
+   *  \note onNotification must have at least one listener,
+   *        otherwise this operation has no effect.
+   */
+  void
+  start()
+  {
+    if (m_isRunning) // already running
+      return;
+    m_isRunning = true;
+
+    this->sendInitialInterest();
+  }
+
+  /** \brief stop receiving notifications
+   */
+  void
+  stop()
+  {
+    if (!m_isRunning) // not running
+      return;
+    m_isRunning = false;
+
+    if (m_lastInterestId != 0)
+      m_face.removePendingInterest(m_lastInterestId);
+    m_lastInterestId = 0;
+  }
+
+public: // subscriptions
+  /** \brief fires when a Notification is received
+   *  \note Removing all handlers will cause the subscriber to stop.
+   */
+  EventEmitter<T> onNotification;
+
+  /** \brief fires when no Notification is received within .getInterestLifetime period
+   */
+  EventEmitter<> onTimeout;
+
+  /** \brief fires when a Data packet in the Notification Stream cannot be decoded as T
+   */
+  EventEmitter<Data> onDecodeError;
+
+private: // Interest expression and reply handling
+  void
+  sendInitialInterest()
+  {
+    if (this->shouldStop())
+      return;
+    // no sequence number is used here: ask for fresh Data under the bare prefix
+    shared_ptr<Interest> interest = make_shared<Interest>(m_prefix);
+    interest->setMustBeFresh(true);
+    interest->setChildSelector(1);
+    interest->setInterestLifetime(getInterestLifetime());
+
+    m_lastInterestId = m_face.expressInterest(*interest,
+                         bind(&NotificationSubscriber<T>::afterReceiveData, this, _2),
+                         bind(&NotificationSubscriber<T>::afterTimeout, this));
+  }
+  /** \brief express an Interest for the notification that follows m_lastSequenceNo */
+  void
+  sendNextInterest()
+  {
+    if (this->shouldStop())
+      return;
+
+    BOOST_ASSERT(m_lastSequenceNo !=
+                 std::numeric_limits<uint64_t>::max()); // overflow or missing initial reply
+
+    Name nextName = m_prefix;
+    nextName.appendSequenceNumber(m_lastSequenceNo + 1);
+
+    shared_ptr<Interest> interest = make_shared<Interest>(nextName);
+    interest->setInterestLifetime(getInterestLifetime());
+
+    m_lastInterestId = m_face.expressInterest(*interest,
+                         bind(&NotificationSubscriber<T>::afterReceiveData, this, _2),
+                         bind(&NotificationSubscriber<T>::afterTimeout, this));
+  }
+
+  /** \brief Check if the subscriber is or should be stopped.
+   *  \return true if the subscriber is stopped; it is stopped here if onNotification is empty.
+   */
+  bool
+  shouldStop()
+  {
+    if (!m_isRunning)
+      return true;
+    if (onNotification.isEmpty()) {
+      this->stop();
+      return true;
+    }
+    return false;
+  }
+  /** \brief decode Data as T and fire onNotification; on failure fire onDecodeError instead */
+  void
+  afterReceiveData(const Data& data)
+  {
+    if (this->shouldStop())
+      return;
+
+    T notification;
+    try {
+      m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
+      notification.wireDecode(data.getContent().blockFromValue());
+    }
+    catch (const tlv::Error&) {
+      this->onDecodeError(data);
+      this->sendInitialInterest(); // start over from an initial Interest
+      return;
+    }
+
+    this->onNotification(notification);
+
+    this->sendNextInterest();
+  }
+  /** \brief fire onTimeout, then start over from an initial Interest */
+  void
+  afterTimeout()
+  {
+    if (this->shouldStop())
+      return;
+
+    this->onTimeout();
+
+    this->sendInitialInterest();
+  }
+
+private:
+  ndn::Face& m_face;
+  Name m_prefix;
+  bool m_isRunning;
+  uint64_t m_lastSequenceNo; // taken from the name of the last received Data; max() until then
+  const ndn::PendingInterestId* m_lastInterestId; // last expressed Interest; 0 initially and after stop()
+};
+
+} // namespace nfd
+
+#endif // NFD_CORE_NOTIFICATION_SUBSCRIBER_HPP
diff --git a/core/segment-publisher.hpp b/core/segment-publisher.hpp
index b1aa4a9..ccb3426 100644
--- a/core/segment-publisher.hpp
+++ b/core/segment-publisher.hpp
@@ -33,6 +33,9 @@
 
 namespace nfd {
 
+/** \brief provides a publisher of Status Dataset or other segmented octet stream
+ *  \sa http://redmine.named-data.net/projects/nfd/wiki/StatusDataset
+ */
 template <class FaceBase>
 class SegmentPublisher : noncopyable
 {
@@ -59,48 +62,44 @@
   void
   publish()
   {
-    Name segmentPrefix(m_prefix);
-    segmentPrefix.appendVersion();
-
     ndn::EncodingBuffer buffer;
-
     generate(buffer);
 
     const uint8_t* rawBuffer = buffer.buf();
     const uint8_t* segmentBegin = rawBuffer;
     const uint8_t* end = rawBuffer + buffer.size();
 
+    Name segmentPrefix(m_prefix);
+    segmentPrefix.appendVersion();
+
     uint64_t segmentNo = 0;
-    do
-      {
-        const uint8_t* segmentEnd = segmentBegin + getMaxSegmentSize();
-        if (segmentEnd > end)
-          {
-            segmentEnd = end;
-          }
-
-        Name segmentName(segmentPrefix);
-        segmentName.appendSegment(segmentNo);
-
-        shared_ptr<Data> data(make_shared<Data>(segmentName));
-        data->setContent(segmentBegin, segmentEnd - segmentBegin);
-
-        segmentBegin = segmentEnd;
-        if (segmentBegin >= end)
-          {
-            data->setFinalBlockId(segmentName[-1]);
-          }
-
-        publishSegment(data);
-        segmentNo++;
+    do { // do-while: an empty buffer still yields one (empty, final) segment
+      const uint8_t* segmentEnd = end; // clamp by remaining length; never form a pointer past end
+      if (static_cast<size_t>(end - segmentBegin) > getMaxSegmentSize()) {
+        segmentEnd = segmentBegin + getMaxSegmentSize();
       }
-    while (segmentBegin < end);
+      // name this segment: versioned prefix plus a segment number component
+      Name segmentName(segmentPrefix);
+      segmentName.appendSegment(segmentNo);
+
+      shared_ptr<Data> data = make_shared<Data>(segmentName);
+      data->setContent(segmentBegin, segmentEnd - segmentBegin);
+      // advance; the segment that reaches the end of the buffer is marked as the final block
+      segmentBegin = segmentEnd;
+      if (segmentBegin >= end) {
+        data->setFinalBlockId(segmentName[-1]);
+      }
+
+      publishSegment(data);
+      ++segmentNo;
+    } while (segmentBegin < end);
   }
 
 protected:
-
+  /** \brief In a derived class, write the octets into outBuffer.
+   */
   virtual size_t
-  generate(ndn::EncodingBuffer& outBuffer) =0;
+  generate(ndn::EncodingBuffer& outBuffer) = 0;
 
 private:
   void