face: support permanent persistency in TcpTransport

refs #3167

Change-Id: I217c5f0fe0dfbbd759861ee262920cc03394b0ed
diff --git a/core/common.hpp b/core/common.hpp
index 2f21da0..9f4fcec 100644
--- a/core/common.hpp
+++ b/core/common.hpp
@@ -33,11 +33,13 @@
 #define PUBLIC_WITH_TESTS_ELSE_PROTECTED public
 #define PUBLIC_WITH_TESTS_ELSE_PRIVATE public
 #define PROTECTED_WITH_TESTS_ELSE_PRIVATE protected
+#define FINAL_UNLESS_WITH_TESTS
 #else
 #define VIRTUAL_WITH_TESTS
 #define PUBLIC_WITH_TESTS_ELSE_PROTECTED protected
 #define PUBLIC_WITH_TESTS_ELSE_PRIVATE private
 #define PROTECTED_WITH_TESTS_ELSE_PRIVATE private
+#define FINAL_UNLESS_WITH_TESTS final
 #endif
 
 #include <cstddef>
diff --git a/daemon/face/stream-transport.hpp b/daemon/face/stream-transport.hpp
index e775fc0..30e7a94 100644
--- a/daemon/face/stream-transport.hpp
+++ b/daemon/face/stream-transport.hpp
@@ -69,12 +69,24 @@
              size_t nBytesSent);
 
   void
+  startReceive();
+
+  void
   handleReceive(const boost::system::error_code& error,
                 size_t nBytesReceived);
 
   void
   processErrorCode(const boost::system::error_code& error);
 
+  virtual void
+  handleError(const boost::system::error_code& error);
+
+  void
+  resetReceiveBuffer();
+
+  void
+  resetSendQueue();
+
 protected:
   typename protocol::socket m_socket;
 
@@ -92,10 +104,7 @@
   : m_socket(std::move(socket))
   , m_receiveBufferSize(0)
 {
-  m_socket.async_receive(boost::asio::buffer(m_receiveBuffer, ndn::MAX_NDN_PACKET_SIZE),
-                         bind(&StreamTransport<T>::handleReceive, this,
-                              boost::asio::placeholders::error,
-                              boost::asio::placeholders::bytes_transferred));
+  startReceive();
 }
 
 template<class T>
@@ -136,9 +145,7 @@
 {
   NFD_LOG_FACE_TRACE(__func__);
 
-  // clear send queue
-  std::queue<Block> emptyQueue;
-  std::swap(emptyQueue, m_sendQueue);
+  resetSendQueue();
 
   // use the non-throwing variant and ignore errors, if any
   boost::system::error_code error;
@@ -153,6 +160,9 @@
 {
   NFD_LOG_FACE_TRACE(__func__);
 
+  if (getState() != TransportState::UP)
+    return;
+
   bool wasQueueEmpty = m_sendQueue.empty();
   m_sendQueue.push(packet.packet);
 
@@ -189,6 +199,19 @@
 
 template<class T>
 void
+StreamTransport<T>::startReceive()
+{
+  BOOST_ASSERT(getState() == TransportState::UP);
+
+  m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
+                                             ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
+                         bind(&StreamTransport<T>::handleReceive, this,
+                              boost::asio::placeholders::error,
+                              boost::asio::placeholders::bytes_transferred));
+}
+
+template<class T>
+void
 StreamTransport<T>::handleReceive(const boost::system::error_code& error,
                                   size_t nBytesReceived)
 {
@@ -231,11 +254,7 @@
     }
   }
 
-  m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
-                                             ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
-                         bind(&StreamTransport<T>::handleReceive, this,
-                              boost::asio::placeholders::error,
-                              boost::asio::placeholders::bytes_transferred));
+  startReceive();
 }
 
 template<class T>
@@ -252,6 +271,13 @@
     // transport is shutting down, ignore any errors
     return;
 
+  handleError(error);
+}
+
+template<class T>
+void
+StreamTransport<T>::handleError(const boost::system::error_code& error)
+{
   if (error != boost::asio::error::eof)
     NFD_LOG_FACE_WARN("Send or receive operation failed: " << error.message());
 
@@ -259,6 +285,21 @@
   doClose();
 }
 
+template<class T>
+void
+StreamTransport<T>::resetReceiveBuffer()
+{
+  m_receiveBufferSize = 0;
+}
+
+template<class T>
+void
+StreamTransport<T>::resetSendQueue()
+{
+  std::queue<Block> emptyQueue;
+  std::swap(emptyQueue, m_sendQueue);
+}
+
 } // namespace face
 } // namespace nfd
 
diff --git a/daemon/face/tcp-transport.cpp b/daemon/face/tcp-transport.cpp
index 4af12f9..f217bb0 100644
--- a/daemon/face/tcp-transport.cpp
+++ b/daemon/face/tcp-transport.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /**
- * Copyright (c) 2014-2015,  Regents of the University of California,
+ * Copyright (c) 2014-2016,  Regents of the University of California,
  *                           Arizona Board of Regents,
  *                           Colorado State University,
  *                           University Pierre & Marie Curie, Sorbonne University,
@@ -28,11 +28,16 @@
 namespace nfd {
 namespace face {
 
-NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamTransport, TcpTransport::protocol,
-                                               "TcpTransport");
+NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamTransport, TcpTransport::protocol, "TcpTransport");
+
+time::milliseconds TcpTransport::s_initialReconnectWait = time::seconds(1);
+time::milliseconds TcpTransport::s_maxReconnectWait = time::minutes(5);
+float TcpTransport::s_reconnectWaitMultiplier = 2.0f;
 
 TcpTransport::TcpTransport(protocol::socket&& socket, ndn::nfd::FacePersistency persistency)
   : StreamTransport(std::move(socket))
+  , m_remoteEndpoint(m_socket.remote_endpoint())
+  , m_nextReconnectWait(s_initialReconnectWait)
 {
   this->setLocalUri(FaceUri(m_socket.local_endpoint()));
   this->setRemoteUri(FaceUri(m_socket.remote_endpoint()));
@@ -53,11 +58,108 @@
 void
 TcpTransport::beforeChangePersistency(ndn::nfd::FacePersistency newPersistency)
 {
-  if (newPersistency == ndn::nfd::FACE_PERSISTENCY_PERMANENT) {
-    BOOST_THROW_EXCEPTION(
-      std::invalid_argument("TcpTransport does not support FACE_PERSISTENCY_PERMANENT"));
+  // if persistency is changing from permanent to any other value
+  if (this->getPersistency() == ndn::nfd::FACE_PERSISTENCY_PERMANENT) {
+    if (this->getState() == TransportState::DOWN) {
+      // non-permanent transport cannot be in DOWN state, so fail hard
+      this->setState(TransportState::FAILED);
+      doClose();
+    }
   }
 }
 
+void
+TcpTransport::handleError(const boost::system::error_code& error)
+{
+  if (this->getPersistency() == ndn::nfd::FACE_PERSISTENCY_PERMANENT) {
+    NFD_LOG_FACE_TRACE("TCP socket error: " << error.message());
+    this->setState(TransportState::DOWN);
+
+    // cancel all outstanding operations
+    boost::system::error_code error;
+    m_socket.cancel(error);
+
+    // do this asynchronously because there could be some callbacks still pending
+    getGlobalIoService().post([this] { reconnect(); });
+  }
+  else {
+    StreamTransport::handleError(error);
+  }
+}
+
+void
+TcpTransport::reconnect()
+{
+  NFD_LOG_FACE_TRACE(__func__);
+
+  if (getState() == TransportState::CLOSING ||
+      getState() == TransportState::FAILED ||
+      getState() == TransportState::CLOSED) {
+    // transport is shutting down, don't attempt to reconnect
+    return;
+  }
+
+  BOOST_ASSERT(getPersistency() == ndn::nfd::FACE_PERSISTENCY_PERMANENT);
+  BOOST_ASSERT(getState() == TransportState::DOWN);
+
+  // recreate the socket
+  m_socket = protocol::socket(m_socket.get_io_service());
+  this->resetReceiveBuffer();
+  this->resetSendQueue();
+
+  m_reconnectEvent = scheduler::schedule(m_nextReconnectWait,
+                                         [this] { handleReconnectTimeout(); });
+  m_socket.async_connect(m_remoteEndpoint,
+                         [this] (const boost::system::error_code& error) { handleReconnect(error); });
+}
+
+void
+TcpTransport::handleReconnect(const boost::system::error_code& error)
+{
+  if (getState() == TransportState::CLOSING ||
+      getState() == TransportState::FAILED ||
+      getState() == TransportState::CLOSED ||
+      error == boost::asio::error::operation_aborted) {
+    // transport is shutting down, abort the reconnection attempt and ignore any errors
+    return;
+  }
+
+  if (error) {
+    NFD_LOG_FACE_TRACE("Reconnection attempt failed: " << error.message());
+    return;
+  }
+
+  m_reconnectEvent.cancel();
+  m_nextReconnectWait = s_initialReconnectWait;
+
+  this->setLocalUri(FaceUri(m_socket.local_endpoint()));
+  NFD_LOG_FACE_TRACE("TCP connection reestablished");
+  this->setState(TransportState::UP);
+  this->startReceive();
+}
+
+void
+TcpTransport::handleReconnectTimeout()
+{
+  // abort the reconnection attempt
+  boost::system::error_code error;
+  m_socket.close(error);
+
+  // exponentially back off the reconnection timer
+  m_nextReconnectWait =
+      std::min(time::duration_cast<time::milliseconds>(m_nextReconnectWait * s_reconnectWaitMultiplier),
+               s_maxReconnectWait);
+
+  // do this asynchronously because there could be some callbacks still pending
+  getGlobalIoService().post([this] { reconnect(); });
+}
+
+void
+TcpTransport::doClose()
+{
+  m_reconnectEvent.cancel();
+  StreamTransport::doClose();
+}
+
 } // namespace face
 } // namespace nfd
diff --git a/daemon/face/tcp-transport.hpp b/daemon/face/tcp-transport.hpp
index 871d034..83fba82 100644
--- a/daemon/face/tcp-transport.hpp
+++ b/daemon/face/tcp-transport.hpp
@@ -27,22 +27,66 @@
 #define NFD_DAEMON_FACE_TCP_TRANSPORT_HPP
 
 #include "stream-transport.hpp"
+#include "core/scheduler.hpp"
 
 namespace nfd {
 namespace face {
 
 /**
  * \brief A Transport that communicates on a connected TCP socket
+ *
+ * When persistency is set to permanent, whenever the TCP connection is severed, the transport
+ * state is set to DOWN, and the connection is retried periodically with exponential backoff
+ * until it is reestablished
  */
-class TcpTransport final : public StreamTransport<boost::asio::ip::tcp>
+class TcpTransport FINAL_UNLESS_WITH_TESTS : public StreamTransport<boost::asio::ip::tcp>
 {
 public:
-  TcpTransport(protocol::socket&& socket,
-               ndn::nfd::FacePersistency persistency);
+  TcpTransport(protocol::socket&& socket, ndn::nfd::FacePersistency persistency);
 
 protected:
-  virtual void
+  void
   beforeChangePersistency(ndn::nfd::FacePersistency newPersistency) final;
+
+  void
+  doClose() final;
+
+  void
+  handleError(const boost::system::error_code& error) final;
+
+PROTECTED_WITH_TESTS_ELSE_PRIVATE:
+  VIRTUAL_WITH_TESTS void
+  reconnect();
+
+  VIRTUAL_WITH_TESTS void
+  handleReconnect(const boost::system::error_code& error);
+
+  VIRTUAL_WITH_TESTS void
+  handleReconnectTimeout();
+
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+  /** \brief how long to wait before the first reconnection attempt after the TCP connection has been severed
+   */
+  static time::milliseconds s_initialReconnectWait;
+
+  /** \brief maximum amount of time to wait before a reconnection attempt
+   */
+  static time::milliseconds s_maxReconnectWait;
+
+  /** \brief multiplier for the exponential backoff of the reconnection timer
+   */
+  static float s_reconnectWaitMultiplier;
+
+private:
+  typename protocol::endpoint m_remoteEndpoint;
+
+  /** \note valid only when persistency is set to permanent
+   */
+  scheduler::ScopedEventId m_reconnectEvent;
+
+  /** \note valid only when persistency is set to permanent
+   */
+  time::milliseconds m_nextReconnectWait;
 };
 
 } // namespace face
diff --git a/docs/doxygen.conf.in b/docs/doxygen.conf.in
index 1406605..c1c1ad3 100644
--- a/docs/doxygen.conf.in
+++ b/docs/doxygen.conf.in
@@ -1927,6 +1927,7 @@
                          PUBLIC_WITH_TESTS_ELSE_PRIVATE=private \
                          PUBLIC_WITH_TESTS_ELSE_PROTECTED=protected \
                          PROTECTED_WITH_TESTS_ELSE_PRIVATE=private \
+                         FINAL_UNLESS_WITH_TESTS=final \
                          VIRTUAL_WITH_TESTS
 
 # If the MACRO_EXPANSION and EXPAND_ONLY_PREDEF tags are set to YES then this
diff --git a/tests/daemon/face/tcp-transport-fixture.hpp b/tests/daemon/face/tcp-transport-fixture.hpp
index 97b195f..08f55b6 100644
--- a/tests/daemon/face/tcp-transport-fixture.hpp
+++ b/tests/daemon/face/tcp-transport-fixture.hpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /**
- * Copyright (c) 2014-2015,  Regents of the University of California,
+ * Copyright (c) 2014-2016,  Regents of the University of California,
  *                           Arizona Board of Regents,
  *                           Colorado State University,
  *                           University Pierre & Marie Curie, Sorbonne University,
@@ -53,9 +53,9 @@
   }
 
   void
-  initialize(ip::address address = ip::address_v4::loopback())
+  startAccept(const tcp::endpoint& remoteEp)
   {
-    tcp::endpoint remoteEp(address, 7070);
+    BOOST_REQUIRE(!acceptor.is_open());
     acceptor.open(remoteEp.protocol());
     acceptor.set_option(tcp::acceptor::reuse_address(true));
     acceptor.bind(remoteEp);
@@ -64,6 +64,21 @@
       BOOST_REQUIRE_EQUAL(error, boost::system::errc::success);
       limitedIo.afterOp();
     });
+  }
+
+  void
+  stopAccept()
+  {
+    BOOST_REQUIRE(acceptor.is_open());
+    acceptor.close();
+  }
+
+  void
+  initialize(ip::address address = ip::address_v4::loopback(),
+             ndn::nfd::FacePersistency persistency = ndn::nfd::FACE_PERSISTENCY_PERSISTENT)
+  {
+    tcp::endpoint remoteEp(address, 7070);
+    startAccept(remoteEp);
 
     tcp::socket sock(g_io);
     sock.async_connect(remoteEp, [this] (const boost::system::error_code& error) {
@@ -75,8 +90,7 @@
 
     localEp = sock.local_endpoint();
     face = make_unique<Face>(make_unique<DummyReceiveLinkService>(),
-                             make_unique<TcpTransport>(std::move(sock),
-                                                       ndn::nfd::FACE_PERSISTENCY_PERSISTENT));
+                             make_unique<TcpTransport>(std::move(sock), persistency));
     transport = static_cast<TcpTransport*>(face->getTransport());
     receivedPackets = &static_cast<DummyReceiveLinkService*>(face->getLinkService())->receivedPackets;
 
diff --git a/tests/daemon/face/tcp-transport.t.cpp b/tests/daemon/face/tcp-transport.t.cpp
index fd0d7ae..a881f83 100644
--- a/tests/daemon/face/tcp-transport.t.cpp
+++ b/tests/daemon/face/tcp-transport.t.cpp
@@ -102,6 +102,166 @@
   BOOST_CHECK_EQUAL(transport->getMtu(), MTU_UNLIMITED);
 }
 
+BOOST_AUTO_TEST_CASE(PermanentReconnect)
+{
+  auto address = getTestIp<ip::address_v4>();
+  SKIP_IF_IP_UNAVAILABLE(address);
+  initialize(address, ndn::nfd::FACE_PERSISTENCY_PERMANENT);
+
+  transport->afterStateChange.connectSingleShot([this] (TransportState oldState, TransportState newState) {
+    BOOST_CHECK_EQUAL(oldState, TransportState::UP);
+    BOOST_CHECK_EQUAL(newState, TransportState::DOWN);
+    limitedIo.afterOp();
+  });
+  remoteSocket.close();
+  BOOST_REQUIRE_EQUAL(limitedIo.run(1, time::seconds(1)), LimitedIo::EXCEED_OPS);
+
+  transport->afterStateChange.connectSingleShot([this] (TransportState oldState, TransportState newState) {
+    BOOST_CHECK_EQUAL(oldState, TransportState::DOWN);
+    BOOST_CHECK_EQUAL(newState, TransportState::UP);
+    limitedIo.afterOp();
+  });
+  BOOST_REQUIRE_EQUAL(limitedIo.run(1, time::seconds(1)), LimitedIo::EXCEED_OPS);
+}
+
+BOOST_AUTO_TEST_CASE(ChangePersistencyFromPermanentWhenDown)
+{
+  // when persistency is changed out of permanent while transport is DOWN,
+  // the transport immediately goes into FAILED state
+
+  auto address = getTestIp<ip::address_v4>();
+  SKIP_IF_IP_UNAVAILABLE(address);
+  initialize(address, ndn::nfd::FACE_PERSISTENCY_PERMANENT);
+
+  transport->afterStateChange.connectSingleShot([this] (TransportState oldState, TransportState newState) {
+    BOOST_CHECK_EQUAL(oldState, TransportState::UP);
+    BOOST_CHECK_EQUAL(newState, TransportState::DOWN);
+    limitedIo.afterOp();
+  });
+  remoteSocket.close();
+  BOOST_REQUIRE_EQUAL(limitedIo.run(1, time::seconds(1)), LimitedIo::EXCEED_OPS);
+
+  bool didStateChange = false;
+  transport->afterStateChange.connectSingleShot(
+    [this, &didStateChange] (TransportState oldState, TransportState newState) {
+      didStateChange = true;
+      BOOST_CHECK_EQUAL(oldState, TransportState::DOWN);
+      BOOST_CHECK_EQUAL(newState, TransportState::FAILED);
+    });
+  transport->setPersistency(ndn::nfd::FACE_PERSISTENCY_PERSISTENT);
+  BOOST_CHECK(didStateChange);
+}
+
+class PermanentTcpTransportReconnectObserver : public TcpTransport
+{
+public:
+  PermanentTcpTransportReconnectObserver(protocol::socket&& socket, LimitedIo& io)
+    : TcpTransport(std::move(socket), ndn::nfd::FACE_PERSISTENCY_PERMANENT)
+    , m_io(io)
+  {
+  }
+
+protected:
+  void
+  reconnect() final
+  {
+    TcpTransport::reconnect();
+    m_io.afterOp();
+  }
+
+  void
+  handleReconnect(const boost::system::error_code& error) final
+  {
+    TcpTransport::handleReconnect(error);
+    m_io.afterOp();
+  }
+
+  void
+  handleReconnectTimeout() final
+  {
+    TcpTransport::handleReconnectTimeout();
+    m_io.afterOp();
+  }
+
+private:
+  LimitedIo& m_io;
+};
+
+static double
+asFloatMilliseconds(const time::nanoseconds& t)
+{
+  return static_cast<double>(t.count()) / 1000000.0;
+}
+
+BOOST_AUTO_TEST_CASE(PermanentReconnectWithExponentialBackoff)
+{
+  auto address = getTestIp<ip::address_v4>();
+  SKIP_IF_IP_UNAVAILABLE(address);
+
+  tcp::endpoint remoteEp(address, 7070);
+  startAccept(remoteEp);
+
+  tcp::socket sock(g_io);
+  sock.async_connect(remoteEp, [this] (const boost::system::error_code& error) {
+    BOOST_REQUIRE_EQUAL(error, boost::system::errc::success);
+    limitedIo.afterOp();
+  });
+  BOOST_REQUIRE_EQUAL(limitedIo.run(2, time::seconds(1)), LimitedIo::EXCEED_OPS);
+
+  auto transportObserver = make_unique<PermanentTcpTransportReconnectObserver>(std::move(sock),
+                                                                               std::ref(limitedIo));
+  BOOST_REQUIRE_EQUAL(transportObserver->getState(), TransportState::UP);
+
+  // break the TCP connection
+  stopAccept();
+  remoteSocket.close();
+
+  // measure retry intervals
+  BOOST_REQUIRE_EQUAL(limitedIo.run(2, time::seconds(5)), LimitedIo::EXCEED_OPS);
+  auto retryTime1 = time::steady_clock::now();
+  BOOST_CHECK_EQUAL(transportObserver->getState(), TransportState::DOWN);
+
+  BOOST_REQUIRE_EQUAL(limitedIo.run(2, time::seconds(5)), LimitedIo::EXCEED_OPS);
+  auto retryTime2 = time::steady_clock::now();
+  BOOST_CHECK_EQUAL(transportObserver->getState(), TransportState::DOWN);
+
+  BOOST_REQUIRE_EQUAL(limitedIo.run(2, time::seconds(5)), LimitedIo::EXCEED_OPS);
+  auto retryTime3 = time::steady_clock::now();
+  BOOST_CHECK_EQUAL(transportObserver->getState(), TransportState::DOWN);
+
+  // check that the backoff algorithm works
+  BOOST_CHECK_CLOSE(asFloatMilliseconds(retryTime2 - retryTime1),
+                    asFloatMilliseconds(TcpTransport::s_initialReconnectWait),
+                    10.0);
+  BOOST_CHECK_CLOSE(asFloatMilliseconds(retryTime3 - retryTime2),
+                    asFloatMilliseconds(TcpTransport::s_initialReconnectWait) * TcpTransport::s_reconnectWaitMultiplier,
+                    10.0);
+
+  // reestablish the TCP connection
+  startAccept(remoteEp);
+
+  BOOST_REQUIRE_EQUAL(limitedIo.run(3, time::seconds(10)), LimitedIo::EXCEED_OPS);
+  BOOST_CHECK_EQUAL(transportObserver->getState(), TransportState::UP);
+
+  // break the TCP connection again
+  stopAccept();
+  remoteSocket.close();
+
+  // measure retry intervals
+  BOOST_REQUIRE_EQUAL(limitedIo.run(2, time::seconds(5)), LimitedIo::EXCEED_OPS);
+  auto retryTime4 = time::steady_clock::now();
+  BOOST_CHECK_EQUAL(transportObserver->getState(), TransportState::DOWN);
+
+  BOOST_REQUIRE_EQUAL(limitedIo.run(2, time::seconds(5)), LimitedIo::EXCEED_OPS);
+  auto retryTime5 = time::steady_clock::now();
+  BOOST_CHECK_EQUAL(transportObserver->getState(), TransportState::DOWN);
+
+  // check that the timeout restarts from the initial value after a successful reconnection
+  BOOST_CHECK_CLOSE(asFloatMilliseconds(retryTime5 - retryTime4),
+                    asFloatMilliseconds(TcpTransport::s_initialReconnectWait),
+                    10.0);
+}
+
 BOOST_AUTO_TEST_SUITE_END() // TestTcpTransport
 BOOST_AUTO_TEST_SUITE_END() // Face