util: handle NACK in NotificationSubscriber

refs #3662

Change-Id: Ia8023668c912e8ba827922f5c31880f903c362f6
diff --git a/src/util/notification-subscriber.hpp b/src/util/notification-subscriber.hpp
index ca997b7..a102cfa 100644
--- a/src/util/notification-subscriber.hpp
+++ b/src/util/notification-subscriber.hpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /**
- * Copyright (c) 2013-2015 Regents of the University of California.
+ * Copyright (c) 2013-2016 Regents of the University of California.
  *
  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
  *
@@ -51,6 +51,10 @@
 #include "../face.hpp"
 #include "signal.hpp"
 #include "concepts.hpp"
+#include "time.hpp"
+#include "random.hpp"
+#include "scheduler.hpp"
+#include "scheduler-scoped-event-id.hpp"
 #include <boost/concept_check.hpp>
 
 namespace ndn {
@@ -77,7 +81,10 @@
     , m_prefix(prefix)
     , m_isRunning(false)
     , m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
-    , m_lastInterestId(0)
+    , m_lastNackSequenceNo(std::numeric_limits<uint64_t>::max())
+    , m_attempts(1)
+    , m_scheduler(face.getIoService())
+    , m_nackEvent(m_scheduler)
     , m_interestLifetime(interestLifetime)
   {
   }
@@ -137,6 +144,10 @@
    */
   signal::Signal<NotificationSubscriber, Notification> onNotification;
 
+  /** \brief fires when a NACK is received
+   */
+  signal::Signal<NotificationSubscriber, lp::Nack> onNack;
+
   /** \brief fires when no Notification is received within .getInterestLifetime period
    */
   signal::Signal<NotificationSubscriber> onTimeout;
@@ -159,6 +170,7 @@
 
     m_lastInterestId = m_face.expressInterest(*interest,
                          bind(&NotificationSubscriber<Notification>::afterReceiveData, this, _2),
+                         bind(&NotificationSubscriber<Notification>::afterReceiveNack, this, _2),
                          bind(&NotificationSubscriber<Notification>::afterTimeout, this));
   }
 
@@ -179,6 +191,7 @@
 
     m_lastInterestId = m_face.expressInterest(*interest,
                          bind(&NotificationSubscriber<Notification>::afterReceiveData, this, _2),
+                         bind(&NotificationSubscriber<Notification>::afterReceiveNack, this, _2),
                          bind(&NotificationSubscriber<Notification>::afterTimeout, this));
   }
 
@@ -190,7 +203,7 @@
   {
     if (!m_isRunning)
       return true;
-    if (onNotification.isEmpty()) {
+    if (onNotification.isEmpty() && onNack.isEmpty()) {
       this->stop();
       return true;
     }
@@ -220,6 +233,18 @@
   }
 
   void
+  afterReceiveNack(const lp::Nack& nack)
+  {
+    if (this->shouldStop())
+      return;
+
+    this->onNack(nack);
+
+    time::milliseconds delay = exponentialBackoff(nack);
+    m_nackEvent = m_scheduler.scheduleEvent(delay, [this] {this->sendInitialInterest();});
+  }
+
+  void
   afterTimeout()
   {
     if (this->shouldStop())
@@ -230,11 +255,40 @@
     this->sendInitialInterest();
   }
 
+  time::milliseconds
+  exponentialBackoff(lp::Nack nack)
+  {
+    uint64_t nackSequenceNo;
+
+    try {
+      nackSequenceNo = nack.getInterest().getName().get(-1).toSequenceNumber();
+    }
+    catch (name::Component::Error&) {
+      nackSequenceNo = 0;
+    }
+
+    if (m_lastNackSequenceNo ==  nackSequenceNo) {
+      ++m_attempts;
+    } else {
+      m_attempts = 1;
+    }
+
+    time::milliseconds delayTime =
+      time::milliseconds (static_cast<uint32_t>( pow(2, m_attempts) * 100 + random::generateWord32() % 100));
+
+    m_lastNackSequenceNo = nackSequenceNo;
+    return delayTime;
+  }
+
 private:
   Face& m_face;
   Name m_prefix;
   bool m_isRunning;
   uint64_t m_lastSequenceNo;
+  uint64_t m_lastNackSequenceNo;
+  uint64_t m_attempts;
+  util::scheduler::Scheduler m_scheduler;
+  util::scheduler::ScopedEventId m_nackEvent;
   const PendingInterestId* m_lastInterestId;
   time::milliseconds m_interestLifetime;
 };
diff --git a/tests/unit-tests/util/notification-subscriber.t.cpp b/tests/unit-tests/util/notification-subscriber.t.cpp
index 5037c0a..309afad 100644
--- a/tests/unit-tests/util/notification-subscriber.t.cpp
+++ b/tests/unit-tests/util/notification-subscriber.t.cpp
@@ -29,8 +29,9 @@
 #include "simple-notification.hpp"
 #include "util/dummy-client-face.hpp"
 
-#include "boost-test.hpp"
 #include "../identity-management-time-fixture.hpp"
+#include "../make-interest-data.hpp"
+#include "boost-test.hpp"
 
 namespace ndn {
 namespace util {
@@ -69,6 +70,15 @@
     subscriberFace.receive(data);
   }
 
+  /** \brief deliver a Nack to subscriber
+   */
+  void
+  deliverNack(const Interest& interest, const lp::NackReason& reason)
+  {
+    lp::Nack nack = makeNack(interest, reason);
+    subscriberFace.receive(nack);
+  }
+
   void
   afterNotification(const SimpleNotification& notification)
   {
@@ -76,6 +86,12 @@
   }
 
   void
+  afterNack(const lp::Nack& nack)
+  {
+    lastNack = nack;
+  }
+
+  void
   afterTimeout()
   {
     hasTimeout = true;
@@ -92,6 +108,8 @@
   {
     notificationConn = subscriber.onNotification.connect(
       bind(&NotificationSubscriberFixture::afterNotification, this, _1));
+    nackConn = subscriber.onNack.connect(
+      bind(&NotificationSubscriberFixture::afterNack, this, _1));
     subscriber.onTimeout.connect(
       bind(&NotificationSubscriberFixture::afterTimeout, this));
     subscriber.onDecodeError.connect(
@@ -102,6 +120,7 @@
   disconnectHandlers()
   {
     notificationConn.disconnect();
+    nackConn.disconnect();
   }
 
   /** \return true if subscriberFace has an initial request (first sent Interest)
@@ -143,9 +162,11 @@
   DummyClientFace subscriberFace;
   util::NotificationSubscriber<SimpleNotification> subscriber;
   util::signal::Connection notificationConn;
+  util::signal::Connection nackConn;
   uint64_t nextSendNotificationNo;
   uint64_t lastDeliveredSeqNo;
   SimpleNotification lastNotification;
+  lp::Nack lastNack;
   bool hasTimeout;
   Data lastDecodeErrorData;
 };
@@ -194,6 +215,50 @@
   BOOST_CHECK_EQUAL(this->getRequestSeqNo(), lastDeliveredSeqNo + 1);
 }
 
+BOOST_AUTO_TEST_CASE(Nack)
+{
+  this->connectHandlers();
+  subscriber.start();
+  advanceClocks(time::milliseconds(1));
+
+  // send the first Nack to initial request
+  BOOST_REQUIRE_EQUAL(subscriberFace.sentInterests.size(), 1);
+  Interest interest = subscriberFace.sentInterests[0];
+  subscriberFace.sentInterests.clear();
+  this->deliverNack(interest, lp::NackReason::CONGESTION);
+  advanceClocks(time::milliseconds(1));
+  BOOST_CHECK_EQUAL(lastNack.getReason(), lp::NackReason::CONGESTION);
+  BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), false);
+  advanceClocks(time::milliseconds(300));
+  BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), true);
+
+  // send the second Nack to initial request
+  BOOST_REQUIRE_EQUAL(subscriberFace.sentInterests.size(), 1);
+  interest = subscriberFace.sentInterests[0];
+  subscriberFace.sentInterests.clear();
+  this->deliverNack(interest, lp::NackReason::CONGESTION);
+  advanceClocks(time::milliseconds(301));
+  BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), false);
+  advanceClocks(time::milliseconds(200));
+  BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), true);
+
+  // send a notification to initial request
+  subscriberFace.sentInterests.clear();
+  this->deliverNotification("n1");
+  advanceClocks(time::milliseconds(1));
+
+  // send a Nack to subsequent request
+  BOOST_REQUIRE_EQUAL(subscriberFace.sentInterests.size(), 1);
+  interest = subscriberFace.sentInterests[0];
+  subscriberFace.sentInterests.clear();
+  this->deliverNack(interest, lp::NackReason::CONGESTION);
+  advanceClocks(time::milliseconds(1));
+  BOOST_CHECK_EQUAL(lastNack.getReason(), lp::NackReason::CONGESTION);
+  BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), false);
+  advanceClocks(time::milliseconds(300));
+  BOOST_REQUIRE_EQUAL(this->hasInitialRequest(), true);
+}
+
 BOOST_AUTO_TEST_CASE(Timeout)
 {
   this->connectHandlers();