face: Fix spurious assertion failure in StreamFace

Change-Id: Id487ae8d01ab3dc0ffa647dc69ff594f8a1d3c12
Refs: #1856
diff --git a/daemon/face/stream-face.hpp b/daemon/face/stream-face.hpp
index 8eb7af0..3715719 100644
--- a/daemon/face/stream-face.hpp
+++ b/daemon/face/stream-face.hpp
@@ -77,10 +77,10 @@
                 size_t nBytesReceived);
 
   void
-  keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
+  shutdownSocket();
 
   void
-  closeSocket();
+  deferredClose(const shared_ptr<Face>& face);
 
 protected:
   shared_ptr<typename protocol::socket> m_socket;
@@ -138,6 +138,7 @@
 {
 }
 
+
 template<class Protocol, class FaceBase, class Packet>
 struct StreamFaceSenderImpl
 {
@@ -191,15 +192,6 @@
 
 template<class T, class U>
 inline void
-StreamFace<T, U>::sendFromQueue()
-{
-  const Block& block = this->m_sendQueue.front();
-  boost::asio::async_write(*this->m_socket, boost::asio::buffer(block),
-                           bind(&StreamFace<T, U>::handleSend, this, _1, _2));
-}
-
-template<class T, class U>
-inline void
 StreamFace<T, U>::close()
 {
   if (!m_socket->is_open())
@@ -209,7 +201,7 @@
                << ",uri:" << this->getRemoteUri()
                << "] Close connection");
 
-  closeSocket();
+  shutdownSocket();
   this->fail("Close connection");
 }
 
@@ -217,7 +209,8 @@
 inline void
 StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
 {
-  if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
+  if (error == boost::asio::error::operation_aborted ||   // when cancel() is called
+      error == boost::asio::error::shut_down)             // after shutdown() is called
     return;
 
   if (!m_socket->is_open())
@@ -236,11 +229,11 @@
     {
       NFD_LOG_WARN("[id:" << this->getId()
                    << ",uri:" << this->getRemoteUri()
-                   << "] Send or receive operation failed, closing socket: "
-                   << error.category().message(error.value()));
+                   << "] Send or receive operation failed, closing face: "
+                   << error.message());
     }
 
-  closeSocket();
+  shutdownSocket();
 
   if (error == boost::asio::error::eof)
     {
@@ -248,11 +241,18 @@
     }
   else
     {
-      this->fail("Send or receive operation failed, closing socket: " +
-                   error.category().message(error.value()));
+      this->fail("Send or receive operation failed, closing face: " + error.message());
     }
 }
 
+template<class T, class U>
+inline void
+StreamFace<T, U>::sendFromQueue()
+{
+  const Block& block = this->m_sendQueue.front();
+  boost::asio::async_write(*this->m_socket, boost::asio::buffer(block),
+                           bind(&StreamFace<T, U>::handleSend, this, _1, _2));
+}
 
 template<class T, class U>
 inline void
@@ -316,12 +316,11 @@
     {
       NFD_LOG_WARN("[id:" << this->getId()
                    << ",uri:" << this->getRemoteUri()
-                   << "] Failed to parse incoming packet or it is too large to process, "
+                   << "] Failed to parse incoming packet or packet too large to process, "
                    << "closing down the face");
-
-      closeSocket();
-      this->fail("Failed to parse incoming packet or it is too large to process, "
-                   "closing down the face");
+      shutdownSocket();
+      this->fail("Failed to parse incoming packet or packet too large to process, "
+                 "closing down the face");
       return;
     }
 
@@ -346,30 +345,48 @@
 
 template<class T, class U>
 inline void
-StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
+StreamFace<T, U>::shutdownSocket()
 {
+  // Cancel all outstanding operations and shutdown the socket
+  // so that no further sends or receives are possible.
+  // Use the non-throwing variants and ignore errors, if any.
+  boost::system::error_code error;
+  m_socket->cancel(error);
+  m_socket->shutdown(protocol::socket::shutdown_both, error);
+
+  boost::asio::io_service& io = m_socket->get_io_service();
+  // ensure that the Face object is alive at least until all pending
+  // handlers are dispatched
+  io.post(bind(&StreamFace<T, U>::deferredClose, this, this->shared_from_this()));
+
+  // Some bug or feature of Boost.Asio (see http://redmine.named-data.net/issues/1856):
+  //
+  // When shutdownSocket is called from within a socket event (e.g., from handleReceive),
+  // m_socket->shutdown() does not trigger the cancellation of the handleSend callback.
+  // Instead, handleSend is invoked as nothing bad happened.
+  //
+  // In order to prevent the assertion in handleSend from failing, we clear the queue
+  // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
+  // this point have been executed.  If more send operations are scheduled after this
+  // point, they will fail because the socket has been shutdown, and their callbacks
+  // will be invoked with error code == asio::error::shut_down.
 }
 
 template<class T, class U>
 inline void
-StreamFace<T, U>::closeSocket()
+StreamFace<T, U>::deferredClose(const shared_ptr<Face>& face)
 {
-  boost::asio::io_service& io = m_socket->get_io_service();
-
-  // use the non-throwing variants and ignore errors, if any
-  boost::system::error_code error;
-  m_socket->shutdown(protocol::socket::shutdown_both, error);
-  m_socket->close(error);
-  // after this, handlers will be called with an error code
-
-  // ensure that the Face object is alive at least until all pending
-  // handlers are dispatched
-  io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
-               this, this->shared_from_this()));
+  NFD_LOG_DEBUG("[id:" << this->getId()
+                << ",uri:" << this->getRemoteUri()
+                << "] Clearing send queue");
 
   // clear send queue
   std::queue<Block> emptyQueue;
   std::swap(emptyQueue, m_sendQueue);
+
+  // use the non-throwing variant and ignore errors, if any
+  boost::system::error_code error;
+  m_socket->close(error);
 }
 
 } // namespace nfd
diff --git a/tests/daemon/face/tcp.cpp b/tests/daemon/face/tcp.cpp
index 80e357d..7b0a77d 100644
--- a/tests/daemon/face/tcp.cpp
+++ b/tests/daemon/face/tcp.cpp
@@ -637,7 +637,6 @@
   shared_ptr<Face> face1;
 };
 
-
 BOOST_FIXTURE_TEST_CASE(FaceCreateTimeout, FaceCreateTimeoutFixture)
 {
   TcpFactory factory;
@@ -653,6 +652,50 @@
   BOOST_CHECK_EQUAL(static_cast<bool>(face1), false);
 }
 
+BOOST_FIXTURE_TEST_CASE(Bug1856, EndToEndFixture)
+{
+  TcpFactory factory1;
+
+  shared_ptr<TcpChannel> channel1 = factory1.createChannel("127.0.0.1", "20070");
+  factory1.createChannel("127.0.0.1", "20071");
+
+  BOOST_CHECK_EQUAL(channel1->isListening(), false);
+
+  channel1->listen(bind(&EndToEndFixture::channel1_onFaceCreated,   this, _1),
+                   bind(&EndToEndFixture::channel1_onConnectFailed, this, _1));
+
+  BOOST_CHECK_EQUAL(channel1->isListening(), true);
+
+  TcpFactory factory2;
+
+  shared_ptr<TcpChannel> channel2 = factory2.createChannel("127.0.0.2", "20070");
+  factory2.createChannel("127.0.0.2", "20071");
+
+  factory2.createFace(FaceUri("tcp://127.0.0.1:20070"),
+                      bind(&EndToEndFixture::channel2_onFaceCreated, this, _1),
+                      bind(&EndToEndFixture::channel2_onConnectFailed, this, _1));
+
+  BOOST_CHECK_MESSAGE(limitedIo.run(2, time::seconds(10)) == LimitedIo::EXCEED_OPS,
+                      "TcpChannel error: cannot connect or cannot accept connection");
+
+  BOOST_REQUIRE(static_cast<bool>(face1));
+  BOOST_REQUIRE(static_cast<bool>(face2));
+
+  std::ostringstream hugeName;
+  hugeName << "/huge-name/";
+  for (size_t i = 0; i < MAX_NDN_PACKET_SIZE; i++)
+    hugeName << 'a';
+
+  shared_ptr<Interest> interest = makeInterest("ndn:/KfczhUqVix");
+  shared_ptr<Interest> hugeInterest = makeInterest(hugeName.str());
+
+  face1->sendInterest(*hugeInterest);
+  face2->sendInterest(*interest);
+  face2->sendInterest(*interest);
+
+  limitedIo.run(LimitedIo::UNLIMITED_OPS, time::seconds(1));
+  BOOST_TEST_MESSAGE("Unexpected assertion test passed");
+}
 
 BOOST_AUTO_TEST_SUITE_END()