face: support permanent persistency in TcpTransport
refs #3167
Change-Id: I217c5f0fe0dfbbd759861ee262920cc03394b0ed
diff --git a/daemon/face/stream-transport.hpp b/daemon/face/stream-transport.hpp
index e775fc0..30e7a94 100644
--- a/daemon/face/stream-transport.hpp
+++ b/daemon/face/stream-transport.hpp
@@ -69,12 +69,24 @@
size_t nBytesSent);
void
+ startReceive();
+
+ void
handleReceive(const boost::system::error_code& error,
size_t nBytesReceived);
void
processErrorCode(const boost::system::error_code& error);
+ virtual void
+ handleError(const boost::system::error_code& error);
+
+ void
+ resetReceiveBuffer();
+
+ void
+ resetSendQueue();
+
protected:
typename protocol::socket m_socket;
@@ -92,10 +104,7 @@
: m_socket(std::move(socket))
, m_receiveBufferSize(0)
{
- m_socket.async_receive(boost::asio::buffer(m_receiveBuffer, ndn::MAX_NDN_PACKET_SIZE),
- bind(&StreamTransport<T>::handleReceive, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ startReceive();
}
template<class T>
@@ -136,9 +145,7 @@
{
NFD_LOG_FACE_TRACE(__func__);
- // clear send queue
- std::queue<Block> emptyQueue;
- std::swap(emptyQueue, m_sendQueue);
+ resetSendQueue();
// use the non-throwing variant and ignore errors, if any
boost::system::error_code error;
@@ -153,6 +160,9 @@
{
NFD_LOG_FACE_TRACE(__func__);
+ if (getState() != TransportState::UP)
+ return;
+
bool wasQueueEmpty = m_sendQueue.empty();
m_sendQueue.push(packet.packet);
@@ -189,6 +199,19 @@
template<class T>
void
+StreamTransport<T>::startReceive() // arms one asynchronous read on m_socket
+{
+ BOOST_ASSERT(getState() == TransportState::UP); // a read may only be armed while the transport is UP
+
+ m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize, // write after the bytes already buffered
+ ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize), // limit: MAX_NDN_PACKET_SIZE minus what is already buffered
+ bind(&StreamTransport<T>::handleReceive, this, // completion is dispatched to handleReceive()
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+}
+
+template<class T>
+void
StreamTransport<T>::handleReceive(const boost::system::error_code& error,
size_t nBytesReceived)
{
@@ -231,11 +254,7 @@
}
}
- m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
- ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
- bind(&StreamTransport<T>::handleReceive, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ startReceive();
}
template<class T>
@@ -252,6 +271,13 @@
// transport is shutting down, ignore any errors
return;
+ handleError(error);
+}
+
+template<class T>
+void
+StreamTransport<T>::handleError(const boost::system::error_code& error)
+{
if (error != boost::asio::error::eof)
NFD_LOG_FACE_WARN("Send or receive operation failed: " << error.message());
@@ -259,6 +285,21 @@
doClose();
}
+template<class T>
+void
+StreamTransport<T>::resetReceiveBuffer() // forgets any bytes accumulated by earlier reads
+{
+ m_receiveBufferSize = 0; // the next startReceive() therefore writes from the start of m_receiveBuffer
+}
+
+template<class T>
+void
+StreamTransport<T>::resetSendQueue() // drops every packet still waiting in m_sendQueue
+{
+ std::queue<Block> emptyQueue;
+ std::swap(emptyQueue, m_sendQueue); // m_sendQueue is now empty; the old contents are destroyed with emptyQueue
+}
+
} // namespace face
} // namespace nfd
diff --git a/daemon/face/tcp-transport.cpp b/daemon/face/tcp-transport.cpp
index 4af12f9..f217bb0 100644
--- a/daemon/face/tcp-transport.cpp
+++ b/daemon/face/tcp-transport.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014-2015, Regents of the University of California,
+ * Copyright (c) 2014-2016, Regents of the University of California,
* Arizona Board of Regents,
* Colorado State University,
* University Pierre & Marie Curie, Sorbonne University,
@@ -28,11 +28,16 @@
namespace nfd {
namespace face {
-NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamTransport, TcpTransport::protocol,
- "TcpTransport");
+NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamTransport, TcpTransport::protocol, "TcpTransport");
+
+time::milliseconds TcpTransport::s_initialReconnectWait = time::seconds(1); // starting value of m_nextReconnectWait; restored after a successful reconnection
+time::milliseconds TcpTransport::s_maxReconnectWait = time::minutes(5); // upper bound applied in handleReconnectTimeout()
+float TcpTransport::s_reconnectWaitMultiplier = 2.0f; // growth factor applied in handleReconnectTimeout()
TcpTransport::TcpTransport(protocol::socket&& socket, ndn::nfd::FacePersistency persistency)
: StreamTransport(std::move(socket))
+ , m_remoteEndpoint(m_socket.remote_endpoint())
+ , m_nextReconnectWait(s_initialReconnectWait)
{
this->setLocalUri(FaceUri(m_socket.local_endpoint()));
this->setRemoteUri(FaceUri(m_socket.remote_endpoint()));
@@ -53,11 +58,108 @@
void
TcpTransport::beforeChangePersistency(ndn::nfd::FacePersistency newPersistency)
{
- if (newPersistency == ndn::nfd::FACE_PERSISTENCY_PERMANENT) {
- BOOST_THROW_EXCEPTION(
- std::invalid_argument("TcpTransport does not support FACE_PERSISTENCY_PERMANENT"));
+ // leaving permanent persistency: a non-permanent transport cannot be in DOWN state, so fail hard
+ if (this->getPersistency() == ndn::nfd::FACE_PERSISTENCY_PERMANENT &&
+ newPersistency != ndn::nfd::FACE_PERSISTENCY_PERMANENT) { // do nothing when the new value is still permanent
+ if (this->getState() == TransportState::DOWN) {
+ this->setState(TransportState::FAILED);
+ doClose(); // TcpTransport::doClose() also cancels the pending reconnection timer
+ }
}
}
+void
+TcpTransport::handleError(const boost::system::error_code& error) // permanent: go DOWN and reconnect; otherwise defer to StreamTransport
+{
+ if (this->getPersistency() == ndn::nfd::FACE_PERSISTENCY_PERMANENT) {
+ NFD_LOG_FACE_TRACE("TCP socket error: " << error.message());
+ this->setState(TransportState::DOWN);
+
+ // cancel all outstanding operations; non-throwing overload, its result is intentionally ignored
+ boost::system::error_code cancelError; // distinct name so the `error` parameter is not shadowed
+ m_socket.cancel(cancelError);
+
+ // do this asynchronously because there could be some callbacks still pending
+ getGlobalIoService().post([this] { reconnect(); });
+ }
+ else {
+ StreamTransport::handleError(error); // non-permanent persistency keeps the StreamTransport handling
+ }
+}
+
+void
+TcpTransport::reconnect() // starts one connection attempt to m_remoteEndpoint and arms its timer
+{
+ NFD_LOG_FACE_TRACE(__func__);
+
+ if (getState() == TransportState::CLOSING ||
+ getState() == TransportState::FAILED ||
+ getState() == TransportState::CLOSED) {
+ // transport is shutting down, don't attempt to reconnect
+ return;
+ }
+
+ BOOST_ASSERT(getPersistency() == ndn::nfd::FACE_PERSISTENCY_PERMANENT);
+ BOOST_ASSERT(getState() == TransportState::DOWN);
+
+ // recreate the socket
+ m_socket = protocol::socket(m_socket.get_io_service()); // fresh socket bound to the same io_service
+ this->resetReceiveBuffer(); // discard bytes buffered from the previous connection
+ this->resetSendQueue(); // discard packets queued for the previous connection
+
+ m_reconnectEvent = scheduler::schedule(m_nextReconnectWait, // timer for this attempt; expiry runs handleReconnectTimeout()
+ [this] { handleReconnectTimeout(); }); // NOTE(review): raw `this` capture — confirm the transport outlives pending handlers
+ m_socket.async_connect(m_remoteEndpoint, // endpoint recorded by the constructor from the original socket
+ [this] (const boost::system::error_code& error) { handleReconnect(error); });
+}
+
+void
+TcpTransport::handleReconnect(const boost::system::error_code& error) // completion handler of the async_connect issued by reconnect()
+{
+ if (getState() == TransportState::CLOSING ||
+ getState() == TransportState::FAILED ||
+ getState() == TransportState::CLOSED ||
+ error == boost::asio::error::operation_aborted) {
+ // transport is shutting down, or this attempt was aborted (e.g. socket closed by handleReconnectTimeout()); ignore any errors
+ return;
+ }
+
+ if (error) {
+ NFD_LOG_FACE_TRACE("Reconnection attempt failed: " << error.message());
+ return; // no retry from here: m_reconnectEvent is left armed, so handleReconnectTimeout() starts the next attempt
+ }
+
+ m_reconnectEvent.cancel(); // connected: the timer for this attempt must not fire
+ m_nextReconnectWait = s_initialReconnectWait; // reset the backoff for any future disconnection
+
+ this->setLocalUri(FaceUri(m_socket.local_endpoint())); // refresh the local URI from the new socket
+ NFD_LOG_FACE_TRACE("TCP connection reestablished");
+ this->setState(TransportState::UP);
+ this->startReceive(); // must come after setState(UP): startReceive() asserts the UP state
+}
+
+void
+TcpTransport::handleReconnectTimeout() // runs when the timer armed in reconnect() expires without having been cancelled
+{
+ // abort the reconnection attempt
+ boost::system::error_code error; // non-throwing overload; the result is intentionally ignored
+ m_socket.close(error);
+
+ // exponentially back off the reconnection timer
+ m_nextReconnectWait =
+ std::min(time::duration_cast<time::milliseconds>(m_nextReconnectWait * s_reconnectWaitMultiplier),
+ s_maxReconnectWait); // the wait never grows beyond s_maxReconnectWait
+
+ // do this asynchronously because there could be some callbacks still pending
+ getGlobalIoService().post([this] { reconnect(); });
+}
+
+void
+TcpTransport::doClose() // cancels the reconnection timer, then defers to StreamTransport::doClose()
+{
+ m_reconnectEvent.cancel(); // otherwise handleReconnectTimeout() could still run and post another reconnect()
+ StreamTransport::doClose();
+}
+
} // namespace face
} // namespace nfd
diff --git a/daemon/face/tcp-transport.hpp b/daemon/face/tcp-transport.hpp
index 871d034..83fba82 100644
--- a/daemon/face/tcp-transport.hpp
+++ b/daemon/face/tcp-transport.hpp
@@ -27,22 +27,66 @@
#define NFD_DAEMON_FACE_TCP_TRANSPORT_HPP
#include "stream-transport.hpp"
+#include "core/scheduler.hpp"
namespace nfd {
namespace face {
/**
* \brief A Transport that communicates on a connected TCP socket
+ *
+ * When persistency is set to permanent, whenever the TCP connection is severed, the transport
+ * state is set to DOWN, and the connection is retried periodically with exponential backoff
+ * until it is reestablished
*/
-class TcpTransport final : public StreamTransport<boost::asio::ip::tcp>
+class TcpTransport FINAL_UNLESS_WITH_TESTS : public StreamTransport<boost::asio::ip::tcp>
{
public:
- TcpTransport(protocol::socket&& socket,
- ndn::nfd::FacePersistency persistency);
+ TcpTransport(protocol::socket&& socket, ndn::nfd::FacePersistency persistency);
protected:
- virtual void
+ void
beforeChangePersistency(ndn::nfd::FacePersistency newPersistency) final;
+
+ void
+ doClose() final; // cancels the reconnection timer before the StreamTransport close
+
+ void
+ handleError(const boost::system::error_code& error) final; // permanent: go DOWN and reconnect; otherwise StreamTransport handling
+
+PROTECTED_WITH_TESTS_ELSE_PRIVATE:
+ VIRTUAL_WITH_TESTS void
+ reconnect(); // starts one connection attempt to m_remoteEndpoint and arms m_reconnectEvent
+
+ VIRTUAL_WITH_TESTS void
+ handleReconnect(const boost::system::error_code& error); // completion handler of the connection attempt
+
+ VIRTUAL_WITH_TESTS void
+ handleReconnectTimeout(); // aborts the current attempt, grows the wait, and posts the next reconnect()
+
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ /** \brief initial value of the reconnection timer: time allowed for the first attempt before it is aborted and retried
+ */
+ static time::milliseconds s_initialReconnectWait;
+
+ /** \brief upper bound of the reconnection timer after exponential backoff
+ */
+ static time::milliseconds s_maxReconnectWait;
+
+ /** \brief multiplier for the exponential backoff of the reconnection timer
+ */
+ static float s_reconnectWaitMultiplier;
+
+private:
+ typename protocol::endpoint m_remoteEndpoint; // remote endpoint taken from the socket at construction; target of every reconnection attempt
+
+ /** \note valid only when persistency is set to permanent
+ */
+ scheduler::ScopedEventId m_reconnectEvent; // timer armed by reconnect(); expiry runs handleReconnectTimeout()
+
+ /** \note valid only when persistency is set to permanent
+ */
+ time::milliseconds m_nextReconnectWait; // current timer value; multiplied on each timeout, reset on success
};
} // namespace face