face: Fix spurious assertion failure in StreamFace
Change-Id: Id487ae8d01ab3dc0ffa647dc69ff594f8a1d3c12
Refs: #1856
diff --git a/daemon/face/stream-face.hpp b/daemon/face/stream-face.hpp
index 8eb7af0..3715719 100644
--- a/daemon/face/stream-face.hpp
+++ b/daemon/face/stream-face.hpp
@@ -77,10 +77,10 @@
size_t nBytesReceived);
void
- keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
+ shutdownSocket();
void
- closeSocket();
+ deferredClose(const shared_ptr<Face>& face);
protected:
shared_ptr<typename protocol::socket> m_socket;
@@ -138,6 +138,7 @@
{
}
+
template<class Protocol, class FaceBase, class Packet>
struct StreamFaceSenderImpl
{
@@ -191,15 +192,6 @@
template<class T, class U>
inline void
-StreamFace<T, U>::sendFromQueue()
-{
- const Block& block = this->m_sendQueue.front();
- boost::asio::async_write(*this->m_socket, boost::asio::buffer(block),
- bind(&StreamFace<T, U>::handleSend, this, _1, _2));
-}
-
-template<class T, class U>
-inline void
StreamFace<T, U>::close()
{
if (!m_socket->is_open())
@@ -209,7 +201,7 @@
<< ",uri:" << this->getRemoteUri()
<< "] Close connection");
- closeSocket();
+ shutdownSocket();
this->fail("Close connection");
}
@@ -217,7 +209,8 @@
inline void
StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
{
- if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
+ if (error == boost::asio::error::operation_aborted || // when cancel() is called
+ error == boost::asio::error::shut_down) // after shutdown() is called
return;
if (!m_socket->is_open())
@@ -236,11 +229,11 @@
{
NFD_LOG_WARN("[id:" << this->getId()
<< ",uri:" << this->getRemoteUri()
- << "] Send or receive operation failed, closing socket: "
- << error.category().message(error.value()));
+ << "] Send or receive operation failed, closing face: "
+ << error.message());
}
- closeSocket();
+ shutdownSocket();
if (error == boost::asio::error::eof)
{
@@ -248,11 +241,18 @@
}
else
{
- this->fail("Send or receive operation failed, closing socket: " +
- error.category().message(error.value()));
+ this->fail("Send or receive operation failed, closing face: " + error.message());
}
}
+template<class T, class U>
+inline void
+StreamFace<T, U>::sendFromQueue()
+{
+ const Block& block = this->m_sendQueue.front();
+ boost::asio::async_write(*this->m_socket, boost::asio::buffer(block),
+ bind(&StreamFace<T, U>::handleSend, this, _1, _2));
+}
template<class T, class U>
inline void
@@ -316,12 +316,11 @@
{
NFD_LOG_WARN("[id:" << this->getId()
<< ",uri:" << this->getRemoteUri()
- << "] Failed to parse incoming packet or it is too large to process, "
+ << "] Failed to parse incoming packet or packet too large to process, "
<< "closing down the face");
-
- closeSocket();
- this->fail("Failed to parse incoming packet or it is too large to process, "
- "closing down the face");
+ shutdownSocket();
+ this->fail("Failed to parse incoming packet or packet too large to process, "
+ "closing down the face");
return;
}
@@ -346,30 +345,48 @@
template<class T, class U>
inline void
-StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
+StreamFace<T, U>::shutdownSocket()
{
+ // Cancel all outstanding operations and shutdown the socket
+ // so that no further sends or receives are possible.
+ // Use the non-throwing variants and ignore errors, if any.
+ boost::system::error_code error;
+ m_socket->cancel(error);
+ m_socket->shutdown(protocol::socket::shutdown_both, error);
+
+ boost::asio::io_service& io = m_socket->get_io_service();
+ // ensure that the Face object is alive at least until all pending
+ // handlers are dispatched
+ io.post(bind(&StreamFace<T, U>::deferredClose, this, this->shared_from_this()));
+
+ // Some bug or feature of Boost.Asio (see http://redmine.named-data.net/issues/1856):
+ //
+ // When shutdownSocket is called from within a socket event (e.g., from handleReceive),
+ // m_socket->shutdown() does not trigger the cancellation of the handleSend callback.
+ // Instead, handleSend is invoked as nothing bad happened.
+ //
+ // In order to prevent the assertion in handleSend from failing, we clear the queue
+ // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
+ // this point have been executed. If more send operations are scheduled after this
+ // point, they will fail because the socket has been shutdown, and their callbacks
+ // will be invoked with error code == asio::error::shut_down.
}
template<class T, class U>
inline void
-StreamFace<T, U>::closeSocket()
+StreamFace<T, U>::deferredClose(const shared_ptr<Face>& face)
{
- boost::asio::io_service& io = m_socket->get_io_service();
-
- // use the non-throwing variants and ignore errors, if any
- boost::system::error_code error;
- m_socket->shutdown(protocol::socket::shutdown_both, error);
- m_socket->close(error);
- // after this, handlers will be called with an error code
-
- // ensure that the Face object is alive at least until all pending
- // handlers are dispatched
- io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
- this, this->shared_from_this()));
+ NFD_LOG_DEBUG("[id:" << this->getId()
+ << ",uri:" << this->getRemoteUri()
+ << "] Clearing send queue");
// clear send queue
std::queue<Block> emptyQueue;
std::swap(emptyQueue, m_sendQueue);
+
+ // use the non-throwing variant and ignore errors, if any
+ boost::system::error_code error;
+ m_socket->close(error);
}
} // namespace nfd
diff --git a/tests/daemon/face/tcp.cpp b/tests/daemon/face/tcp.cpp
index 80e357d..7b0a77d 100644
--- a/tests/daemon/face/tcp.cpp
+++ b/tests/daemon/face/tcp.cpp
@@ -637,7 +637,6 @@
shared_ptr<Face> face1;
};
-
BOOST_FIXTURE_TEST_CASE(FaceCreateTimeout, FaceCreateTimeoutFixture)
{
TcpFactory factory;
@@ -653,6 +652,50 @@
BOOST_CHECK_EQUAL(static_cast<bool>(face1), false);
}
+BOOST_FIXTURE_TEST_CASE(Bug1856, EndToEndFixture)
+{
+ TcpFactory factory1;
+
+ shared_ptr<TcpChannel> channel1 = factory1.createChannel("127.0.0.1", "20070");
+ factory1.createChannel("127.0.0.1", "20071");
+
+ BOOST_CHECK_EQUAL(channel1->isListening(), false);
+
+ channel1->listen(bind(&EndToEndFixture::channel1_onFaceCreated, this, _1),
+ bind(&EndToEndFixture::channel1_onConnectFailed, this, _1));
+
+ BOOST_CHECK_EQUAL(channel1->isListening(), true);
+
+ TcpFactory factory2;
+
+ shared_ptr<TcpChannel> channel2 = factory2.createChannel("127.0.0.2", "20070");
+ factory2.createChannel("127.0.0.2", "20071");
+
+ factory2.createFace(FaceUri("tcp://127.0.0.1:20070"),
+ bind(&EndToEndFixture::channel2_onFaceCreated, this, _1),
+ bind(&EndToEndFixture::channel2_onConnectFailed, this, _1));
+
+ BOOST_CHECK_MESSAGE(limitedIo.run(2, time::seconds(10)) == LimitedIo::EXCEED_OPS,
+ "TcpChannel error: cannot connect or cannot accept connection");
+
+ BOOST_REQUIRE(static_cast<bool>(face1));
+ BOOST_REQUIRE(static_cast<bool>(face2));
+
+ std::ostringstream hugeName;
+ hugeName << "/huge-name/";
+ for (size_t i = 0; i < MAX_NDN_PACKET_SIZE; i++)
+ hugeName << 'a';
+
+ shared_ptr<Interest> interest = makeInterest("ndn:/KfczhUqVix");
+ shared_ptr<Interest> hugeInterest = makeInterest(hugeName.str());
+
+ face1->sendInterest(*hugeInterest);
+ face2->sendInterest(*interest);
+ face2->sendInterest(*interest);
+
+ limitedIo.run(LimitedIo::UNLIMITED_OPS, time::seconds(1));
+ BOOST_TEST_MESSAGE("Unexpected assertion test passed");
+}
BOOST_AUTO_TEST_SUITE_END()