core: use markers in StatusDataset and NotificationStream
This commit also refactors FaceMonitor into a subclass of the new generic
NotificationSubscriber, and refactors AutoregServer to use FaceMonitor.
refs #1837 #1838
Change-Id: I8b40dfae118853d1224c8290cf92e7cc0daa116f
diff --git a/core/event-emitter.hpp b/core/event-emitter.hpp
index 35925f9..e63e111 100644
--- a/core/event-emitter.hpp
+++ b/core/event-emitter.hpp
@@ -1,11 +1,12 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014 Regents of the University of California,
- * Arizona Board of Regents,
- * Colorado State University,
- * University Pierre & Marie Curie, Sorbonne University,
- * Washington University in St. Louis,
- * Beijing Institute of Technology
+ * Copyright (c) 2014, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis
*
* This file is part of NFD (Named Data Networking Forwarding Daemon).
* See AUTHORS.md for complete list of NFD authors and contributors.
@@ -20,7 +21,7 @@
*
* You should have received a copy of the GNU General Public License along with
* NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#ifndef NFD_CORE_EVENT_EMITTER_HPP
#define NFD_CORE_EVENT_EMITTER_HPP
@@ -29,25 +30,27 @@
namespace nfd {
-struct empty {};
+struct empty // placeholder type: the default for unused EventEmitter template arguments
+{
+};
/** \class EventEmitter
* \brief provides a lightweight event system
*
* To declare an event:
- * EventEmitter<TArgs> m_eventName;
+ * EventEmitter<TArgs> onEventName;
* To subscribe to an event:
- * eventSource->m_eventName += eventHandler;
+ * eventSource->onEventName += eventHandler;
* Multiple functions can subscribe to the same event.
* To trigger an event:
- * m_eventName(args);
+ * onEventName(args);
* To clear event subscriptions:
- * m_eventName.clear();
+ * onEventName.clear();
*/
// four arguments
template<typename T1 = empty, typename T2 = empty,
- typename T3 = empty, typename T4 = empty>
+ typename T3 = empty, typename T4 = empty>
class EventEmitter : noncopyable
{
public:
@@ -199,6 +202,8 @@
std::vector<Handler>::iterator it;
for (it = m_handlers.begin(); it != m_handlers.end(); ++it) {
(*it)();
+ if (m_handlers.empty()) // .clear has been called
+ return;
}
}
@@ -232,6 +237,8 @@
typename std::vector<Handler>::iterator it;
for (it = m_handlers.begin(); it != m_handlers.end(); ++it) {
(*it)(a1);
+ if (m_handlers.empty()) // .clear has been called
+ return;
}
}
@@ -266,6 +273,8 @@
typename std::vector<Handler>::iterator it;
for (it = m_handlers.begin(); it != m_handlers.end(); ++it) {
(*it)(a1, a2);
+ if (m_handlers.empty()) // .clear has been called
+ return;
}
}
@@ -300,6 +309,8 @@
typename std::vector<Handler>::iterator it;
for (it = m_handlers.begin(); it != m_handlers.end(); ++it) {
(*it)(a1, a2, a3);
+ if (m_handlers.empty()) // .clear has been called
+ return;
}
}
@@ -334,6 +345,8 @@
typename std::vector<Handler>::iterator it;
for (it = m_handlers.begin(); it != m_handlers.end(); ++it) {
(*it)(a1, a2, a3, a4);
+ if (m_handlers.empty()) // .clear has been called
+ return;
}
}
diff --git a/core/face-monitor.cpp b/core/face-monitor.cpp
deleted file mode 100644
index 914e3df..0000000
--- a/core/face-monitor.cpp
+++ /dev/null
@@ -1,165 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014 Regents of the University of California,
- * Arizona Board of Regents,
- * Colorado State University,
- * University Pierre & Marie Curie, Sorbonne University,
- * Washington University in St. Louis,
- * Beijing Institute of Technology,
- * The University of Memphis
- *
- * This file is part of NFD (Named Data Networking Forwarding Daemon).
- * See AUTHORS.md for complete list of NFD authors and contributors.
- *
- * NFD is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
-
-#include "face-monitor.hpp"
-#include "core/logger.hpp"
-
-#include <ndn-cxx/face.hpp>
-
-namespace nfd {
-
-NFD_LOG_INIT("FaceMonitor");
-
-FaceMonitor::FaceMonitor(ndn::Face& face)
- : m_face(face)
- , m_isStopped(true)
-{
-}
-
-FaceMonitor::~FaceMonitor()
-{
-}
-
-void
-FaceMonitor::removeAllSubscribers()
-{
- m_notificationCallbacks.clear();
- m_timeoutCallbacks.clear();
- stopNotifications();
-}
-
-void
-FaceMonitor::stopNotifications()
-{
- if (static_cast<bool>(m_lastInterestId))
- m_face.removePendingInterest(m_lastInterestId);
- m_isStopped = true;
-}
-
-void
-FaceMonitor::startNotifications()
-{
- if (m_isStopped == false)
- return; // Notifications cycle has been started.
- m_isStopped = false;
-
- Interest interest("/localhost/nfd/faces/events");
- interest
- .setMustBeFresh(true)
- .setChildSelector(1)
- .setInterestLifetime(time::seconds(60))
- ;
-
- NFD_LOG_DEBUG("startNotification: Interest Sent: " << interest);
-
- m_lastInterestId = m_face.expressInterest(interest,
- bind(&FaceMonitor::onNotification, this, _2),
- bind(&FaceMonitor::onTimeout, this));
-}
-
-void
-FaceMonitor::addSubscriber(const NotificationCallback& notificationCallback)
-{
- addSubscriber(notificationCallback, TimeoutCallback());
-}
-
-void
-FaceMonitor::addSubscriber(const NotificationCallback& notificationCallback,
- const TimeoutCallback& timeoutCallback)
-{
- if (static_cast<bool>(notificationCallback))
- m_notificationCallbacks.push_back(notificationCallback);
-
- if (static_cast<bool>(timeoutCallback))
- m_timeoutCallbacks.push_back(timeoutCallback);
-}
-
-void
-FaceMonitor::onTimeout()
-{
- if (m_isStopped)
- return;
-
- std::vector<TimeoutCallback>::iterator it;
- for (it = m_timeoutCallbacks.begin();
- it != m_timeoutCallbacks.end(); ++it) {
- (*it)();
- //One of the registered callbacks has cleared the vector,
- //return now as the iterator has been invalidated and
- //the vector is empty.
- if (m_timeoutCallbacks.empty()) {
- return;
- }
- }
-
- Interest newInterest("/localhost/nfd/faces/events");
- newInterest
- .setMustBeFresh(true)
- .setChildSelector(1)
- .setInterestLifetime(time::seconds(60))
- ;
-
- NFD_LOG_DEBUG("In onTimeout, sending interest: " << newInterest);
-
- m_lastInterestId = m_face.expressInterest(newInterest,
- bind(&FaceMonitor::onNotification, this, _2),
- bind(&FaceMonitor::onTimeout, this));
-}
-
-void
-FaceMonitor::onNotification(const Data& data)
-{
- if (m_isStopped)
- return;
-
- m_lastSequence = data.getName().get(-1).toSegment();
- ndn::nfd::FaceEventNotification notification(data.getContent().blockFromValue());
-
- std::vector<NotificationCallback>::iterator it;
- for (it = m_notificationCallbacks.begin();
- it != m_notificationCallbacks.end(); ++it) {
- (*it)(notification);
- if (m_notificationCallbacks.empty()) {
- //One of the registered callbacks has cleared the vector.
- //return back, as no one is interested in notifications anymore.
- return;
- }
- }
-
- //Setting up next notification
- Name nextNotification("/localhost/nfd/faces/events");
- nextNotification.appendSegment(m_lastSequence + 1);
-
- Interest interest(nextNotification);
- interest.setInterestLifetime(time::seconds(60));
-
- NFD_LOG_DEBUG("onNotification: Interest sent: " << interest);
-
- m_lastInterestId = m_face.expressInterest(interest,
- bind(&FaceMonitor::onNotification, this, _2),
- bind(&FaceMonitor::onTimeout, this));
-}
-
-} // namespace nfd
diff --git a/core/face-monitor.hpp b/core/face-monitor.hpp
index 459b181..521fc23 100644
--- a/core/face-monitor.hpp
+++ b/core/face-monitor.hpp
@@ -1,12 +1,12 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014 Regents of the University of California,
- * Arizona Board of Regents,
- * Colorado State University,
- * University Pierre & Marie Curie, Sorbonne University,
- * Washington University in St. Louis,
- * Beijing Institute of Technology,
- * The University of Memphis
+ * Copyright (c) 2014, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis
*
* This file is part of NFD (Named Data Networking Forwarding Daemon).
* See AUTHORS.md for complete list of NFD authors and contributors.
@@ -21,83 +21,36 @@
*
* You should have received a copy of the GNU General Public License along with
* NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
+ */
#ifndef NFD_CORE_FACE_MONITOR_HPP
#define NFD_CORE_FACE_MONITOR_HPP
-#include "common.hpp"
+#include "notification-subscriber.hpp"
#include <ndn-cxx/management/nfd-face-event-notification.hpp>
-namespace ndn {
-class Face;
-class PendingInterestId;
-}
-
namespace nfd {
using ndn::nfd::FaceEventNotification;
-/**
- * \brief Helper class to subscribe to face notification events
- *
- * \todo Write test cases
- */
-class FaceMonitor : noncopyable
+/** \brief subscriber of the NFD face event Notification Stream
+ *
+ *  A NotificationSubscriber<FaceEventNotification> fixed to the prefix
+ *  "ndn:/localhost/nfd/faces/events".
+ */
+class FaceMonitor : public NotificationSubscriber<FaceEventNotification>
{
public:
- typedef function<void(const ndn::nfd::FaceEventNotification&)> NotificationCallback;
- typedef function<void()> TimeoutCallback;
-
- typedef std::vector<NotificationCallback> NotificationCallbacks;
- typedef std::vector<TimeoutCallback> TimeoutCallbacks;
-
- explicit
- FaceMonitor(ndn::Face& face);
-
- ~FaceMonitor();
-
- /** \brief Stops all notifications. This method doesn't remove registered callbacks.
- */
- void
- stopNotifications();
-
- /** \brief Resumes notifications for added subscribers.
- */
- void
- startNotifications();
-
- /** \brief Removes all notification subscribers.
- */
- void
- removeAllSubscribers();
-
- /** \brief Adds a notification subscriber. This method doesn't return on timeouts.
- */
- void
- addSubscriber(const NotificationCallback& notificationCallback);
-
- /** \brief Adds a notification subscriber.
- */
- void
- addSubscriber(const NotificationCallback& notificationCallback,
- const TimeoutCallback& timeoutCallback);
-
-private:
- void
- onTimeout();
-
- void
- onNotification(const Data& data);
-
-private:
- ndn::Face& m_face;
- uint64_t m_lastSequenceNo;
- bool m_isStopped;
- NotificationCallbacks m_notificationCallbacks;
- TimeoutCallbacks m_timeoutCallbacks;
- const ndn::PendingInterestId* m_lastInterestId;
+  /** \brief construct a FaceMonitor on \p face
+   *  \note explicit, as the constructor it replaces was, so that an ndn::Face
+   *        does not implicitly convert to a FaceMonitor
+   */
+  explicit
+  FaceMonitor(ndn::Face& face)
+    : NotificationSubscriber<FaceEventNotification>(face, "ndn:/localhost/nfd/faces/events")
+  {
+  }
};
} // namespace nfd
diff --git a/core/notification-stream.hpp b/core/notification-stream.hpp
new file mode 100644
index 0000000..9a89b4c
--- /dev/null
+++ b/core/notification-stream.hpp
@@ -0,0 +1,81 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef NFD_CORE_NOTIFICATION_STREAM_HPP
+#define NFD_CORE_NOTIFICATION_STREAM_HPP
+
+#include "common.hpp"
+
+#include <ndn-cxx/encoding/encoding-buffer.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+
+namespace nfd {
+
+/** \brief publishes items onto a Notification Stream
+ *  \sa http://redmine.named-data.net/projects/nfd/wiki/Notification
+ *  \tparam FaceBase face type; only put() is called on it, with a Data
+ */
+template <class FaceBase>
+class NotificationStream : noncopyable
+{
+public:
+  /** \brief bind to \p face, \p prefix and \p keyChain; numbering starts at 0 */
+  NotificationStream(FaceBase& face, const Name& prefix, ndn::KeyChain& keyChain)
+    : m_face(face)
+    , m_prefix(prefix)
+    , m_keyChain(keyChain)
+    , m_sequenceNo(0)
+  {
+  }
+
+  virtual
+  ~NotificationStream()
+  {
+  }
+
+  /** \brief sign and publish \p notification as Data named <prefix>/<sequence number> */
+  template<typename T> void
+  postNotification(const T& notification)
+  {
+    Name notificationName(m_prefix);
+    notificationName.appendSequenceNumber(m_sequenceNo);
+    shared_ptr<Data> packet = make_shared<Data>(notificationName);
+    packet->setFreshnessPeriod(time::seconds(1));
+    packet->setContent(notification.wireEncode());
+    m_keyChain.sign(*packet);
+    m_face.put(*packet);
+    ++m_sequenceNo; // the next notification gets the following number
+  }
+
+private:
+  FaceBase& m_face;
+  const Name m_prefix;
+  ndn::KeyChain& m_keyChain; // signs every published Data
+  uint64_t m_sequenceNo; // number assigned to the next notification
+};
+
+} // namespace nfd
+
+#endif // NFD_CORE_NOTIFICATION_STREAM_HPP
diff --git a/core/notification-subscriber.hpp b/core/notification-subscriber.hpp
new file mode 100644
index 0000000..c359c82
--- /dev/null
+++ b/core/notification-subscriber.hpp
@@ -0,0 +1,238 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef NFD_CORE_NOTIFICATION_SUBSCRIBER_HPP
+#define NFD_CORE_NOTIFICATION_SUBSCRIBER_HPP
+
+#include "event-emitter.hpp"
+
+#include <ndn-cxx/face.hpp>
+
+#include <limits> // std::numeric_limits
+
+namespace nfd {
+
+/** \brief provides a subscriber of Notification Stream
+ *  \sa http://redmine.named-data.net/projects/nfd/wiki/Notification
+ *  \tparam T type of Notification item, appears in payload of Data packets;
+ *            must be default-constructible and provide wireDecode()
+ */
+template<typename T>
+class NotificationSubscriber : noncopyable
+{
+public:
+  /** \brief construct a NotificationSubscriber
+   *  \param face Face used to express Interests; kept by reference
+   *  \param prefix Name under which the Notification Stream is requested
+   *  \note The subscriber is not started after construction.
+   *        User should add one or more handlers to onNotification, and invoke .start().
+   */
+  NotificationSubscriber(ndn::Face& face, const Name& prefix)
+    : m_face(face)
+    , m_prefix(prefix)
+    , m_isRunning(false)
+    , m_lastSequenceNo(std::numeric_limits<uint64_t>::max()) // sentinel: none received yet
+    , m_lastInterestId(0)
+  {
+  }
+
+  virtual
+  ~NotificationSubscriber()
+  {
+  }
+
+  /** \return InterestLifetime of Interests to retrieve notifications
+   *  \details This must be greater than FreshnessPeriod of Notification Data packets,
+   *           to ensure correct operation of this subscriber implementation.
+   */
+  static time::milliseconds
+  getInterestLifetime()
+  {
+    return time::milliseconds(60000);
+  }
+
+  /** \return whether the subscriber is currently running
+   */
+  bool
+  isRunning() const
+  {
+    return m_isRunning;
+  }
+
+  /** \brief start or resume receiving notifications
+   *  \note onNotification must have at least one listener,
+   *        otherwise this operation has no effect.
+   */
+  void
+  start()
+  {
+    if (m_isRunning) // already running
+      return;
+    m_isRunning = true;
+
+    this->sendInitialInterest();
+  }
+
+  /** \brief stop receiving notifications
+   *  \details Removes the pending Interest identified by m_lastInterestId, if any.
+   */
+  void
+  stop()
+  {
+    if (!m_isRunning) // not running
+      return;
+    m_isRunning = false;
+
+    // NOTE(review): when reached from a Data or timeout callback, m_lastInterestId
+    // still names the Interest that just completed - confirm that
+    // removePendingInterest tolerates an id that is no longer pending.
+    if (m_lastInterestId != 0)
+      m_face.removePendingInterest(m_lastInterestId);
+    m_lastInterestId = 0;
+  }
+
+public: // subscriptions
+  /** \brief fires when a Notification is received
+   *  \note Removing all handlers will cause the subscriber to stop.
+   */
+  EventEmitter<T> onNotification;
+
+  /** \brief fires when no Notification is received within .getInterestLifetime period
+   */
+  EventEmitter<> onTimeout;
+
+  /** \brief fires when a Data packet in the Notification Stream cannot be decoded as T
+   */
+  EventEmitter<Data> onDecodeError;
+
+private:
+  /** \brief express an Interest for the stream prefix, with MustBeFresh and ChildSelector=1
+   *  \details Used by start(), and again after a timeout or a decode error.
+   */
+  void
+  sendInitialInterest()
+  {
+    if (this->shouldStop())
+      return;
+
+    shared_ptr<Interest> interest = make_shared<Interest>(m_prefix);
+    interest->setMustBeFresh(true);
+    interest->setChildSelector(1);
+    interest->setInterestLifetime(getInterestLifetime());
+
+    m_lastInterestId = m_face.expressInterest(*interest,
+                         bind(&NotificationSubscriber<T>::afterReceiveData, this, _2),
+                         bind(&NotificationSubscriber<T>::afterTimeout, this));
+  }
+
+  /** \brief express an Interest for the notification numbered m_lastSequenceNo + 1
+   */
+  void
+  sendNextInterest()
+  {
+    if (this->shouldStop())
+      return;
+
+    BOOST_ASSERT(m_lastSequenceNo !=
+                 std::numeric_limits<uint64_t>::max());// overflow or missing initial reply
+
+    Name nextName = m_prefix;
+    nextName.appendSequenceNumber(m_lastSequenceNo + 1);
+
+    shared_ptr<Interest> interest = make_shared<Interest>(nextName);
+    interest->setInterestLifetime(getInterestLifetime());
+
+    m_lastInterestId = m_face.expressInterest(*interest,
+                         bind(&NotificationSubscriber<T>::afterReceiveData, this, _2),
+                         bind(&NotificationSubscriber<T>::afterTimeout, this));
+  }
+
+  /** \brief Check if the subscriber is or should be stopped.
+   *  \details Calls stop() when running but onNotification has no handlers.
+   *  \return true if the subscriber is stopped.
+   */
+  bool
+  shouldStop()
+  {
+    if (!m_isRunning)
+      return true;
+    if (onNotification.isEmpty()) {
+      this->stop();
+      return true;
+    }
+    return false;
+  }
+
+  /** \brief handle a Data packet of the Notification Stream
+   *  \details Takes the sequence number from the last name component, decodes the
+   *           content as T, fires onNotification and asks for the next notification.
+   *           On tlv::Error, fires onDecodeError and starts over with the initial Interest.
+   */
+  void
+  afterReceiveData(const Data& data)
+  {
+    if (this->shouldStop())
+      return;
+
+    T notification;
+    try {
+      m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
+      notification.wireDecode(data.getContent().blockFromValue());
+    }
+    catch (const tlv::Error&) {
+      this->onDecodeError(data);
+      this->sendInitialInterest();
+      return;
+    }
+
+    this->onNotification(notification);
+
+    this->sendNextInterest();
+  }
+
+  /** \brief handle expiry of the pending Interest: fire onTimeout, then start over
+   */
+  void
+  afterTimeout()
+  {
+    if (this->shouldStop())
+      return;
+
+    this->onTimeout();
+
+    this->sendInitialInterest();
+  }
+
+private:
+  ndn::Face& m_face;
+  Name m_prefix;
+  bool m_isRunning;
+  uint64_t m_lastSequenceNo; ///< taken from the most recent Data; max() until one arrives
+  const ndn::PendingInterestId* m_lastInterestId; ///< id of the last expressed Interest, 0 if none
+};
+
+} // namespace nfd
+
+#endif // NFD_CORE_NOTIFICATION_SUBSCRIBER_HPP
diff --git a/core/segment-publisher.hpp b/core/segment-publisher.hpp
index b1aa4a9..ccb3426 100644
--- a/core/segment-publisher.hpp
+++ b/core/segment-publisher.hpp
@@ -33,6 +33,9 @@
namespace nfd {
+/** \brief provides a publisher of Status Dataset or other segmented octet stream
+ * \sa http://redmine.named-data.net/projects/nfd/wiki/StatusDataset
+ */
template <class FaceBase>
class SegmentPublisher : noncopyable
{
@@ -59,48 +62,44 @@
void
publish()
{
- Name segmentPrefix(m_prefix);
- segmentPrefix.appendVersion();
-
ndn::EncodingBuffer buffer;
-
generate(buffer);
const uint8_t* rawBuffer = buffer.buf();
const uint8_t* segmentBegin = rawBuffer;
const uint8_t* end = rawBuffer + buffer.size();
+ Name segmentPrefix(m_prefix);
+ segmentPrefix.appendVersion();
+ // do-while: a single (empty) segment is still published when the buffer holds no octets
uint64_t segmentNo = 0;
- do
- {
- const uint8_t* segmentEnd = segmentBegin + getMaxSegmentSize();
- if (segmentEnd > end)
- {
- segmentEnd = end;
- }
-
- Name segmentName(segmentPrefix);
- segmentName.appendSegment(segmentNo);
-
- shared_ptr<Data> data(make_shared<Data>(segmentName));
- data->setContent(segmentBegin, segmentEnd - segmentBegin);
-
- segmentBegin = segmentEnd;
- if (segmentBegin >= end)
- {
- data->setFinalBlockId(segmentName[-1]);
- }
-
- publishSegment(data);
- segmentNo++;
+ do {
+ const uint8_t* segmentEnd = segmentBegin + getMaxSegmentSize();
+ if (segmentEnd > end) {
+ segmentEnd = end;
+ }
- while (segmentBegin < end);
+ // segments are named <m_prefix>/<version>/<segment number>
+ Name segmentName(segmentPrefix);
+ segmentName.appendSegment(segmentNo);
+
+ shared_ptr<Data> data = make_shared<Data>(segmentName);
+ data->setContent(segmentBegin, segmentEnd - segmentBegin);
+ // the segment that reaches the end of the buffer is marked with FinalBlockId
+ segmentBegin = segmentEnd;
+ if (segmentBegin >= end) {
+ data->setFinalBlockId(segmentName[-1]);
+ }
+
+ publishSegment(data);
+ ++segmentNo;
+ } while (segmentBegin < end);
}
protected:
-
+ /** \brief In a derived class, write the octets to be published into outBuffer.
+ * \note publish() does not use the return value; it reads outBuffer.buf() and outBuffer.size(). */
virtual size_t
- generate(ndn::EncodingBuffer& outBuffer) =0;
+ generate(ndn::EncodingBuffer& outBuffer) = 0;
private:
void