blob: 32d9d6a8bd816c8fae01b311a53e6fbe1e17b26c [file] [log] [blame]
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2013-2014 Regents of the University of California.
4 *
5 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
6 *
7 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License as published by the Free Software
9 * Foundation, either version 3 of the License, or (at your option) any later version.
10 *
11 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14 *
15 * You should have received copies of the GNU General Public License and GNU Lesser
16 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17 * <http://www.gnu.org/licenses/>.
18 *
19 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
20 */
21
22/**
23 * Original copyright notice from NFD:
24 *
25 * Copyright (c) 2014, Regents of the University of California,
26 * Arizona Board of Regents,
27 * Colorado State University,
28 * University Pierre & Marie Curie, Sorbonne University,
29 * Washington University in St. Louis,
30 * Beijing Institute of Technology,
31 * The University of Memphis
32 *
33 * This file is part of NFD (Named Data Networking Forwarding Daemon).
34 * See AUTHORS.md for complete list of NFD authors and contributors.
35 *
36 * NFD is free software: you can redistribute it and/or modify it under the terms
37 * of the GNU General Public License as published by the Free Software Foundation,
38 * either version 3 of the License, or (at your option) any later version.
39 *
40 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
41 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
42 * PURPOSE. See the GNU General Public License for more details.
43 *
44 * You should have received a copy of the GNU General Public License along with
45 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
46 */
47
48#ifndef NDN_UTIL_NOTIFICATION_SUBSCRIBER_HPP
49#define NDN_UTIL_NOTIFICATION_SUBSCRIBER_HPP
50
51#include "event-emitter.hpp"
52#include "../face.hpp"
53#include "concepts.hpp"
54#include <boost/concept_check.hpp>
55
56namespace ndn {
57namespace util {
58
59/** \brief provides a subscriber of Notification Stream
60 * \sa http://redmine.named-data.net/projects/nfd/wiki/Notification
61 * \tparam Notification type of Notification item, appears in payload of Data packets
62 */
63template<typename Notification>
64class NotificationSubscriber : noncopyable
65{
66public:
67 BOOST_CONCEPT_ASSERT((boost::DefaultConstructible<Notification>));
68 BOOST_CONCEPT_ASSERT((WireDecodable<Notification>));
69
70 /** \brief construct a NotificationSubscriber
71 * \note The subscriber is not started after construction.
72 * User should add one or more handlers to onNotification, and invoke .start().
73 */
74 NotificationSubscriber(Face& face, const Name& prefix,
75 const time::milliseconds& interestLifetime = time::milliseconds(60000))
76 : m_face(face)
77 , m_prefix(prefix)
78 , m_isRunning(false)
79 , m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
80 , m_lastInterestId(0)
81 , m_interestLifetime(interestLifetime)
82 {
83 }
84
85 virtual
86 ~NotificationSubscriber()
87 {
88 }
89
90 /** \return InterestLifetime of Interests to retrieve notifications
91 * \details This must be greater than FreshnessPeriod of Notification Data packets,
92 * to ensure correct operation of this subscriber implementation.
93 */
94 time::milliseconds
95 getInterestLifetime() const
96 {
97 return m_interestLifetime;
98 }
99
100 bool
101 isRunning() const
102 {
103 return m_isRunning;
104 }
105
106 /** \brief start or resume receiving notifications
107 * \note onNotification must have at least one listener,
108 * otherwise this operation has no effect.
109 */
110 void
111 start()
112 {
113 if (m_isRunning) // already running
114 return;
115 m_isRunning = true;
116
117 this->sendInitialInterest();
118 }
119
120 /** \brief stop receiving notifications
121 */
122 void
123 stop()
124 {
125 if (!m_isRunning) // not running
126 return;
127 m_isRunning = false;
128
129 if (m_lastInterestId != 0)
130 m_face.removePendingInterest(m_lastInterestId);
131 m_lastInterestId = 0;
132 }
133
134public: // subscriptions
135 /** \brief fires when a Notification is received
136 * \note Removing all handlers will cause the subscriber to stop.
137 */
138 EventEmitter<Notification> onNotification;
139
140 /** \brief fires when no Notification is received within .getInterestLifetime period
141 */
142 EventEmitter<> onTimeout;
143
144 /** \brief fires when a Data packet in the Notification Stream cannot be decoded as Notification
145 */
146 EventEmitter<Data> onDecodeError;
147
148private:
149 void
150 sendInitialInterest()
151 {
152 if (this->shouldStop())
153 return;
154
155 shared_ptr<Interest> interest = make_shared<Interest>(m_prefix);
156 interest->setMustBeFresh(true);
157 interest->setChildSelector(1);
158 interest->setInterestLifetime(getInterestLifetime());
159
160 m_lastInterestId = m_face.expressInterest(*interest,
161 bind(&NotificationSubscriber<Notification>::afterReceiveData, this, _2),
162 bind(&NotificationSubscriber<Notification>::afterTimeout, this));
163 }
164
165 void
166 sendNextInterest()
167 {
168 if (this->shouldStop())
169 return;
170
171 BOOST_ASSERT(m_lastSequenceNo !=
172 std::numeric_limits<uint64_t>::max());// overflow or missing initial reply
173
174 Name nextName = m_prefix;
175 nextName.appendSequenceNumber(m_lastSequenceNo + 1);
176
177 shared_ptr<Interest> interest = make_shared<Interest>(nextName);
178 interest->setInterestLifetime(getInterestLifetime());
179
180 m_lastInterestId = m_face.expressInterest(*interest,
181 bind(&NotificationSubscriber<Notification>::afterReceiveData, this, _2),
182 bind(&NotificationSubscriber<Notification>::afterTimeout, this));
183 }
184
185 /** \brief Check if the subscriber is or should be stopped.
186 * \return true if the subscriber is stopped.
187 */
188 bool
189 shouldStop()
190 {
191 if (!m_isRunning)
192 return true;
193 if (onNotification.isEmpty()) {
194 this->stop();
195 return true;
196 }
197 return false;
198 }
199
200 void
201 afterReceiveData(const Data& data)
202 {
203 if (this->shouldStop())
204 return;
205
206 Notification notification;
207 try {
208 m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
209 notification.wireDecode(data.getContent().blockFromValue());
210 }
211 catch (tlv::Error&) {
212 this->onDecodeError(data);
213 this->sendInitialInterest();
214 return;
215 }
216
217 this->onNotification(notification);
218
219 this->sendNextInterest();
220 }
221
222 void
223 afterTimeout()
224 {
225 if (this->shouldStop())
226 return;
227
228 this->onTimeout();
229
230 this->sendInitialInterest();
231 }
232
233private:
234 Face& m_face;
235 Name m_prefix;
236 bool m_isRunning;
237 uint64_t m_lastSequenceNo;
238 const PendingInterestId* m_lastInterestId;
239 time::milliseconds m_interestLifetime;
240};
241
242} // namespace util
243} // namespace ndn
244
245#endif // NDN_UTIL_NOTIFICATION_SUBSCRIBER_HPP