blob: 56dee40881b92851d737817f3ba92afd45c8fcae [file] [log] [blame]
Junxiao Shi18244e82016-12-03 15:42:01 +00001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Davide Pesaventob5f8bcc2017-02-05 17:58:05 -05003 * Copyright (c) 2014-2016 Regents of the University of California,
4 * Arizona Board of Regents,
5 * Colorado State University,
6 * University Pierre & Marie Curie, Sorbonne University,
7 * Washington University in St. Louis,
8 * Beijing Institute of Technology,
9 * The University of Memphis.
Junxiao Shi18244e82016-12-03 15:42:01 +000010 *
11 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
12 *
13 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
14 * terms of the GNU Lesser General Public License as published by the Free Software
15 * Foundation, either version 3 of the License, or (at your option) any later version.
16 *
17 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
18 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
19 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
20 *
21 * You should have received copies of the GNU General Public License and GNU Lesser
22 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
23 * <http://www.gnu.org/licenses/>.
24 *
25 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
26 */
27
Junxiao Shi18244e82016-12-03 15:42:01 +000028#include "notification-subscriber.hpp"
29#include "random.hpp"
30
31namespace ndn {
32namespace util {
33
34NotificationSubscriberBase::NotificationSubscriberBase(Face& face, const Name& prefix,
35 time::milliseconds interestLifetime)
36 : m_face(face)
37 , m_prefix(prefix)
38 , m_isRunning(false)
39 , m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
40 , m_lastNackSequenceNo(std::numeric_limits<uint64_t>::max())
41 , m_attempts(1)
42 , m_scheduler(face.getIoService())
43 , m_nackEvent(m_scheduler)
44 , m_interestLifetime(interestLifetime)
45{
46}
47
48NotificationSubscriberBase::~NotificationSubscriberBase() = default;
49
50void
51NotificationSubscriberBase::start()
52{
53 if (m_isRunning) // already running
54 return;
55 m_isRunning = true;
56
57 this->sendInitialInterest();
58}
59
60void
61NotificationSubscriberBase::stop()
62{
63 if (!m_isRunning) // not running
64 return;
65 m_isRunning = false;
66
67 if (m_lastInterestId != 0)
68 m_face.removePendingInterest(m_lastInterestId);
69 m_lastInterestId = 0;
70}
71
72void
73NotificationSubscriberBase::sendInitialInterest()
74{
75 if (this->shouldStop())
76 return;
77
78 auto interest = make_shared<Interest>(m_prefix);
79 interest->setMustBeFresh(true);
80 interest->setChildSelector(1);
81 interest->setInterestLifetime(getInterestLifetime());
82
83 m_lastInterestId = m_face.expressInterest(*interest,
84 bind(&NotificationSubscriberBase::afterReceiveData, this, _2),
85 bind(&NotificationSubscriberBase::afterReceiveNack, this, _2),
86 bind(&NotificationSubscriberBase::afterTimeout, this));
87}
88
89void
90NotificationSubscriberBase::sendNextInterest()
91{
92 if (this->shouldStop())
93 return;
94
95 BOOST_ASSERT(m_lastSequenceNo != std::numeric_limits<uint64_t>::max()); // overflow or missing initial reply
96
97 Name nextName = m_prefix;
98 nextName.appendSequenceNumber(m_lastSequenceNo + 1);
99
100 auto interest = make_shared<Interest>(nextName);
101 interest->setInterestLifetime(getInterestLifetime());
102
103 m_lastInterestId = m_face.expressInterest(*interest,
104 bind(&NotificationSubscriberBase::afterReceiveData, this, _2),
105 bind(&NotificationSubscriberBase::afterReceiveNack, this, _2),
106 bind(&NotificationSubscriberBase::afterTimeout, this));
107}
108
109bool
110NotificationSubscriberBase::shouldStop()
111{
112 if (!m_isRunning)
113 return true;
114 if (!this->hasSubscriber() && onNack.isEmpty()) {
115 this->stop();
116 return true;
117 }
118 return false;
119}
120
121void
122NotificationSubscriberBase::afterReceiveData(const Data& data)
123{
124 if (this->shouldStop())
125 return;
126
127 try {
128 m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
129 }
130 catch (const tlv::Error&) {
131 this->onDecodeError(data);
132 this->sendInitialInterest();
133 return;
134 }
135
136 if (!this->decodeAndDeliver(data)) {
137 this->onDecodeError(data);
138 this->sendInitialInterest();
139 return;
140 }
141
142 this->sendNextInterest();
143}
144
145void
146NotificationSubscriberBase::afterReceiveNack(const lp::Nack& nack)
147{
148 if (this->shouldStop())
149 return;
150
151 this->onNack(nack);
152
153 time::milliseconds delay = exponentialBackoff(nack);
154 m_nackEvent = m_scheduler.scheduleEvent(delay, [this] {this->sendInitialInterest();});
155}
156
157void
158NotificationSubscriberBase::afterTimeout()
159{
160 if (this->shouldStop())
161 return;
162
163 this->onTimeout();
164
165 this->sendInitialInterest();
166}
167
168time::milliseconds
169NotificationSubscriberBase::exponentialBackoff(lp::Nack nack)
170{
171 uint64_t nackSequenceNo;
172
173 try {
174 nackSequenceNo = nack.getInterest().getName().get(-1).toSequenceNumber();
175 }
176 catch (const tlv::Error&) {
177 nackSequenceNo = 0;
178 }
179
180 if (m_lastNackSequenceNo == nackSequenceNo) {
181 ++m_attempts;
182 }
183 else {
184 m_attempts = 1;
185 }
186
187 m_lastNackSequenceNo = nackSequenceNo;
188
189 return time::milliseconds(static_cast<uint32_t>(pow(2, m_attempts) * 100 +
190 random::generateWord32() % 100));
191}
192
193} // namespace util
194} // namespace ndn