util: move notification type agnostic code to NotificationSubscriberBase

refs #3663

Change-Id: I732186fdd65320555740a3aa51b57d441e9acca4
diff --git a/src/util/notification-subscriber.cpp b/src/util/notification-subscriber.cpp
new file mode 100644
index 0000000..09a1277
--- /dev/null
+++ b/src/util/notification-subscriber.cpp
@@ -0,0 +1,214 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2013-2016 Regents of the University of California.
+ *
+ * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
+ *
+ * ndn-cxx library is free software: you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ *
+ * You should have received copies of the GNU General Public License and GNU Lesser
+ * General Public License along with ndn-cxx, e.g., in COPYING.md file.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ */
+
+/**
+ * Original copyright notice from NFD:
+ *
+ * Copyright (c) 2014,  Regents of the University of California,
+ *                      Arizona Board of Regents,
+ *                      Colorado State University,
+ *                      University Pierre & Marie Curie, Sorbonne University,
+ *                      Washington University in St. Louis,
+ *                      Beijing Institute of Technology,
+ *                      The University of Memphis
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "notification-subscriber.hpp"
+#include "random.hpp"
+
+namespace ndn {
+namespace util {
+
+NotificationSubscriberBase::NotificationSubscriberBase(Face& face, const Name& prefix,
+                                                       time::milliseconds interestLifetime)
+  : m_face(face)
+  , m_prefix(prefix)
+  , m_isRunning(false)
+  , m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
+  , m_lastNackSequenceNo(std::numeric_limits<uint64_t>::max())
+  , m_attempts(1)
+  , m_scheduler(face.getIoService())
+  , m_nackEvent(m_scheduler)
+  , m_interestLifetime(interestLifetime)
+{
+}
+
+NotificationSubscriberBase::~NotificationSubscriberBase() = default;
+
+void
+NotificationSubscriberBase::start()
+{
+  if (m_isRunning) // already running
+    return;
+  m_isRunning = true;
+
+  this->sendInitialInterest();
+}
+
+void
+NotificationSubscriberBase::stop()
+{
+  if (!m_isRunning) // not running
+    return;
+  m_isRunning = false;
+
+  if (m_lastInterestId != 0)
+    m_face.removePendingInterest(m_lastInterestId);
+  m_lastInterestId = 0;
+}
+
+void
+NotificationSubscriberBase::sendInitialInterest()
+{
+  if (this->shouldStop())
+    return;
+
+  auto interest = make_shared<Interest>(m_prefix);
+  interest->setMustBeFresh(true);
+  interest->setChildSelector(1);
+  interest->setInterestLifetime(getInterestLifetime());
+
+  m_lastInterestId = m_face.expressInterest(*interest,
+                       bind(&NotificationSubscriberBase::afterReceiveData, this, _2),
+                       bind(&NotificationSubscriberBase::afterReceiveNack, this, _2),
+                       bind(&NotificationSubscriberBase::afterTimeout, this));
+}
+
+void
+NotificationSubscriberBase::sendNextInterest()
+{
+  if (this->shouldStop())
+    return;
+
+  BOOST_ASSERT(m_lastSequenceNo != std::numeric_limits<uint64_t>::max()); // overflow or missing initial reply
+
+  Name nextName = m_prefix;
+  nextName.appendSequenceNumber(m_lastSequenceNo + 1);
+
+  auto interest = make_shared<Interest>(nextName);
+  interest->setInterestLifetime(getInterestLifetime());
+
+  m_lastInterestId = m_face.expressInterest(*interest,
+                       bind(&NotificationSubscriberBase::afterReceiveData, this, _2),
+                       bind(&NotificationSubscriberBase::afterReceiveNack, this, _2),
+                       bind(&NotificationSubscriberBase::afterTimeout, this));
+}
+
+bool
+NotificationSubscriberBase::shouldStop()
+{
+  if (!m_isRunning)
+    return true;
+  if (!this->hasSubscriber() && onNack.isEmpty()) {
+    this->stop();
+    return true;
+  }
+  return false;
+}
+
+void
+NotificationSubscriberBase::afterReceiveData(const Data& data)
+{
+  if (this->shouldStop())
+    return;
+
+  try {
+    m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
+  }
+  catch (const tlv::Error&) {
+    this->onDecodeError(data);
+    this->sendInitialInterest();
+    return;
+  }
+
+  if (!this->decodeAndDeliver(data)) {
+    this->onDecodeError(data);
+    this->sendInitialInterest();
+    return;
+  }
+
+  this->sendNextInterest();
+}
+
+void
+NotificationSubscriberBase::afterReceiveNack(const lp::Nack& nack)
+{
+  if (this->shouldStop())
+    return;
+
+  this->onNack(nack);
+
+  time::milliseconds delay = exponentialBackoff(nack);
+  m_nackEvent = m_scheduler.scheduleEvent(delay, [this] {this->sendInitialInterest();});
+}
+
+void
+NotificationSubscriberBase::afterTimeout()
+{
+  if (this->shouldStop())
+    return;
+
+  this->onTimeout();
+
+  this->sendInitialInterest();
+}
+
+time::milliseconds
+NotificationSubscriberBase::exponentialBackoff(lp::Nack nack)
+{
+  uint64_t nackSequenceNo;
+
+  try {
+    nackSequenceNo = nack.getInterest().getName().get(-1).toSequenceNumber();
+  }
+  catch (const tlv::Error&) {
+    nackSequenceNo = 0;
+  }
+
+  if (m_lastNackSequenceNo == nackSequenceNo) {
+    ++m_attempts;
+  }
+  else {
+    m_attempts = 1;
+  }
+
+  m_lastNackSequenceNo = nackSequenceNo;
+
+  return time::milliseconds(static_cast<uint32_t>(pow(2, m_attempts) * 100 +
+                                                  random::generateWord32() % 100));
+}
+
+} // namespace util
+} // namespace ndn
diff --git a/src/util/notification-subscriber.hpp b/src/util/notification-subscriber.hpp
index a102cfa..bc7345c 100644
--- a/src/util/notification-subscriber.hpp
+++ b/src/util/notification-subscriber.hpp
@@ -52,7 +52,6 @@
 #include "signal.hpp"
 #include "concepts.hpp"
 #include "time.hpp"
-#include "random.hpp"
 #include "scheduler.hpp"
 #include "scheduler-scoped-event-id.hpp"
 #include <boost/concept_check.hpp>
@@ -60,43 +59,16 @@
 namespace ndn {
 namespace util {
 
-/** \brief provides a subscriber of Notification Stream
- *  \sa http://redmine.named-data.net/projects/nfd/wiki/Notification
- *  \tparam Notification type of Notification item, appears in payload of Data packets
- */
-template<typename Notification>
-class NotificationSubscriber : noncopyable
+class NotificationSubscriberBase : noncopyable
 {
 public:
-  BOOST_CONCEPT_ASSERT((boost::DefaultConstructible<Notification>));
-  BOOST_CONCEPT_ASSERT((WireDecodable<Notification>));
-
-  /** \brief construct a NotificationSubscriber
-   *  \note The subscriber is not started after construction.
-   *        User should add one or more handlers to onNotification, and invoke .start().
-   */
-  NotificationSubscriber(Face& face, const Name& prefix,
-                         const time::milliseconds& interestLifetime = time::milliseconds(60000))
-    : m_face(face)
-    , m_prefix(prefix)
-    , m_isRunning(false)
-    , m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
-    , m_lastNackSequenceNo(std::numeric_limits<uint64_t>::max())
-    , m_attempts(1)
-    , m_scheduler(face.getIoService())
-    , m_nackEvent(m_scheduler)
-    , m_interestLifetime(interestLifetime)
-  {
-  }
-
   virtual
-  ~NotificationSubscriber()
-  {
-  }
+  ~NotificationSubscriberBase();
 
   /** \return InterestLifetime of Interests to retrieve notifications
-   *  \details This must be greater than FreshnessPeriod of Notification Data packets,
-   *           to ensure correct operation of this subscriber implementation.
+   *
+   *  This must be greater than FreshnessPeriod of Notification Data packets,
+   *  to ensure correct operation of this subscriber implementation.
    */
   time::milliseconds
   getInterestLifetime() const
@@ -115,170 +87,67 @@
    *        otherwise this operation has no effect.
    */
   void
-  start()
-  {
-    if (m_isRunning) // already running
-      return;
-    m_isRunning = true;
-
-    this->sendInitialInterest();
-  }
+  start();
 
   /** \brief stop receiving notifications
    */
   void
-  stop()
-  {
-    if (!m_isRunning) // not running
-      return;
-    m_isRunning = false;
+  stop();
 
-    if (m_lastInterestId != 0)
-      m_face.removePendingInterest(m_lastInterestId);
-    m_lastInterestId = 0;
-  }
-
-public: // subscriptions
-  /** \brief fires when a Notification is received
-   *  \note Removing all handlers will cause the subscriber to stop.
+protected:
+  /** \brief construct a NotificationSubscriber
+   *  \note The subscriber is not started after construction.
+   *        User should add one or more handlers to onNotification, and invoke .start().
    */
-  signal::Signal<NotificationSubscriber, Notification> onNotification;
-
-  /** \brief fires when a NACK is received
-   */
-  signal::Signal<NotificationSubscriber, lp::Nack> onNack;
-
-  /** \brief fires when no Notification is received within .getInterestLifetime period
-   */
-  signal::Signal<NotificationSubscriber> onTimeout;
-
-  /** \brief fires when a Data packet in the Notification Stream cannot be decoded as Notification
-   */
-  signal::Signal<NotificationSubscriber, Data> onDecodeError;
+  NotificationSubscriberBase(Face& face, const Name& prefix,
+                             time::milliseconds interestLifetime);
 
 private:
   void
-  sendInitialInterest()
-  {
-    if (this->shouldStop())
-      return;
-
-    shared_ptr<Interest> interest = make_shared<Interest>(m_prefix);
-    interest->setMustBeFresh(true);
-    interest->setChildSelector(1);
-    interest->setInterestLifetime(getInterestLifetime());
-
-    m_lastInterestId = m_face.expressInterest(*interest,
-                         bind(&NotificationSubscriber<Notification>::afterReceiveData, this, _2),
-                         bind(&NotificationSubscriber<Notification>::afterReceiveNack, this, _2),
-                         bind(&NotificationSubscriber<Notification>::afterTimeout, this));
-  }
+  sendInitialInterest();
 
   void
-  sendNextInterest()
-  {
-    if (this->shouldStop())
-      return;
+  sendNextInterest();
 
-    BOOST_ASSERT(m_lastSequenceNo !=
-                 std::numeric_limits<uint64_t>::max());// overflow or missing initial reply
-
-    Name nextName = m_prefix;
-    nextName.appendSequenceNumber(m_lastSequenceNo + 1);
-
-    shared_ptr<Interest> interest = make_shared<Interest>(nextName);
-    interest->setInterestLifetime(getInterestLifetime());
-
-    m_lastInterestId = m_face.expressInterest(*interest,
-                         bind(&NotificationSubscriber<Notification>::afterReceiveData, this, _2),
-                         bind(&NotificationSubscriber<Notification>::afterReceiveNack, this, _2),
-                         bind(&NotificationSubscriber<Notification>::afterTimeout, this));
-  }
+  virtual bool
+  hasSubscriber() const = 0;
 
   /** \brief Check if the subscriber is or should be stopped.
    *  \return true if the subscriber is stopped.
    */
   bool
-  shouldStop()
-  {
-    if (!m_isRunning)
-      return true;
-    if (onNotification.isEmpty() && onNack.isEmpty()) {
-      this->stop();
-      return true;
-    }
-    return false;
-  }
+  shouldStop();
 
   void
-  afterReceiveData(const Data& data)
-  {
-    if (this->shouldStop())
-      return;
+  afterReceiveData(const Data& data);
 
-    Notification notification;
-    try {
-      m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
-      notification.wireDecode(data.getContent().blockFromValue());
-    }
-    catch (tlv::Error&) {
-      this->onDecodeError(data);
-      this->sendInitialInterest();
-      return;
-    }
-
-    this->onNotification(notification);
-
-    this->sendNextInterest();
-  }
+  /** \brief decode the Data as a notification, and deliver it to subscribers
+   *  \return whether decode was successful
+   */
+  virtual bool
+  decodeAndDeliver(const Data& data) = 0;
 
   void
-  afterReceiveNack(const lp::Nack& nack)
-  {
-    if (this->shouldStop())
-      return;
-
-    this->onNack(nack);
-
-    time::milliseconds delay = exponentialBackoff(nack);
-    m_nackEvent = m_scheduler.scheduleEvent(delay, [this] {this->sendInitialInterest();});
-  }
+  afterReceiveNack(const lp::Nack& nack);
 
   void
-  afterTimeout()
-  {
-    if (this->shouldStop())
-      return;
-
-    this->onTimeout();
-
-    this->sendInitialInterest();
-  }
+  afterTimeout();
 
   time::milliseconds
-  exponentialBackoff(lp::Nack nack)
-  {
-    uint64_t nackSequenceNo;
+  exponentialBackoff(lp::Nack nack);
 
-    try {
-      nackSequenceNo = nack.getInterest().getName().get(-1).toSequenceNumber();
-    }
-    catch (name::Component::Error&) {
-      nackSequenceNo = 0;
-    }
+public:
+  /** \brief fires when a NACK is received
+   */
+  signal::Signal<NotificationSubscriberBase, lp::Nack> onNack;
 
-    if (m_lastNackSequenceNo ==  nackSequenceNo) {
-      ++m_attempts;
-    } else {
-      m_attempts = 1;
-    }
+  /** \brief fires when no Notification is received within .getInterestLifetime period
+   */
+  signal::Signal<NotificationSubscriberBase> onTimeout;
 
-    time::milliseconds delayTime =
-      time::milliseconds (static_cast<uint32_t>( pow(2, m_attempts) * 100 + random::generateWord32() % 100));
-
-    m_lastNackSequenceNo = nackSequenceNo;
-    return delayTime;
-  }
+  /** \brief fires when a Data packet in the Notification Stream cannot be decoded as Notification
+   */
+  signal::Signal<NotificationSubscriberBase, Data> onDecodeError;
 
 private:
   Face& m_face;
@@ -293,6 +162,56 @@
   time::milliseconds m_interestLifetime;
 };
 
+/** \brief provides a subscriber of Notification Stream
+ *  \sa https://redmine.named-data.net/projects/nfd/wiki/Notification
+ *  \tparam Notification type of Notification item, appears in payload of Data packets
+ */
+template<typename Notification>
+class NotificationSubscriber : public NotificationSubscriberBase
+{
+public:
+  BOOST_CONCEPT_ASSERT((boost::DefaultConstructible<Notification>));
+  BOOST_CONCEPT_ASSERT((WireDecodable<Notification>));
+
+  /** \brief construct a NotificationSubscriber
+   *  \note The subscriber is not started after construction.
+   *        User should add one or more handlers to onNotification, and invoke .start().
+   */
+  NotificationSubscriber(Face& face, const Name& prefix,
+                         time::milliseconds interestLifetime = time::seconds(60))
+    : NotificationSubscriberBase(face, prefix, interestLifetime)
+  {
+  }
+
+public:
+  /** \brief fires when a Notification is received
+   *  \note Removing all handlers will cause the subscriber to stop.
+   */
+  signal::Signal<NotificationSubscriber, Notification> onNotification;
+
+private:
+  virtual bool
+  hasSubscriber() const override
+  {
+    return !onNotification.isEmpty();
+  }
+
+  virtual bool
+  decodeAndDeliver(const Data& data) override
+  {
+    Notification notification;
+    try {
+      notification.wireDecode(data.getContent().blockFromValue());
+    }
+    catch (const tlv::Error&) {
+      return false;
+    }
+
+    onNotification(notification);
+    return true;
+  }
+};
+
 } // namespace util
 } // namespace ndn