blob: c359c82346d19648f1bb01c32be1b6fcd7c68823 [file] [log] [blame]
Junxiao Shi15b12e72014-08-09 19:56:24 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2014, Regents of the University of California,
4 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis
10 *
11 * This file is part of NFD (Named Data Networking Forwarding Daemon).
12 * See AUTHORS.md for complete list of NFD authors and contributors.
13 *
14 * NFD is free software: you can redistribute it and/or modify it under the terms
15 * of the GNU General Public License as published by the Free Software Foundation,
16 * either version 3 of the License, or (at your option) any later version.
17 *
18 * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20 * PURPOSE. See the GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License along with
23 * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24 */
25
26#ifndef NFD_CORE_NOTIFICATION_SUBSCRIBER_HPP
27#define NFD_CORE_NOTIFICATION_SUBSCRIBER_HPP
28
29#include "event-emitter.hpp"
30
31#include <ndn-cxx/face.hpp>
32
33namespace nfd {
34
35/** \brief provides a subscriber of Notification Stream
36 * \sa http://redmine.named-data.net/projects/nfd/wiki/Notification
37 * \tparam T type of Notification item, appears in payload of Data packets
38 */
39template<typename T>
40class NotificationSubscriber : noncopyable
41{
42public:
43 /** \brief construct a NotificationSubscriber
44 * \note The subscriber is not started after construction.
45 * User should add one or more handlers to onNotification, and invoke .start().
46 */
47 NotificationSubscriber(ndn::Face& face, const Name& prefix)
48 : m_face(face)
49 , m_prefix(prefix)
50 , m_isRunning(false)
51 , m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
52 , m_lastInterestId(0)
53 {
54 }
55
56 virtual
57 ~NotificationSubscriber()
58 {
59 }
60
61 /** \return InterestLifetime of Interests to retrieve notifications
62 * \details This must be greater than FreshnessPeriod of Notification Data packets,
63 * to ensure correct operation of this subscriber implementation.
64 */
65 static time::milliseconds
66 getInterestLifetime()
67 {
68 return time::milliseconds(60000);
69 }
70
71 bool
72 isRunning() const
73 {
74 return m_isRunning;
75 }
76
77 /** \brief start or resume receiving notifications
78 * \note onNotification must have at least one listener,
79 * otherwise this operation has no effect.
80 */
81 void
82 start()
83 {
84 if (m_isRunning) // already running
85 return;
86 m_isRunning = true;
87
88 this->sendInitialInterest();
89 }
90
91 /** \brief stop receiving notifications
92 */
93 void
94 stop()
95 {
96 if (!m_isRunning) // not running
97 return;
98 m_isRunning = false;
99
100 if (m_lastInterestId != 0)
101 m_face.removePendingInterest(m_lastInterestId);
102 m_lastInterestId = 0;
103 }
104
105public: // subscriptions
106 /** \brief fires when a Notification is received
107 * \note Removing all handlers will cause the subscriber to stop.
108 */
109 EventEmitter<T> onNotification;
110
111 /** \brief fires when no Notification is received within .getInterestLifetime period
112 */
113 EventEmitter<> onTimeout;
114
115 /** \brief fires when a Data packet in the Notification Stream cannot be decoded as T
116 */
117 EventEmitter<Data> onDecodeError;
118
119private:
120 void
121 sendInitialInterest()
122 {
123 if (this->shouldStop())
124 return;
125
126 shared_ptr<Interest> interest = make_shared<Interest>(m_prefix);
127 interest->setMustBeFresh(true);
128 interest->setChildSelector(1);
129 interest->setInterestLifetime(getInterestLifetime());
130
131 m_lastInterestId = m_face.expressInterest(*interest,
132 bind(&NotificationSubscriber<T>::afterReceiveData, this, _2),
133 bind(&NotificationSubscriber<T>::afterTimeout, this));
134 }
135
136 void
137 sendNextInterest()
138 {
139 if (this->shouldStop())
140 return;
141
142 BOOST_ASSERT(m_lastSequenceNo !=
143 std::numeric_limits<uint64_t>::max());// overflow or missing initial reply
144
145 Name nextName = m_prefix;
146 nextName.appendSequenceNumber(m_lastSequenceNo + 1);
147
148 shared_ptr<Interest> interest = make_shared<Interest>(nextName);
149 interest->setInterestLifetime(getInterestLifetime());
150
151 m_lastInterestId = m_face.expressInterest(*interest,
152 bind(&NotificationSubscriber<T>::afterReceiveData, this, _2),
153 bind(&NotificationSubscriber<T>::afterTimeout, this));
154 }
155
156 /** \brief Check if the subscriber is or should be stopped.
157 * \return true if the subscriber is stopped.
158 */
159 bool
160 shouldStop()
161 {
162 if (!m_isRunning)
163 return true;
164 if (onNotification.isEmpty()) {
165 this->stop();
166 return true;
167 }
168 return false;
169 }
170
171 void
172 afterReceiveData(const Data& data)
173 {
174 if (this->shouldStop())
175 return;
176
177 T notification;
178 try {
179 m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
180 notification.wireDecode(data.getContent().blockFromValue());
181 }
182 catch (tlv::Error&) {
183 this->onDecodeError(data);
184 this->sendInitialInterest();
185 return;
186 }
187
188 this->onNotification(notification);
189
190 this->sendNextInterest();
191 }
192
193 void
194 afterTimeout()
195 {
196 if (this->shouldStop())
197 return;
198
199 this->onTimeout();
200
201 this->sendInitialInterest();
202 }
203
204private:
205 ndn::Face& m_face;
206 Name m_prefix;
207 bool m_isRunning;
208 uint64_t m_lastSequenceNo;
209 const ndn::PendingInterestId* m_lastInterestId;
210};
211
212} // namespace nfd
213
214#endif // NFD_CORE_NOTIFICATION_SUBSCRIBER_HPP