face: Fix spurious assertion failure in StreamFace
Change-Id: Id487ae8d01ab3dc0ffa647dc69ff594f8a1d3c12
Refs: #1856
diff --git a/daemon/face/stream-face.hpp b/daemon/face/stream-face.hpp
index 8eb7af0..3715719 100644
--- a/daemon/face/stream-face.hpp
+++ b/daemon/face/stream-face.hpp
@@ -77,10 +77,10 @@
size_t nBytesReceived);
void
- keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
+ shutdownSocket();
void
- closeSocket();
+ deferredClose(const shared_ptr<Face>& face);
protected:
shared_ptr<typename protocol::socket> m_socket;
@@ -138,6 +138,7 @@
{
}
+
template<class Protocol, class FaceBase, class Packet>
struct StreamFaceSenderImpl
{
@@ -191,15 +192,6 @@
template<class T, class U>
inline void
-StreamFace<T, U>::sendFromQueue()
-{
- const Block& block = this->m_sendQueue.front();
- boost::asio::async_write(*this->m_socket, boost::asio::buffer(block),
- bind(&StreamFace<T, U>::handleSend, this, _1, _2));
-}
-
-template<class T, class U>
-inline void
StreamFace<T, U>::close()
{
if (!m_socket->is_open())
@@ -209,7 +201,7 @@
<< ",uri:" << this->getRemoteUri()
<< "] Close connection");
- closeSocket();
+ shutdownSocket();
this->fail("Close connection");
}
@@ -217,7 +209,8 @@
inline void
StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
{
- if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
+ if (error == boost::asio::error::operation_aborted || // when cancel() is called
+ error == boost::asio::error::shut_down) // after shutdown() is called
return;
if (!m_socket->is_open())
@@ -236,11 +229,11 @@
{
NFD_LOG_WARN("[id:" << this->getId()
<< ",uri:" << this->getRemoteUri()
- << "] Send or receive operation failed, closing socket: "
- << error.category().message(error.value()));
+ << "] Send or receive operation failed, closing face: "
+ << error.message());
}
- closeSocket();
+ shutdownSocket();
if (error == boost::asio::error::eof)
{
@@ -248,11 +241,18 @@
}
else
{
- this->fail("Send or receive operation failed, closing socket: " +
- error.category().message(error.value()));
+ this->fail("Send or receive operation failed, closing face: " + error.message());
}
}
+template<class T, class U>
+inline void
+StreamFace<T, U>::sendFromQueue()
+{
+ const Block& block = this->m_sendQueue.front();
+ boost::asio::async_write(*this->m_socket, boost::asio::buffer(block),
+ bind(&StreamFace<T, U>::handleSend, this, _1, _2));
+}
template<class T, class U>
inline void
@@ -316,12 +316,11 @@
{
NFD_LOG_WARN("[id:" << this->getId()
<< ",uri:" << this->getRemoteUri()
- << "] Failed to parse incoming packet or it is too large to process, "
+ << "] Failed to parse incoming packet or packet too large to process, "
<< "closing down the face");
-
- closeSocket();
- this->fail("Failed to parse incoming packet or it is too large to process, "
- "closing down the face");
+ shutdownSocket();
+ this->fail("Failed to parse incoming packet or packet too large to process, "
+ "closing down the face");
return;
}
@@ -346,30 +345,48 @@
template<class T, class U>
inline void
-StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
+StreamFace<T, U>::shutdownSocket()
{
+ // Cancel all outstanding operations and shut down the socket
+ // so that no further sends or receives are possible.
+ // Use the non-throwing variants and ignore errors, if any.
+ boost::system::error_code error;
+ m_socket->cancel(error);
+ m_socket->shutdown(protocol::socket::shutdown_both, error);
+
+ boost::asio::io_service& io = m_socket->get_io_service();
+ // ensure that the Face object is alive at least until all pending
+ // handlers are dispatched
+ io.post(bind(&StreamFace<T, U>::deferredClose, this, this->shared_from_this()));
+
+ // Some bug or feature of Boost.Asio (see http://redmine.named-data.net/issues/1856):
+ //
+ // When shutdownSocket is called from within a socket event (e.g., from handleReceive),
+ // m_socket->shutdown() does not trigger the cancellation of the handleSend callback.
+ // Instead, handleSend is invoked as if nothing bad happened.
+ //
+ // In order to prevent the assertion in handleSend from failing, we clear the queue
+ // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
+ // this point have been executed. If more send operations are scheduled after this
+ // point, they will fail because the socket has been shut down, and their callbacks
+ // will be invoked with error code == asio::error::shut_down.
}
template<class T, class U>
inline void
-StreamFace<T, U>::closeSocket()
+StreamFace<T, U>::deferredClose(const shared_ptr<Face>& face)
{
- boost::asio::io_service& io = m_socket->get_io_service();
-
- // use the non-throwing variants and ignore errors, if any
- boost::system::error_code error;
- m_socket->shutdown(protocol::socket::shutdown_both, error);
- m_socket->close(error);
- // after this, handlers will be called with an error code
-
- // ensure that the Face object is alive at least until all pending
- // handlers are dispatched
- io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
- this, this->shared_from_this()));
+ NFD_LOG_DEBUG("[id:" << this->getId()
+ << ",uri:" << this->getRemoteUri()
+ << "] Clearing send queue");
// clear send queue
std::queue<Block> emptyQueue;
std::swap(emptyQueue, m_sendQueue);
+
+ // use the non-throwing variant and ignore errors, if any
+ boost::system::error_code error;
+ m_socket->close(error);
}
} // namespace nfd