util: handle NACK in NotificationSubscriber
refs #3662
Change-Id: Ia8023668c912e8ba827922f5c31880f903c362f6
diff --git a/src/util/notification-subscriber.hpp b/src/util/notification-subscriber.hpp
index ca997b7..a102cfa 100644
--- a/src/util/notification-subscriber.hpp
+++ b/src/util/notification-subscriber.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2015 Regents of the University of California.
+ * Copyright (c) 2013-2016 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -51,6 +51,10 @@
#include "../face.hpp"
#include "signal.hpp"
#include "concepts.hpp"
+#include "time.hpp"
+#include "random.hpp"
+#include "scheduler.hpp"
+#include "scheduler-scoped-event-id.hpp"
#include <boost/concept_check.hpp>
namespace ndn {
@@ -77,7 +81,10 @@
, m_prefix(prefix)
, m_isRunning(false)
, m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
- , m_lastInterestId(0)
+ , m_lastNackSequenceNo(std::numeric_limits<uint64_t>::max())
+ , m_attempts(1)
+ , m_scheduler(face.getIoService())
+ , m_nackEvent(m_scheduler)
, m_interestLifetime(interestLifetime)
{
}
@@ -137,6 +144,10 @@
*/
signal::Signal<NotificationSubscriber, Notification> onNotification;
+ /** \brief fires when a NACK is received
+ */
+ signal::Signal<NotificationSubscriber, lp::Nack> onNack;
+
/** \brief fires when no Notification is received within .getInterestLifetime period
*/
signal::Signal<NotificationSubscriber> onTimeout;
@@ -159,6 +170,7 @@
m_lastInterestId = m_face.expressInterest(*interest,
bind(&NotificationSubscriber<Notification>::afterReceiveData, this, _2),
+ bind(&NotificationSubscriber<Notification>::afterReceiveNack, this, _2),
bind(&NotificationSubscriber<Notification>::afterTimeout, this));
}
@@ -179,6 +191,7 @@
m_lastInterestId = m_face.expressInterest(*interest,
bind(&NotificationSubscriber<Notification>::afterReceiveData, this, _2),
+ bind(&NotificationSubscriber<Notification>::afterReceiveNack, this, _2),
bind(&NotificationSubscriber<Notification>::afterTimeout, this));
}
@@ -190,7 +203,7 @@
{
if (!m_isRunning)
return true;
- if (onNotification.isEmpty()) {
+ if (onNotification.isEmpty() && onNack.isEmpty()) {
this->stop();
return true;
}
@@ -220,6 +233,18 @@
}
void
+ afterReceiveNack(const lp::Nack& nack)
+ {
+ if (this->shouldStop())
+ return;
+
+ this->onNack(nack);
+
+ time::milliseconds delay = exponentialBackoff(nack);
+ m_nackEvent = m_scheduler.scheduleEvent(delay, [this] {this->sendInitialInterest();});
+ }
+
+ void
afterTimeout()
{
if (this->shouldStop())
@@ -230,11 +255,40 @@
this->sendInitialInterest();
}
+ time::milliseconds
+ exponentialBackoff(lp::Nack nack)
+ {
+ uint64_t nackSequenceNo;
+
+ try {
+ nackSequenceNo = nack.getInterest().getName().get(-1).toSequenceNumber();
+ }
+ catch (name::Component::Error&) {
+ nackSequenceNo = 0;
+ }
+
+ if (m_lastNackSequenceNo == nackSequenceNo) {
+ ++m_attempts;
+ } else {
+ m_attempts = 1;
+ }
+
+ time::milliseconds delayTime =
+ time::milliseconds (static_cast<uint32_t>( pow(2, m_attempts) * 100 + random::generateWord32() % 100));
+
+ m_lastNackSequenceNo = nackSequenceNo;
+ return delayTime;
+ }
+
private:
Face& m_face;
Name m_prefix;
bool m_isRunning;
uint64_t m_lastSequenceNo;
+ uint64_t m_lastNackSequenceNo;
+ uint64_t m_attempts;
+ util::scheduler::Scheduler m_scheduler;
+ util::scheduler::ScopedEventId m_nackEvent;
const PendingInterestId* m_lastInterestId;
time::milliseconds m_interestLifetime;
};
diff --git a/tests/unit-tests/util/notification-subscriber.t.cpp b/tests/unit-tests/util/notification-subscriber.t.cpp
index 5037c0a..309afad 100644
--- a/tests/unit-tests/util/notification-subscriber.t.cpp
+++ b/tests/unit-tests/util/notification-subscriber.t.cpp
@@ -29,8 +29,9 @@
#include "simple-notification.hpp"
#include "util/dummy-client-face.hpp"
-#include "boost-test.hpp"
#include "../identity-management-time-fixture.hpp"
+#include "../make-interest-data.hpp"
+#include "boost-test.hpp"
namespace ndn {
namespace util {
@@ -69,6 +70,15 @@
subscriberFace.receive(data);
}
+ /** \brief deliver a Nack to subscriber
+ */
+ void
+ deliverNack(const Interest& interest, const lp::NackReason& reason)
+ {
+ lp::Nack nack = makeNack(interest, reason);
+ subscriberFace.receive(nack);
+ }
+
void
afterNotification(const SimpleNotification& notification)
{
@@ -76,6 +86,12 @@
}
void
+ afterNack(const lp::Nack& nack)
+ {
+ lastNack = nack;
+ }
+
+ void
afterTimeout()
{
hasTimeout = true;
@@ -92,6 +108,8 @@
{
notificationConn = subscriber.onNotification.connect(
bind(&NotificationSubscriberFixture::afterNotification, this, _1));
+ nackConn = subscriber.onNack.connect(
+ bind(&NotificationSubscriberFixture::afterNack, this, _1));
subscriber.onTimeout.connect(
bind(&NotificationSubscriberFixture::afterTimeout, this));
subscriber.onDecodeError.connect(
@@ -102,6 +120,7 @@
disconnectHandlers()
{
notificationConn.disconnect();
+ nackConn.disconnect();
}
/** \return true if subscriberFace has an initial request (first sent Interest)
@@ -143,9 +162,11 @@
DummyClientFace subscriberFace;
util::NotificationSubscriber<SimpleNotification> subscriber;
util::signal::Connection notificationConn;
+ util::signal::Connection nackConn;
uint64_t nextSendNotificationNo;
uint64_t lastDeliveredSeqNo;
SimpleNotification lastNotification;
+ lp::Nack lastNack;
bool hasTimeout;
Data lastDecodeErrorData;
};
@@ -194,6 +215,50 @@
BOOST_CHECK_EQUAL(this->getRequestSeqNo(), lastDeliveredSeqNo + 1);
}
+BOOST_AUTO_TEST_CASE(Nack)
+{
+ this->connectHandlers();
+ subscriber.start();
+ advanceClocks(time::milliseconds(1));
+
+ // send the first Nack to initial request
+ BOOST_REQUIRE_EQUAL(subscriberFace.sentInterests.size(), 1);
+ Interest interest = subscriberFace.sentInterests[0];
+ subscriberFace.sentInterests.clear();
+ this->deliverNack(interest, lp::NackReason::CONGESTION);
+ advanceClocks(time::milliseconds(1));
+ BOOST_CHECK_EQUAL(lastNack.getReason(), lp::NackReason::CONGESTION);
+ BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), false);
+ advanceClocks(time::milliseconds(300));
+ BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), true);
+
+ // send the second Nack to initial request
+ BOOST_REQUIRE_EQUAL(subscriberFace.sentInterests.size(), 1);
+ interest = subscriberFace.sentInterests[0];
+ subscriberFace.sentInterests.clear();
+ this->deliverNack(interest, lp::NackReason::CONGESTION);
+ advanceClocks(time::milliseconds(301));
+ BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), false);
+ advanceClocks(time::milliseconds(200));
+ BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), true);
+
+ // send a notification to initial request
+ subscriberFace.sentInterests.clear();
+ this->deliverNotification("n1");
+ advanceClocks(time::milliseconds(1));
+
+ // send a Nack to subsequent request
+ BOOST_REQUIRE_EQUAL(subscriberFace.sentInterests.size(), 1);
+ interest = subscriberFace.sentInterests[0];
+ subscriberFace.sentInterests.clear();
+ this->deliverNack(interest, lp::NackReason::CONGESTION);
+ advanceClocks(time::milliseconds(1));
+ BOOST_CHECK_EQUAL(lastNack.getReason(), lp::NackReason::CONGESTION);
+ BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), false);
+ advanceClocks(time::milliseconds(300));
+ BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), true);
+}
+
BOOST_AUTO_TEST_CASE(Timeout)
{
this->connectHandlers();