blob: a102cfa72171d1000435616db8a22076a8c5b956 [file] [log] [blame]
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Teng Liangf8bf1b02016-07-17 11:54:19 -07003 * Copyright (c) 2013-2016 Regents of the University of California.
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -07004 *
5 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
6 *
7 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
8 * terms of the GNU Lesser General Public License as published by the Free Software
9 * Foundation, either version 3 of the License, or (at your option) any later version.
10 *
11 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
14 *
15 * You should have received copies of the GNU General Public License and GNU Lesser
16 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
17 * <http://www.gnu.org/licenses/>.
18 *
19 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
20 */
21
22/**
23 * Original copyright notice from NFD:
24 *
25 * Copyright (c) 2014, Regents of the University of California,
26 * Arizona Board of Regents,
27 * Colorado State University,
28 * University Pierre & Marie Curie, Sorbonne University,
29 * Washington University in St. Louis,
30 * Beijing Institute of Technology,
31 * The University of Memphis
32 *
33 * This file is part of NFD (Named Data Networking Forwarding Daemon).
34 * See AUTHORS.md for complete list of NFD authors and contributors.
35 *
36 * NFD is free software: you can redistribute it and/or modify it under the terms
37 * of the GNU General Public License as published by the Free Software Foundation,
38 * either version 3 of the License, or (at your option) any later version.
39 *
40 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
41 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
42 * PURPOSE. See the GNU General Public License for more details.
43 *
44 * You should have received a copy of the GNU General Public License along with
45 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
46 */
47
48#ifndef NDN_UTIL_NOTIFICATION_SUBSCRIBER_HPP
49#define NDN_UTIL_NOTIFICATION_SUBSCRIBER_HPP
50
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -070051#include "../face.hpp"
Junxiao Shi728873f2015-01-20 16:01:26 -070052#include "signal.hpp"
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -070053#include "concepts.hpp"
Teng Liangf8bf1b02016-07-17 11:54:19 -070054#include "time.hpp"
55#include "random.hpp"
56#include "scheduler.hpp"
57#include "scheduler-scoped-event-id.hpp"
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -070058#include <boost/concept_check.hpp>
59
60namespace ndn {
61namespace util {
62
63/** \brief provides a subscriber of Notification Stream
64 * \sa http://redmine.named-data.net/projects/nfd/wiki/Notification
65 * \tparam Notification type of Notification item, appears in payload of Data packets
66 */
67template<typename Notification>
68class NotificationSubscriber : noncopyable
69{
70public:
71 BOOST_CONCEPT_ASSERT((boost::DefaultConstructible<Notification>));
72 BOOST_CONCEPT_ASSERT((WireDecodable<Notification>));
73
74 /** \brief construct a NotificationSubscriber
75 * \note The subscriber is not started after construction.
76 * User should add one or more handlers to onNotification, and invoke .start().
77 */
78 NotificationSubscriber(Face& face, const Name& prefix,
79 const time::milliseconds& interestLifetime = time::milliseconds(60000))
80 : m_face(face)
81 , m_prefix(prefix)
82 , m_isRunning(false)
83 , m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
Teng Liangf8bf1b02016-07-17 11:54:19 -070084 , m_lastNackSequenceNo(std::numeric_limits<uint64_t>::max())
85 , m_attempts(1)
86 , m_scheduler(face.getIoService())
87 , m_nackEvent(m_scheduler)
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -070088 , m_interestLifetime(interestLifetime)
89 {
90 }
91
92 virtual
93 ~NotificationSubscriber()
94 {
95 }
96
97 /** \return InterestLifetime of Interests to retrieve notifications
98 * \details This must be greater than FreshnessPeriod of Notification Data packets,
99 * to ensure correct operation of this subscriber implementation.
100 */
101 time::milliseconds
102 getInterestLifetime() const
103 {
104 return m_interestLifetime;
105 }
106
107 bool
108 isRunning() const
109 {
110 return m_isRunning;
111 }
112
113 /** \brief start or resume receiving notifications
114 * \note onNotification must have at least one listener,
115 * otherwise this operation has no effect.
116 */
117 void
118 start()
119 {
120 if (m_isRunning) // already running
121 return;
122 m_isRunning = true;
123
124 this->sendInitialInterest();
125 }
126
127 /** \brief stop receiving notifications
128 */
129 void
130 stop()
131 {
132 if (!m_isRunning) // not running
133 return;
134 m_isRunning = false;
135
136 if (m_lastInterestId != 0)
137 m_face.removePendingInterest(m_lastInterestId);
138 m_lastInterestId = 0;
139 }
140
141public: // subscriptions
142 /** \brief fires when a Notification is received
143 * \note Removing all handlers will cause the subscriber to stop.
144 */
Junxiao Shi728873f2015-01-20 16:01:26 -0700145 signal::Signal<NotificationSubscriber, Notification> onNotification;
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -0700146
Teng Liangf8bf1b02016-07-17 11:54:19 -0700147 /** \brief fires when a NACK is received
148 */
149 signal::Signal<NotificationSubscriber, lp::Nack> onNack;
150
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -0700151 /** \brief fires when no Notification is received within .getInterestLifetime period
152 */
Junxiao Shi728873f2015-01-20 16:01:26 -0700153 signal::Signal<NotificationSubscriber> onTimeout;
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -0700154
155 /** \brief fires when a Data packet in the Notification Stream cannot be decoded as Notification
156 */
Junxiao Shi728873f2015-01-20 16:01:26 -0700157 signal::Signal<NotificationSubscriber, Data> onDecodeError;
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -0700158
159private:
160 void
161 sendInitialInterest()
162 {
163 if (this->shouldStop())
164 return;
165
166 shared_ptr<Interest> interest = make_shared<Interest>(m_prefix);
167 interest->setMustBeFresh(true);
168 interest->setChildSelector(1);
169 interest->setInterestLifetime(getInterestLifetime());
170
171 m_lastInterestId = m_face.expressInterest(*interest,
172 bind(&NotificationSubscriber<Notification>::afterReceiveData, this, _2),
Teng Liangf8bf1b02016-07-17 11:54:19 -0700173 bind(&NotificationSubscriber<Notification>::afterReceiveNack, this, _2),
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -0700174 bind(&NotificationSubscriber<Notification>::afterTimeout, this));
175 }
176
177 void
178 sendNextInterest()
179 {
180 if (this->shouldStop())
181 return;
182
183 BOOST_ASSERT(m_lastSequenceNo !=
184 std::numeric_limits<uint64_t>::max());// overflow or missing initial reply
185
186 Name nextName = m_prefix;
187 nextName.appendSequenceNumber(m_lastSequenceNo + 1);
188
189 shared_ptr<Interest> interest = make_shared<Interest>(nextName);
190 interest->setInterestLifetime(getInterestLifetime());
191
192 m_lastInterestId = m_face.expressInterest(*interest,
193 bind(&NotificationSubscriber<Notification>::afterReceiveData, this, _2),
Teng Liangf8bf1b02016-07-17 11:54:19 -0700194 bind(&NotificationSubscriber<Notification>::afterReceiveNack, this, _2),
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -0700195 bind(&NotificationSubscriber<Notification>::afterTimeout, this));
196 }
197
198 /** \brief Check if the subscriber is or should be stopped.
199 * \return true if the subscriber is stopped.
200 */
201 bool
202 shouldStop()
203 {
204 if (!m_isRunning)
205 return true;
Teng Liangf8bf1b02016-07-17 11:54:19 -0700206 if (onNotification.isEmpty() && onNack.isEmpty()) {
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -0700207 this->stop();
208 return true;
209 }
210 return false;
211 }
212
213 void
214 afterReceiveData(const Data& data)
215 {
216 if (this->shouldStop())
217 return;
218
219 Notification notification;
220 try {
221 m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
222 notification.wireDecode(data.getContent().blockFromValue());
223 }
224 catch (tlv::Error&) {
225 this->onDecodeError(data);
226 this->sendInitialInterest();
227 return;
228 }
229
230 this->onNotification(notification);
231
232 this->sendNextInterest();
233 }
234
235 void
Teng Liangf8bf1b02016-07-17 11:54:19 -0700236 afterReceiveNack(const lp::Nack& nack)
237 {
238 if (this->shouldStop())
239 return;
240
241 this->onNack(nack);
242
243 time::milliseconds delay = exponentialBackoff(nack);
244 m_nackEvent = m_scheduler.scheduleEvent(delay, [this] {this->sendInitialInterest();});
245 }
246
247 void
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -0700248 afterTimeout()
249 {
250 if (this->shouldStop())
251 return;
252
253 this->onTimeout();
254
255 this->sendInitialInterest();
256 }
257
Teng Liangf8bf1b02016-07-17 11:54:19 -0700258 time::milliseconds
259 exponentialBackoff(lp::Nack nack)
260 {
261 uint64_t nackSequenceNo;
262
263 try {
264 nackSequenceNo = nack.getInterest().getName().get(-1).toSequenceNumber();
265 }
266 catch (name::Component::Error&) {
267 nackSequenceNo = 0;
268 }
269
270 if (m_lastNackSequenceNo == nackSequenceNo) {
271 ++m_attempts;
272 } else {
273 m_attempts = 1;
274 }
275
276 time::milliseconds delayTime =
277 time::milliseconds (static_cast<uint32_t>( pow(2, m_attempts) * 100 + random::generateWord32() % 100));
278
279 m_lastNackSequenceNo = nackSequenceNo;
280 return delayTime;
281 }
282
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -0700283private:
284 Face& m_face;
285 Name m_prefix;
286 bool m_isRunning;
287 uint64_t m_lastSequenceNo;
Teng Liangf8bf1b02016-07-17 11:54:19 -0700288 uint64_t m_lastNackSequenceNo;
289 uint64_t m_attempts;
290 util::scheduler::Scheduler m_scheduler;
291 util::scheduler::ScopedEventId m_nackEvent;
Alexander Afanasyev4abdbf12014-08-11 12:48:54 -0700292 const PendingInterestId* m_lastInterestId;
293 time::milliseconds m_interestLifetime;
294};
295
296} // namespace util
297} // namespace ndn
298
299#endif // NDN_UTIL_NOTIFICATION_SUBSCRIBER_HPP