face: close idle Datagram faces
refs #1281
Change-Id: Ifc766964f653d3991b55d76361c8b9bbe5d630cf
diff --git a/daemon/face/datagram-face.hpp b/daemon/face/datagram-face.hpp
index e7798b1..4b75a0e 100644
--- a/daemon/face/datagram-face.hpp
+++ b/daemon/face/datagram-face.hpp
@@ -17,9 +17,9 @@
public:
typedef T protocol;
- explicit
DatagramFace(const FaceUri& uri,
- const shared_ptr<typename protocol::socket>& socket);
+ const shared_ptr<typename protocol::socket>& socket,
+ bool isPermanent);
virtual
~DatagramFace();
@@ -42,6 +42,21 @@
handleReceive(const boost::system::error_code& error,
size_t nBytesReceived);
+ void
+ setPermanent(bool isPermanent);
+
+ bool
+ isPermanent() const;
+
+ /**
+ * \brief Set m_hasBeenUsedRecently to false
+ */
+ void
+ resetRecentUsage();
+
+ bool
+ hasBeenUsedRecently() const;
+
protected:
void
@@ -59,6 +74,14 @@
shared_ptr<typename protocol::socket> m_socket;
uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
+ /**
+ * If false, the face can be closed after it remains unused for a certain
+ * amount of time
+ */
+ bool m_isPermanent;
+
+ bool m_hasBeenUsedRecently;
+
NFD_LOG_INCLASS_DECLARE();
};
@@ -66,9 +89,11 @@
template <class T>
inline
DatagramFace<T>::DatagramFace(const FaceUri& uri,
- const shared_ptr<typename DatagramFace::protocol::socket>& socket)
+ const shared_ptr<typename DatagramFace::protocol::socket>& socket,
+ bool isPermanent)
: Face(uri)
, m_socket(socket)
+ , m_isPermanent(isPermanent)
{
m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
bind(&DatagramFace<T>::handleReceive, this, _1, _2));
@@ -166,8 +191,9 @@
{
NFD_LOG_DEBUG("handleReceive: " << nBytesReceived);
receiveDatagram(m_inputBuffer, nBytesReceived, error);
- m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
- bind(&DatagramFace<T>::handleReceive, this, _1, _2));
+ if (m_socket->is_open())
+ m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
+ bind(&DatagramFace<T>::handleReceive, this, _1, _2));
}
template <class T>
@@ -220,7 +246,7 @@
<< ",endpoint:" << m_socket->local_endpoint()
<< "] Received datagram size and decoded "
<< "element size don't match");
- /// @todo this message should not extend the face lifetime
+ // This message won't extend the face lifetime
return;
}
if (!this->decodeAndDispatchInput(element))
@@ -230,7 +256,7 @@
<< "] Received unrecognized block of type ["
<< element.type() << "]");
// ignore unknown packet and proceed
- /// @todo this message should not extend the face lifetime
+ // This message won't extend the face lifetime
return;
}
}
@@ -238,9 +264,10 @@
NFD_LOG_WARN("[id:" << this->getId()
<< ",endpoint:" << m_socket->local_endpoint()
<< "] Received input is invalid");
- /// @todo this message should not extend the face lifetime
+ // This message won't extend the face lifetime
return;
}
+ m_hasBeenUsedRecently = true;
}
@@ -254,6 +281,7 @@
inline void
DatagramFace<T>::closeSocket()
{
+ NFD_LOG_DEBUG("closeSocket " << m_socket->local_endpoint());
boost::asio::io_service& io = m_socket->get_io_service();
// use the non-throwing variants and ignore errors, if any
@@ -268,6 +296,34 @@
this, this->shared_from_this()));
}
+template <class T>
+inline void
+DatagramFace<T>::setPermanent(bool isPermanent)
+{
+ m_isPermanent = isPermanent;
+}
+
+template <class T>
+inline bool
+DatagramFace<T>::isPermanent() const
+{
+ return m_isPermanent;
+}
+
+template <class T>
+inline void
+DatagramFace<T>::resetRecentUsage()
+{
+ m_hasBeenUsedRecently = false;
+}
+
+template <class T>
+inline bool
+DatagramFace<T>::hasBeenUsedRecently() const
+{
+ return m_hasBeenUsedRecently;
+}
+
} // namespace nfd
#endif // NFD_FACE_DATAGRAM_FACE_HPP
diff --git a/daemon/face/multicast-udp-face.cpp b/daemon/face/multicast-udp-face.cpp
index 49146ac..62b20eb 100644
--- a/daemon/face/multicast-udp-face.cpp
+++ b/daemon/face/multicast-udp-face.cpp
@@ -12,7 +12,7 @@
MulticastUdpFace::MulticastUdpFace(const shared_ptr<MulticastUdpFace::protocol::socket>& socket)
- : DatagramFace<protocol>(FaceUri(socket->local_endpoint()), socket)
+ : DatagramFace<protocol>(FaceUri(socket->local_endpoint()), socket, true)
{
NFD_LOG_DEBUG("Face creation. Multicast group: "
<< m_socket->local_endpoint());
diff --git a/daemon/face/udp-channel.cpp b/daemon/face/udp-channel.cpp
index 24c1833..cedb033 100644
--- a/daemon/face/udp-channel.cpp
+++ b/daemon/face/udp-channel.cpp
@@ -18,6 +18,7 @@
const time::Duration& timeout)
: m_localEndpoint(localEndpoint)
, m_isListening(false)
+ , m_idleFaceTimeout(timeout)
{
/// \todo the reuse_address works as we want in Linux, but in other system could be different.
/// We need to check this
@@ -38,10 +39,15 @@
}
this->setUri(FaceUri(localEndpoint));
+
+ //setting the timeout to close the idle faces
+ m_closeIdleFaceEvent = scheduler::schedule(m_idleFaceTimeout,
+ bind(&UdpChannel::closeIdleFaces, this));
}
UdpChannel::~UdpChannel()
{
+ scheduler::cancel(m_closeIdleFaceEvent);
}
void
@@ -70,6 +76,7 @@
{
ChannelFaceMap::iterator i = m_channelFaces.find(remoteEndpoint);
if (i != m_channelFaces.end()) {
+ i->second->setPermanent(true);
onFaceCreated(i->second);
return;
}
@@ -93,7 +100,7 @@
throw Error("Failed to properly configure the socket. Check the address ("
+ std::string(e.what()) + ")");
}
- createFace(clientSocket, onFaceCreated);
+ createFace(clientSocket, onFaceCreated, true);
}
void
@@ -145,11 +152,12 @@
shared_ptr<UdpFace>
UdpChannel::createFace(const shared_ptr<ip::udp::socket>& socket,
- const FaceCreatedCallback& onFaceCreated)
+ const FaceCreatedCallback& onFaceCreated,
+ bool isPermanent)
{
udp::Endpoint remoteEndpoint = socket->remote_endpoint();
- shared_ptr<UdpFace> face = make_shared<UdpFace>(boost::cref(socket));
+ shared_ptr<UdpFace> face = make_shared<UdpFace>(boost::cref(socket), isPermanent);
face->onFail += bind(&UdpChannel::afterFaceFailed, this, remoteEndpoint);
onFaceCreated(face);
@@ -190,7 +198,9 @@
clientSocket->bind(m_localEndpoint);
clientSocket->connect(m_newRemoteEndpoint);
- face = createFace(clientSocket, onFaceCreatedNewPeerCallback);
+ face = createFace(clientSocket,
+ onFaceCreatedNewPeerCallback,
+ false);
}
//Passing the message to the correspondent face
@@ -204,10 +214,33 @@
}
-void UdpChannel::afterFaceFailed(udp::Endpoint &endpoint)
+void
+UdpChannel::afterFaceFailed(udp::Endpoint &endpoint)
{
NFD_LOG_DEBUG("afterFaceFailed: " << endpoint);
m_channelFaces.erase(endpoint);
}
+void
+UdpChannel::closeIdleFaces()
+{
+ ChannelFaceMap::iterator next = m_channelFaces.begin();
+
+ while (next != m_channelFaces.end()) {
+ ChannelFaceMap::iterator it = next;
+ next++;
+ if (!it->second->isPermanent() &&
+ !it->second->hasBeenUsedRecently()) {
+ //face has been idle since the last time closeIdleFaces
+ //has been called. Going to close it
+ NFD_LOG_DEBUG("Found idle face id: " << it->second->getId());
+ it->second->close();
+ } else {
+ it->second->resetRecentUsage();
+ }
+ }
+ m_closeIdleFaceEvent = scheduler::schedule(m_idleFaceTimeout,
+ bind(&UdpChannel::closeIdleFaces, this));
+}
+
} // namespace nfd
diff --git a/daemon/face/udp-channel.hpp b/daemon/face/udp-channel.hpp
index 91b34cd..81e53ac 100644
--- a/daemon/face/udp-channel.hpp
+++ b/daemon/face/udp-channel.hpp
@@ -10,6 +10,7 @@
#include "channel.hpp"
#include "core/time.hpp"
#include "core/global-io.hpp"
+#include "core/scheduler.hpp"
#include "udp-face.hpp"
namespace nfd {
@@ -95,7 +96,8 @@
private:
shared_ptr<UdpFace>
createFace(const shared_ptr<boost::asio::ip::udp::socket>& socket,
- const FaceCreatedCallback& onFaceCreated);
+ const FaceCreatedCallback& onFaceCreated,
+ bool isPermanent);
void
afterFaceFailed(udp::Endpoint& endpoint);
@@ -114,6 +116,9 @@
const ConnectFailedCallback& onConnectFailed,
const shared_ptr<boost::asio::ip::udp::resolver>& resolver);
+ void
+ closeIdleFaces();
+
private:
udp::Endpoint m_localEndpoint;
@@ -146,6 +151,14 @@
* \brief If true, it means the function listen has already been called
*/
bool m_isListening;
+
+ /**
+ * \brief every time m_idleFaceTimeout expires all the idle (and not permanent)
+ * faces will be removed
+ */
+ time::Duration m_idleFaceTimeout;
+
+ EventId m_closeIdleFaceEvent;
};
diff --git a/daemon/face/udp-face.cpp b/daemon/face/udp-face.cpp
index ac8e797..d9ba1f3 100644
--- a/daemon/face/udp-face.cpp
+++ b/daemon/face/udp-face.cpp
@@ -10,8 +10,11 @@
NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(DatagramFace, UdpFace::protocol, "UdpFace");
-UdpFace::UdpFace(const shared_ptr<UdpFace::protocol::socket>& socket)
- : DatagramFace<protocol>(FaceUri(socket->remote_endpoint()), socket)
+UdpFace::UdpFace(const shared_ptr<UdpFace::protocol::socket>& socket,
+ bool isPermanent)
+ : DatagramFace<protocol>(FaceUri(socket->remote_endpoint()),
+ socket,
+ isPermanent)
{
}
diff --git a/daemon/face/udp-face.hpp b/daemon/face/udp-face.hpp
index 126419f..038c7c3 100644
--- a/daemon/face/udp-face.hpp
+++ b/daemon/face/udp-face.hpp
@@ -21,8 +21,8 @@
public:
typedef boost::asio::ip::udp protocol;
- explicit
- UdpFace(const shared_ptr<protocol::socket>& socket);
+ UdpFace(const shared_ptr<protocol::socket>& socket,
+ bool isPermanent);
//@todo if needed by other datagramFaces, it could be moved to datagram-face.hpp
/**
diff --git a/tests/face/udp.cpp b/tests/face/udp.cpp
index bab2f2b..e161cfa 100644
--- a/tests/face/udp.cpp
+++ b/tests/face/udp.cpp
@@ -170,6 +170,12 @@
channel1_onFaceCreated(const shared_ptr<Face>& newFace)
{
BOOST_CHECK(!static_cast<bool>(m_face1));
+ channel1_onFaceCreatedNoCheck(newFace);
+ }
+
+ void
+ channel1_onFaceCreatedNoCheck(const shared_ptr<Face>& newFace)
+ {
m_face1 = newFace;
m_face1->onReceiveInterest +=
bind(&EndToEndFixture::face1_onReceiveInterest, this, _1);
@@ -823,6 +829,78 @@
BOOST_CHECK_EQUAL(channel2->size(), 0);
}
+
+BOOST_FIXTURE_TEST_CASE(ClosingIdleFace, EndToEndFixture)
+{
+ Interest interest1("ndn:/TpnzGvW9R");
+ Interest interest2("ndn:/QWiIMfj5sL");
+
+ UdpFactory factory;
+ time::Duration idleTimeout = time::seconds(2);
+
+ shared_ptr<UdpChannel> channel1 = factory.createChannel("127.0.0.1",
+ "20070",
+ time::seconds(2));
+ shared_ptr<UdpChannel> channel2 = factory.createChannel("127.0.0.1",
+ "20071",
+ time::seconds(2));
+ channel1->listen(bind(&EndToEndFixture::channel1_onFaceCreated, this, _1),
+ bind(&EndToEndFixture::channel1_onConnectFailed, this, _1));
+
+ channel2->connect("127.0.0.1", "20070",
+ bind(&EndToEndFixture::channel2_onFaceCreated, this, _1),
+ bind(&EndToEndFixture::channel2_onConnectFailed, this, _1));
+
+ BOOST_CHECK_MESSAGE(m_limitedIo.run(1, time::seconds(4)) == LimitedIo::EXCEED_OPS,
+ "UdpChannel error: cannot connect or cannot accept connection");
+
+ m_face2->sendInterest(interest1);
+
+ BOOST_CHECK_MESSAGE(m_limitedIo.run(2,//1 send + 1 listen return
+ time::seconds(1)) == LimitedIo::EXCEED_OPS,
+ "UdpChannel error: cannot send or receive Interest/Data packets");
+
+ BOOST_CHECK_EQUAL(m_faces.size(), 2);
+ BOOST_CHECK_MESSAGE(m_limitedIo.run(1, time::seconds(2)) == LimitedIo::EXCEED_TIME,
+ "Idle face should be still open because has been used recently");
+ BOOST_CHECK_EQUAL(m_faces.size(), 2);
+ BOOST_CHECK_MESSAGE(m_limitedIo.run(1, time::seconds(4)) == LimitedIo::EXCEED_OPS,
+ "Closing idle face error: face should be closed by now");
+
+ //the face on listen should be closed now
+ BOOST_CHECK_EQUAL(channel1->size(), 0);
+ //checking that m_face2 has not been closed
+ BOOST_CHECK_EQUAL(channel2->size(), 1);
+ BOOST_REQUIRE(static_cast<bool>(m_face2));
+
+ m_face2->sendInterest(interest2);
+ BOOST_CHECK_MESSAGE(m_limitedIo.run(2,//1 send + 1 listen return
+ time::seconds(1)) == LimitedIo::EXCEED_OPS,
+ "UdpChannel error: cannot send or receive Interest/Data packets");
+ //channel1 should have created a new face by now
+ BOOST_CHECK_EQUAL(channel1->size(), 1);
+ BOOST_CHECK_EQUAL(channel2->size(), 1);
+ BOOST_REQUIRE(static_cast<bool>(m_face1));
+
+ channel1->connect("127.0.0.1", "20071",
+ bind(&EndToEndFixture::channel1_onFaceCreatedNoCheck, this, _1),
+ bind(&EndToEndFixture::channel1_onConnectFailed, this, _1));
+
+ BOOST_CHECK_MESSAGE(m_limitedIo.run(1,//1 connect
+ time::seconds(1)) == LimitedIo::EXCEED_OPS,
+ "UdpChannel error: cannot connect");
+
+ //the connect should have set m_face1 as permanent face,
+ //but it shouln't have created any additional faces
+ BOOST_CHECK_EQUAL(channel1->size(), 1);
+ BOOST_CHECK_EQUAL(channel2->size(), 1);
+ BOOST_CHECK_MESSAGE(m_limitedIo.run(1, time::seconds(4)) == LimitedIo::EXCEED_TIME,
+ "Idle face should be still open because it's permanent now");
+ //both faces are permanent, nothing should have changed
+ BOOST_CHECK_EQUAL(channel1->size(), 1);
+ BOOST_CHECK_EQUAL(channel2->size(), 1);
+}
+
//BOOST_FIXTURE_TEST_CASE(MulticastFace, EndToEndFixture)
//{