face: detect websocket connection failure using ping/pong messages.

Change-Id: I66ef6a3921a56f1bb8e49fb6fdb4fa14b6a379c7
Refs: #1903
diff --git a/daemon/face/websocket-channel.cpp b/daemon/face/websocket-channel.cpp
index 234faff..d6cf64b 100644
--- a/daemon/face/websocket-channel.cpp
+++ b/daemon/face/websocket-channel.cpp
@@ -26,6 +26,8 @@
 #include "websocket-channel.hpp"
 #include "core/face-uri.hpp"
 
+#include <boost/date_time/posix_time/posix_time.hpp>
+
 namespace nfd {
 
 NFD_LOG_INIT("WebSocketChannel");
@@ -35,6 +37,7 @@
 WebSocketChannel::WebSocketChannel(const websocket::Endpoint& localEndpoint)
   : m_localEndpoint(localEndpoint)
   , m_isListening(false)
+  , m_pingInterval(10000)
 {
   // Setup WebSocket server
   m_server.clear_access_channels(websocketpp::log::alevel::all);
@@ -47,6 +50,11 @@
   // Always set SO_REUSEADDR flag
   m_server.set_reuse_addr(true);
 
+  // Detect disconnection using PONG message
+  m_server.set_pong_handler(bind(&WebSocketChannel::handlePong, this, _1, _2));
+  m_server.set_pong_timeout_handler(bind(&WebSocketChannel::handlePongTimeout,
+                                         this, _1, _2));
+
   this->setUri(FaceUri(localEndpoint, "ws"));
 }
 
@@ -55,6 +63,34 @@
 }
 
 void
+WebSocketChannel::setPongTimeout(time::milliseconds timeout)
+{
+  m_server.set_pong_timeout(static_cast<long>(timeout.count())); // pass the raw millisecond count
+}
+
+void
+WebSocketChannel::handlePongTimeout(websocketpp::connection_hdl hdl, std::string msg)
+{
+  // Pong-timeout callback: close the face registered for hdl (if any) and forget it
+  ChannelFaceMap::iterator faceIt = m_channelFaces.find(hdl);
+  if (faceIt == m_channelFaces.end())
+    return;
+  faceIt->second->close();
+  NFD_LOG_DEBUG("handlePongTimeout: remove " << faceIt->second->getRemoteUri());
+  m_channelFaces.erase(faceIt);
+}
+
+void
+WebSocketChannel::handlePong(websocketpp::connection_hdl hdl, std::string msg)
+{
+  // Pong callback: nothing to update, only trace which known face the PONG came from
+  ChannelFaceMap::iterator faceIt = m_channelFaces.find(hdl);
+  if (faceIt == m_channelFaces.end())
+    return;
+  NFD_LOG_TRACE("handlePong: from " << faceIt->second->getRemoteUri());
+}
+
+void
 WebSocketChannel::handleMessage(websocketpp::connection_hdl hdl,
                                 websocket::Server::message_ptr msg)
 {
@@ -73,7 +109,7 @@
     {
       remote = "wsclient://" + m_server.get_con_from_hdl(hdl)->get_remote_endpoint();
     }
-  catch (websocketpp::lib::error_code ec)
+  catch (websocketpp::lib::error_code&)
     {
       NFD_LOG_DEBUG("handleOpen: cannot get remote uri");
       websocketpp::lib::error_code ecode;
@@ -83,6 +119,37 @@
                                                               hdl, ref(m_server));
   m_onFaceCreatedCallback(face);
   m_channelFaces[hdl] = face;
+
+  // Schedule PING message
+  EventId pingEvent = scheduler::schedule(m_pingInterval,
+                                          bind(&WebSocketChannel::sendPing, this, hdl));
+  face->setPingEventId(pingEvent);
+}
+
+void
+WebSocketChannel::sendPing(websocketpp::connection_hdl hdl)
+{
+  // Ping the face registered for hdl and re-arm its ping timer; a failed ping drops the face
+  ChannelFaceMap::iterator it = m_channelFaces.find(hdl);
+  if (it != m_channelFaces.end())
+    {
+      try
+        {
+          m_server.ping(hdl, "NFD-WebSocket");
+        }
+      catch (websocketpp::lib::error_code&)
+        {
+          it->second->close();
+          NFD_LOG_DEBUG("sendPing: failed to ping " << it->second->getRemoteUri());
+          m_channelFaces.erase(it);
+          return; // erase() invalidated `it`: must not dereference it or reschedule below
+        }
+      NFD_LOG_TRACE("sendPing: to " << it->second->getRemoteUri());
+      // Schedule next PING message
+      EventId pingEvent = scheduler::schedule(m_pingInterval,
+                                              bind(&WebSocketChannel::sendPing, this, hdl));
+      it->second->setPingEventId(pingEvent);
+    }
 }
 
 void
@@ -92,7 +159,7 @@
   if (it != m_channelFaces.end())
     {
       it->second->close();
-      NFD_LOG_DEBUG("handleClose: remove client");
+      NFD_LOG_DEBUG("handleClose: remove " << it->second->getRemoteUri());
       m_channelFaces.erase(it);
     }
 }
diff --git a/daemon/face/websocket-channel.hpp b/daemon/face/websocket-channel.hpp
index 62c4940..bd5604b 100644
--- a/daemon/face/websocket-channel.hpp
+++ b/daemon/face/websocket-channel.hpp
@@ -88,8 +88,26 @@
   bool
   isListening() const;
 
+  void
+  setPingInterval(time::milliseconds interval)
+  {
+    m_pingInterval = interval; // read whenever a PING is scheduled, so it applies to later pings
+  }
+
+  void
+  setPongTimeout(time::milliseconds timeout);
+
 private:
   void
+  sendPing(websocketpp::connection_hdl hdl);
+
+  void
+  handlePong(websocketpp::connection_hdl hdl, std::string msg);
+
+  void
+  handlePongTimeout(websocketpp::connection_hdl hdl, std::string msg);
+
+  void
   handleMessage(websocketpp::connection_hdl hdl, websocket::Server::message_ptr msg);
 
   void
@@ -118,6 +136,7 @@
    */
   bool m_isListening;
 
+  time::milliseconds m_pingInterval;
 };
 
 inline bool
diff --git a/daemon/face/websocket-face.cpp b/daemon/face/websocket-face.cpp
index 79faf73..5b295b3 100644
--- a/daemon/face/websocket-face.cpp
+++ b/daemon/face/websocket-face.cpp
@@ -24,6 +24,7 @@
  **/
 
 #include "websocket-face.hpp"
+#include "core/global-io.hpp"
 
 namespace nfd {
 
@@ -37,9 +38,9 @@
   , m_server(server)
   , m_closed(false)
 {
+  this->setOnDemand(true);
 }
 
-
 void
 WebSocketFace::sendInterest(const Interest& interest)
 {
@@ -86,6 +87,7 @@
   if (m_closed == false)
     {
       m_closed = true;
+      scheduler::cancel(m_pingEventId);
       websocketpp::lib::error_code ecode;
       m_server.close(m_handle, websocketpp::close::status::normal, "closed by nfd", ecode);
 
diff --git a/daemon/face/websocket-face.hpp b/daemon/face/websocket-face.hpp
index 3f2a932..81a1111 100644
--- a/daemon/face/websocket-face.hpp
+++ b/daemon/face/websocket-face.hpp
@@ -28,6 +28,7 @@
 
 #include "face.hpp"
 #include "core/logger.hpp"
+#include "core/scheduler.hpp"
 
 #ifndef HAVE_WEBSOCKET
 #error "Cannot include this file when WebSocket support is not enabled"
@@ -64,11 +65,18 @@
   close();
 
   void
+  setPingEventId(const EventId& id)
+  {
+    m_pingEventId = id; // kept so that close() can cancel the pending PING event
+  }
+
+  void
   handleReceive(const std::string& msg);
 
 private:
   websocketpp::connection_hdl m_handle;
   websocket::Server& m_server;
+  EventId m_pingEventId;
   bool m_closed;
 };
 
diff --git a/tests/daemon/face/websocket.cpp b/tests/daemon/face/websocket.cpp
index 54b4813..65f0124 100644
--- a/tests/daemon/face/websocket.cpp
+++ b/tests/daemon/face/websocket.cpp
@@ -84,7 +84,6 @@
   face1_onReceiveInterest(const Interest& interest)
   {
     face1_receivedInterests.push_back(interest);
-
     limitedIo.afterOp();
   }
 
@@ -92,7 +91,6 @@
   face1_onReceiveData(const Data& data)
   {
     face1_receivedDatas.push_back(data);
-
     limitedIo.afterOp();
   }
 
@@ -122,6 +120,15 @@
     limitedIo.afterOp();
   }
 
+  bool
+  client1_onPing(websocketpp::connection_hdl hdl, std::string msg)
+  {
+    limitedIo.afterOp();
+    // Returning false suppresses the PONG reply to this PING, so the
+    // WebSocket channel under test runs into its pong timeout
+    return false;
+  }
+
   void
   client1_sendInterest(const Interest& interest)
   {
@@ -199,6 +206,8 @@
   WebSocketFactory factory1("9696");
 
   shared_ptr<WebSocketChannel> channel1 = factory1.createChannel("127.0.0.1", "20070");
+  channel1->setPingInterval(time::milliseconds(3000));
+  channel1->setPongTimeout(time::milliseconds(1000));
 
   BOOST_CHECK_EQUAL(channel1->isListening(), false);
 
@@ -206,7 +215,6 @@
 
   BOOST_CHECK_EQUAL(channel1->isListening(), true);
 
-  Client client1;
   // Clear all logging info from websocketpp library
   client1.clear_access_channels(websocketpp::log::alevel::all);
 
@@ -215,6 +223,7 @@
   client1.set_close_handler(bind(&EndToEndFixture::client1_onClose, this, _1));
   client1.set_fail_handler(bind(&EndToEndFixture::client1_onFail,   this, _1));
   client1.set_message_handler(bind(&EndToEndFixture::client1_onMessage, this, _1, _2));
+  client1.set_ping_handler(bind(&EndToEndFixture::client1_onPing, this, _1, _2));
 
   websocketpp::lib::error_code ec;
   Client::connection_ptr con = client1.get_connection("ws://127.0.0.1:20070", ec);
@@ -223,7 +232,10 @@
   BOOST_CHECK_MESSAGE(limitedIo.run(2, time::seconds(10)) == LimitedIo::EXCEED_OPS,
                       "WebSocketChannel error: cannot connect or cannot accept connection");
 
+  BOOST_CHECK_EQUAL(channel1->size(), 1);
+
   BOOST_CHECK_EQUAL(face1->getLocalUri().toString(), "ws://127.0.0.1:20070");
+  BOOST_CHECK_EQUAL(face1->isOnDemand(), true);
 
   //BOOST_CHECK_EQUAL(face1->isLocal(), true);
 
@@ -265,6 +277,9 @@
   BOOST_CHECK_EQUAL(counters1.getNOutDatas()    , 1);
   BOOST_CHECK_EQUAL(counters1.getNInBytes(), nBytesReceived);
   BOOST_CHECK_EQUAL(counters1.getNOutBytes(), nBytesSent);
+
+  limitedIo.run(LimitedIo::UNLIMITED_OPS, time::seconds(8));
+  BOOST_CHECK_EQUAL(channel1->size(), 0);
 }
 
 BOOST_AUTO_TEST_SUITE_END()