face: Serializing StreamFace::send(Interest|Data) operations using queue
Change-Id: I254d5e3030c94fac224e81e66bcbc58ffa752c0e
Refs: #1777
diff --git a/daemon/face/stream-face.hpp b/daemon/face/stream-face.hpp
index f9b7cd9..ef28c25 100644
--- a/daemon/face/stream-face.hpp
+++ b/daemon/face/stream-face.hpp
@@ -66,15 +66,15 @@
processErrorCode(const boost::system::error_code& error);
void
- handleSend(const boost::system::error_code& error,
- const Block& header, const Block& payload);
+ sendFromQueue();
+
void
handleSend(const boost::system::error_code& error,
- const Block& wire);
+ std::size_t nBytesTransferred);
void
handleReceive(const boost::system::error_code& error,
- std::size_t bytes_recvd);
+ std::size_t nBytesReceived);
void
keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
@@ -88,6 +88,7 @@
private:
uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
std::size_t m_inputBufferSize;
+ std::queue<Block> m_sendQueue;
friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
@@ -143,10 +144,11 @@
static void
send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
{
- face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
- packet.wireEncode().size()),
- bind(&StreamFace<Protocol, FaceBase>::handleSend,
- &face, _1, packet.wireEncode()));
+ bool wasQueueEmpty = face.m_sendQueue.empty();
+ face.m_sendQueue.push(packet.wireEncode());
+
+ if (wasQueueEmpty)
+ face.sendFromQueue();
}
};
@@ -157,29 +159,16 @@
static void
send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
{
- using namespace boost::asio;
+ bool wasQueueEmpty = face.m_sendQueue.empty();
- if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
+ if (!face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
{
- const Block& payload = packet.wireEncode();
- face.m_socket->async_send(buffer(payload.wire(), payload.size()),
- bind(&StreamFace<Protocol, LocalFace>::handleSend,
- &face, _1, packet.wireEncode()));
+ face.m_sendQueue.push(face.filterAndEncodeLocalControlHeader(packet));
}
- else
- {
- Block header = face.filterAndEncodeLocalControlHeader(packet);
- const Block& payload = packet.wireEncode();
+ face.m_sendQueue.push(packet.wireEncode());
- std::vector<const_buffer> buffers;
- buffers.reserve(2);
- buffers.push_back(buffer(header.wire(), header.size()));
- buffers.push_back(buffer(payload.wire(), payload.size()));
-
- face.m_socket->async_send(buffers,
- bind(&StreamFace<Protocol, LocalFace>::handleSend,
- &face, _1, header, payload));
- }
+ if (wasQueueEmpty)
+ face.sendFromQueue();
}
};
@@ -202,6 +191,15 @@
template<class T, class U>
inline void
+StreamFace<T, U>::sendFromQueue()
+{
+ const Block& block = this->m_sendQueue.front();
+ boost::asio::async_write(*this->m_socket, boost::asio::buffer(block),
+ bind(&StreamFace<T, U>::handleSend, this, _1, _2));
+}
+
+template<class T, class U>
+inline void
StreamFace<T, U>::close()
{
if (!m_socket->is_open())
@@ -259,42 +257,35 @@
template<class T, class U>
inline void
StreamFace<T, U>::handleSend(const boost::system::error_code& error,
- const Block& wire)
+ size_t nBytesTransferred)
{
if (error)
return processErrorCode(error);
- NFD_LOG_TRACE("[id:" << this->getId()
- << ",uri:" << this->getRemoteUri()
- << "] Successfully sent: " << wire.size() << " bytes");
-}
-
-template<class T, class U>
-inline void
-StreamFace<T, U>::handleSend(const boost::system::error_code& error,
- const Block& header, const Block& payload)
-{
- if (error)
- return processErrorCode(error);
+ BOOST_ASSERT(!m_sendQueue.empty());
NFD_LOG_TRACE("[id:" << this->getId()
<< ",uri:" << this->getRemoteUri()
- << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
+ << "] Successfully sent: " << nBytesTransferred << " bytes");
+
+ m_sendQueue.pop();
+ if (!m_sendQueue.empty())
+ sendFromQueue();
}
template<class T, class U>
inline void
StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
- std::size_t bytes_recvd)
+ std::size_t nBytesReceived)
{
if (error)
return processErrorCode(error);
NFD_LOG_TRACE("[id:" << this->getId()
<< ",uri:" << this->getRemoteUri()
- << "] Received: " << bytes_recvd << " bytes");
+ << "] Received: " << nBytesReceived << " bytes");
- m_inputBufferSize += bytes_recvd;
+ m_inputBufferSize += nBytesReceived;
// do magic
std::size_t offset = 0;
@@ -374,6 +365,10 @@
// handlers are dispatched
io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
this, this->shared_from_this()));
+
+ // clear send queue
+ std::queue<Block> emptyQueue;
+ std::swap(emptyQueue, m_sendQueue);
}
} // namespace nfd