transport: Serialize socket write operations
Change-Id: Ieccaf0ccbd6ee8bd08b1eee77d781edd47b2cbb2
Refs: #1769, #1775
diff --git a/src/transport/stream-transport.hpp b/src/transport/stream-transport.hpp
index 04aeac3..491f77a 100644
--- a/src/transport/stream-transport.hpp
+++ b/src/transport/stream-transport.hpp
@@ -38,6 +38,9 @@
typedef Protocol protocol;
typedef StreamTransportImpl<BaseTransport,Protocol> impl;
+ typedef std::list<Block> BlockSequence;
+ typedef std::list<BlockSequence> TransmissionQueue;
+
StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
: m_transport(transport)
, m_socket(ioService)
@@ -58,23 +61,11 @@
resume();
m_transport.m_isConnected = true;
- for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
- m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
- bind(&impl::handle_async_send, this, _1, *i));
-
- for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
- i != m_sendPairQueue.end(); ++i)
- {
- std::vector<boost::asio::const_buffer> buffer;
- buffer.reserve(2);
- buffer.push_back(boost::asio::buffer(i->first.wire(), i->first.size()));
- buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
- m_socket.async_send(buffer,
- bind(&impl::handle_async_send2, this, _1, i->first, i->second));
- }
-
- m_sendQueue.clear();
- m_sendPairQueue.clear();
+ if (!m_transmissionQueue.empty()) {
+ boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+ bind(&impl::handleAsyncWrite, this, _1,
+ m_transmissionQueue.begin()));
+ }
}
else
{
@@ -124,8 +115,7 @@
m_transport.m_isConnected = false;
m_transport.m_isExpectingData = false;
- m_sendQueue.clear();
- m_sendPairQueue.clear();
+ m_transmissionQueue.clear();
}
void
@@ -159,30 +149,60 @@
void
send(const Block& wire)
{
- if (!m_transport.m_isConnected)
- m_sendQueue.push_back(wire);
- else
- m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
- bind(&impl::handle_async_send, this, _1, wire));
+ BlockSequence sequence;
+ sequence.push_back(wire);
+ m_transmissionQueue.push_back(sequence);
+
+ if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
+ boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+ bind(&impl::handleAsyncWrite, this, _1,
+ m_transmissionQueue.begin()));
+ }
+
+ // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
+ // next write will be scheduled either in connectHandler or in asyncWriteHandler
}
void
send(const Block& header, const Block& payload)
{
- if (!m_transport.m_isConnected)
- {
- m_sendPairQueue.push_back(std::make_pair(header, payload));
- }
- else
- {
- std::vector<boost::asio::const_buffer> buffers;
- buffers.reserve(2);
- buffers.push_back(boost::asio::buffer(header.wire(), header.size()));
- buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
+ BlockSequence sequence;
+ sequence.push_back(header);
+ sequence.push_back(payload);
+ m_transmissionQueue.push_back(sequence);
- m_socket.async_send(buffers,
- bind(&impl::handle_async_send2, this, _1, header, payload));
+ if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
+ boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+ bind(&impl::handleAsyncWrite, this, _1,
+ m_transmissionQueue.begin()));
+ }
+
+ // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
+ // next write will be scheduled either in connectHandler or in asyncWriteHandler
+ }
+
+ void
+ handleAsyncWrite(const boost::system::error_code& error,
+ TransmissionQueue::iterator queueItem)
+ {
+ if (error)
+ {
+ if (error == boost::system::errc::operation_canceled) {
+ // async receive has been explicitly cancelled (e.g., socket close)
+ return;
+ }
+
+ m_transport.close();
+ throw Transport::Error(error, "error while sending data to socket");
}
+
+ m_transmissionQueue.erase(queueItem);
+
+ if (!m_transmissionQueue.empty()) {
+ boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+ bind(&impl::handleAsyncWrite, this, _1,
+ m_transmissionQueue.begin()));
+ }
}
inline bool
@@ -246,19 +266,6 @@
bind(&impl::handle_async_receive, this, _1, _2));
}
- void
- handle_async_send(const boost::system::error_code& error, const Block& wire)
- {
- // pass (needed to keep data block alive during the send)
- }
-
- void
- handle_async_send2(const boost::system::error_code& error,
- const Block& header, const Block& payload)
- {
- // pass (needed to keep data blocks alive during the send)
- }
-
protected:
base_transport& m_transport;
@@ -266,8 +273,7 @@
uint8_t m_inputBuffer[MAX_LENGTH];
size_t m_inputBufferSize;
- std::list< Block > m_sendQueue;
- std::list< std::pair<Block, Block> > m_sendPairQueue;
+ TransmissionQueue m_transmissionQueue;
bool m_connectionInProgress;
boost::asio::deadline_timer m_connectTimer;
diff --git a/src/transport/tcp-transport.cpp b/src/transport/tcp-transport.cpp
index 6fab8e0..7ffe530 100644
--- a/src/transport/tcp-transport.cpp
+++ b/src/transport/tcp-transport.cpp
@@ -53,18 +53,21 @@
void
TcpTransport::send(const Block& wire)
{
+ BOOST_ASSERT(static_cast<bool>(m_impl));
m_impl->send(wire);
}
void
TcpTransport::send(const Block& header, const Block& payload)
{
+ BOOST_ASSERT(static_cast<bool>(m_impl));
m_impl->send(header, payload);
}
void
TcpTransport::close()
{
+ BOOST_ASSERT(static_cast<bool>(m_impl));
m_impl->close();
m_impl.reset();
}
@@ -72,12 +75,15 @@
void
TcpTransport::pause()
{
- m_impl->pause();
+ if (static_cast<bool>(m_impl)) {
+ m_impl->pause();
+ }
}
void
TcpTransport::resume()
{
+ BOOST_ASSERT(static_cast<bool>(m_impl));
m_impl->resume();
}
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index cbfada8..03ee8b6 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -117,8 +117,9 @@
void
UnixTransport::pause()
{
- BOOST_ASSERT(static_cast<bool>(m_impl));
- m_impl->pause();
+ if (static_cast<bool>(m_impl)) {
+ m_impl->pause();
+ }
}
void