transport: Serialize socket write operations

Change-Id: Ieccaf0ccbd6ee8bd08b1eee77d781edd47b2cbb2
Refs: #1769, #1775
diff --git a/src/transport/stream-transport.hpp b/src/transport/stream-transport.hpp
index 04aeac3..491f77a 100644
--- a/src/transport/stream-transport.hpp
+++ b/src/transport/stream-transport.hpp
@@ -38,6 +38,9 @@
   typedef Protocol      protocol;
   typedef StreamTransportImpl<BaseTransport,Protocol> impl;
 
+  typedef std::list<Block> BlockSequence;
+  typedef std::list<BlockSequence> TransmissionQueue;
+
   StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
     : m_transport(transport)
     , m_socket(ioService)
@@ -58,23 +61,11 @@
         resume();
         m_transport.m_isConnected = true;
 
-        for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
-          m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
-                              bind(&impl::handle_async_send, this, _1, *i));
-
-        for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
-             i != m_sendPairQueue.end(); ++i)
-          {
-            std::vector<boost::asio::const_buffer> buffer;
-            buffer.reserve(2);
-            buffer.push_back(boost::asio::buffer(i->first.wire(),  i->first.size()));
-            buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
-            m_socket.async_send(buffer,
-                                bind(&impl::handle_async_send2, this, _1, i->first, i->second));
-          }
-
-        m_sendQueue.clear();
-        m_sendPairQueue.clear();
+        if (!m_transmissionQueue.empty()) {
+          boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+                                   bind(&impl::handleAsyncWrite, this, _1,
+                                        m_transmissionQueue.begin()));
+        }
       }
     else
       {
@@ -124,8 +115,7 @@
 
     m_transport.m_isConnected = false;
     m_transport.m_isExpectingData = false;
-    m_sendQueue.clear();
-    m_sendPairQueue.clear();
+    m_transmissionQueue.clear();
   }
 
   void
@@ -159,30 +149,60 @@
   void
   send(const Block& wire)
   {
-    if (!m_transport.m_isConnected)
-      m_sendQueue.push_back(wire);
-    else
-      m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
-                         bind(&impl::handle_async_send, this, _1, wire));
+    BlockSequence sequence;
+    sequence.push_back(wire);
+    m_transmissionQueue.push_back(sequence);
+
+    if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
+      boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+                               bind(&impl::handleAsyncWrite, this, _1,
+                                    m_transmissionQueue.begin()));
+    }
+
+    // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
+    // next write will be scheduled either in connectHandler or in asyncWriteHandler
   }
 
   void
   send(const Block& header, const Block& payload)
   {
-    if (!m_transport.m_isConnected)
-      {
-        m_sendPairQueue.push_back(std::make_pair(header, payload));
-      }
-    else
-      {
-        std::vector<boost::asio::const_buffer> buffers;
-        buffers.reserve(2);
-        buffers.push_back(boost::asio::buffer(header.wire(),  header.size()));
-        buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
+    BlockSequence sequence;
+    sequence.push_back(header);
+    sequence.push_back(payload);
+    m_transmissionQueue.push_back(sequence);
 
-        m_socket.async_send(buffers,
-                            bind(&impl::handle_async_send2, this, _1, header, payload));
+    if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
+      boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+                               bind(&impl::handleAsyncWrite, this, _1,
+                                    m_transmissionQueue.begin()));
+    }
+
+    // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
+    // next write will be scheduled either in connectHandler or in asyncWriteHandler
+  }
+
+  void
+  handleAsyncWrite(const boost::system::error_code& error,
+                   TransmissionQueue::iterator queueItem)
+  {
+    if (error)
+      {
+        if (error == boost::system::errc::operation_canceled) {
+          // async receive has been explicitly cancelled (e.g., socket close)
+          return;
+        }
+
+        m_transport.close();
+        throw Transport::Error(error, "error while sending data to socket");
       }
+
+    m_transmissionQueue.erase(queueItem);
+
+    if (!m_transmissionQueue.empty()) {
+      boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+                               bind(&impl::handleAsyncWrite, this, _1,
+                                    m_transmissionQueue.begin()));
+    }
   }
 
   inline bool
@@ -246,19 +266,6 @@
                            bind(&impl::handle_async_receive, this, _1, _2));
   }
 
-  void
-  handle_async_send(const boost::system::error_code& error, const Block& wire)
-  {
-    // pass (needed to keep data block alive during the send)
-  }
-
-  void
-  handle_async_send2(const boost::system::error_code& error,
-                     const Block& header, const Block& payload)
-  {
-    // pass (needed to keep data blocks alive during the send)
-  }
-
 protected:
   base_transport& m_transport;
 
@@ -266,8 +273,7 @@
   uint8_t m_inputBuffer[MAX_LENGTH];
   size_t m_inputBufferSize;
 
-  std::list< Block > m_sendQueue;
-  std::list< std::pair<Block, Block> > m_sendPairQueue;
+  TransmissionQueue m_transmissionQueue;
   bool m_connectionInProgress;
 
   boost::asio::deadline_timer m_connectTimer;
diff --git a/src/transport/tcp-transport.cpp b/src/transport/tcp-transport.cpp
index 6fab8e0..7ffe530 100644
--- a/src/transport/tcp-transport.cpp
+++ b/src/transport/tcp-transport.cpp
@@ -53,18 +53,21 @@
 void
 TcpTransport::send(const Block& wire)
 {
+  BOOST_ASSERT(static_cast<bool>(m_impl));
   m_impl->send(wire);
 }
 
 void
 TcpTransport::send(const Block& header, const Block& payload)
 {
+  BOOST_ASSERT(static_cast<bool>(m_impl));
   m_impl->send(header, payload);
 }
 
 void
 TcpTransport::close()
 {
+  BOOST_ASSERT(static_cast<bool>(m_impl));
   m_impl->close();
   m_impl.reset();
 }
@@ -72,12 +75,15 @@
 void
 TcpTransport::pause()
 {
-  m_impl->pause();
+  if (static_cast<bool>(m_impl)) {
+    m_impl->pause();
+  }
 }
 
 void
 TcpTransport::resume()
 {
+  BOOST_ASSERT(static_cast<bool>(m_impl));
   m_impl->resume();
 }
 
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index cbfada8..03ee8b6 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -117,8 +117,9 @@
 void
 UnixTransport::pause()
 {
-  BOOST_ASSERT(static_cast<bool>(m_impl));
-  m_impl->pause();
+  if (static_cast<bool>(m_impl)) {
+    m_impl->pause();
+  }
 }
 
 void