transport: Serialize socket write operations

Change-Id: Ieccaf0ccbd6ee8bd08b1eee77d781edd47b2cbb2
Refs: #1769, #1775
diff --git a/src/common.hpp b/src/common.hpp
index 4a9914c..a35addb 100644
--- a/src/common.hpp
+++ b/src/common.hpp
@@ -60,6 +60,7 @@
 
 using std::shared_ptr;
 using std::weak_ptr;
+using std::bad_weak_ptr;
 using std::make_shared;
 using std::enable_shared_from_this;
 
@@ -102,6 +103,7 @@
 
 using boost::shared_ptr;
 using boost::weak_ptr;
+using boost::bad_weak_ptr;
 using boost::make_shared;
 using boost::enable_shared_from_this;
 
diff --git a/src/detail/face-impl.hpp b/src/detail/face-impl.hpp
index 7c259aa..187d7fa 100644
--- a/src/detail/face-impl.hpp
+++ b/src/detail/face-impl.hpp
@@ -137,6 +137,27 @@
     m_pendingInterestTable.remove_if(MatchPendingInterestId(pendingInterestId));
   }
 
+  void
+  asyncPutData(const shared_ptr<const Data>& data)
+  {
+    if (!m_face.m_transport->isConnected())
+      m_face.m_transport->connect(*m_face.m_ioService,
+                                  bind(&Face::onReceiveElement, &m_face, _1));
+
+    if (!m_face.m_transport->isExpectingData())
+      m_face.m_transport->resume();
+
+    if (!data->getLocalControlHeader().empty(false, true))
+      {
+        m_face.m_transport->send(data->getLocalControlHeader().wireEncode(*data, false, true),
+                                 data->wireEncode());
+      }
+    else
+      {
+        m_face.m_transport->send(data->wireEncode());
+      }
+  }
+
   /////////////////////////////////////////////////////////////////////////////////////////////////
   /////////////////////////////////////////////////////////////////////////////////////////////////
 
diff --git a/src/encoding/block.cpp b/src/encoding/block.cpp
index ed265b0..eeb0c3f 100644
--- a/src/encoding/block.cpp
+++ b/src/encoding/block.cpp
@@ -31,6 +31,7 @@
 #include "buffer-stream.hpp"
 
 #include <boost/lexical_cast.hpp>
+#include <boost/asio/buffer.hpp>
 
 namespace ndn {
 
@@ -397,4 +398,10 @@
               boost::lexical_cast<std::string>(type) + "] from Block");
 }
 
+Block::operator boost::asio::const_buffer() const
+{
+  return boost::asio::const_buffer(wire(), size());
+}
+
+
 } // namespace ndn
diff --git a/src/encoding/block.hpp b/src/encoding/block.hpp
index c91d12c..17192d8 100644
--- a/src/encoding/block.hpp
+++ b/src/encoding/block.hpp
@@ -29,6 +29,12 @@
 #include "buffer.hpp"
 #include "tlv.hpp"
 
+namespace boost {
+namespace asio {
+class const_buffer;
+} // namespace asio
+} // namespace boost
+
 namespace ndn {
 
 template<bool> class EncodingImpl;
@@ -281,6 +287,9 @@
   bool
   operator!=(const Block& other) const;
 
+public: // ConvertibleToConstBuffer
+  operator boost::asio::const_buffer() const;
+
 protected:
   ConstBufferPtr m_buffer;
 
diff --git a/src/face.cpp b/src/face.cpp
index 6f13570..28482ba 100644
--- a/src/face.cpp
+++ b/src/face.cpp
@@ -165,19 +165,18 @@
 void
 Face::put(const Data& data)
 {
-  if (!m_transport->isConnected())
-    m_transport->connect(*m_ioService,
-                         bind(&Face::onReceiveElement, this, _1));
+  shared_ptr<const Data> dataPtr;
+  try {
+    dataPtr = data.shared_from_this();
+  }
+  catch (const bad_weak_ptr& e) {
+    std::cerr << "Face::put WARNING: the supplied Data should be created using make_shared<Data>()"
+              << std::endl;
+    dataPtr = make_shared<Data>(data);
+  }
 
-  if (!data.getLocalControlHeader().empty(false, true))
-    {
-      m_transport->send(data.getLocalControlHeader().wireEncode(data, false, true),
-                        data.wireEncode());
-    }
-  else
-    {
-      m_transport->send(data.wireEncode());
-    }
+  // If the same ioService thread, dispatch directly calls the method
+  m_ioService->dispatch(bind(&Impl::asyncPutData, m_impl, dataPtr));
 }
 
 void
@@ -186,8 +185,6 @@
   m_ioService->post(bind(&Impl::asyncRemovePendingInterest, m_impl, pendingInterestId));
 }
 
-
-
 const RegisteredPrefixId*
 Face::setInterestFilter(const InterestFilter& interestFilter,
                         const OnInterest& onInterest,
diff --git a/src/face.hpp b/src/face.hpp
index a5ece29..b915e83 100644
--- a/src/face.hpp
+++ b/src/face.hpp
@@ -440,7 +440,12 @@
    * @brief Publish data packet
    *
    * This method can be called to satisfy the incoming Interest or to put Data packet into
-   * the cache of the local NDN forwarder
+   * the cache of the local NDN forwarder.
+   *
+   * @param data Data packet to publish.  It is highly recommended to use Data packet that
+   *             was created using make_shared<Data>(...).  Otherwise, put() will make an
+   *             extra copy of the Data packet to ensure validity of published Data until
+   *             asynchronous put() operation finishes.
    */
   void
   put(const Data& data);
diff --git a/src/transport/stream-transport.hpp b/src/transport/stream-transport.hpp
index 04aeac3..491f77a 100644
--- a/src/transport/stream-transport.hpp
+++ b/src/transport/stream-transport.hpp
@@ -38,6 +38,9 @@
   typedef Protocol      protocol;
   typedef StreamTransportImpl<BaseTransport,Protocol> impl;
 
+  typedef std::list<Block> BlockSequence;
+  typedef std::list<BlockSequence> TransmissionQueue;
+
   StreamTransportImpl(base_transport& transport, boost::asio::io_service& ioService)
     : m_transport(transport)
     , m_socket(ioService)
@@ -58,23 +61,11 @@
         resume();
         m_transport.m_isConnected = true;
 
-        for (std::list<Block>::iterator i = m_sendQueue.begin(); i != m_sendQueue.end(); ++i)
-          m_socket.async_send(boost::asio::buffer(i->wire(), i->size()),
-                              bind(&impl::handle_async_send, this, _1, *i));
-
-        for (std::list< std::pair<Block,Block> >::iterator i = m_sendPairQueue.begin();
-             i != m_sendPairQueue.end(); ++i)
-          {
-            std::vector<boost::asio::const_buffer> buffer;
-            buffer.reserve(2);
-            buffer.push_back(boost::asio::buffer(i->first.wire(),  i->first.size()));
-            buffer.push_back(boost::asio::buffer(i->second.wire(), i->second.size()));
-            m_socket.async_send(buffer,
-                                bind(&impl::handle_async_send2, this, _1, i->first, i->second));
-          }
-
-        m_sendQueue.clear();
-        m_sendPairQueue.clear();
+        if (!m_transmissionQueue.empty()) {
+          boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+                                   bind(&impl::handleAsyncWrite, this, _1,
+                                        m_transmissionQueue.begin()));
+        }
       }
     else
       {
@@ -124,8 +115,7 @@
 
     m_transport.m_isConnected = false;
     m_transport.m_isExpectingData = false;
-    m_sendQueue.clear();
-    m_sendPairQueue.clear();
+    m_transmissionQueue.clear();
   }
 
   void
@@ -159,30 +149,60 @@
   void
   send(const Block& wire)
   {
-    if (!m_transport.m_isConnected)
-      m_sendQueue.push_back(wire);
-    else
-      m_socket.async_send(boost::asio::buffer(wire.wire(), wire.size()),
-                         bind(&impl::handle_async_send, this, _1, wire));
+    BlockSequence sequence;
+    sequence.push_back(wire);
+    m_transmissionQueue.push_back(sequence);
+
+    if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
+      boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+                               bind(&impl::handleAsyncWrite, this, _1,
+                                    m_transmissionQueue.begin()));
+    }
+
+    // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
+    // next write will be scheduled either in connectHandler or in asyncWriteHandler
   }
 
   void
   send(const Block& header, const Block& payload)
   {
-    if (!m_transport.m_isConnected)
-      {
-        m_sendPairQueue.push_back(std::make_pair(header, payload));
-      }
-    else
-      {
-        std::vector<boost::asio::const_buffer> buffers;
-        buffers.reserve(2);
-        buffers.push_back(boost::asio::buffer(header.wire(),  header.size()));
-        buffers.push_back(boost::asio::buffer(payload.wire(), payload.size()));
+    BlockSequence sequence;
+    sequence.push_back(header);
+    sequence.push_back(payload);
+    m_transmissionQueue.push_back(sequence);
 
-        m_socket.async_send(buffers,
-                            bind(&impl::handle_async_send2, this, _1, header, payload));
+    if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
+      boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+                               bind(&impl::handleAsyncWrite, this, _1,
+                                    m_transmissionQueue.begin()));
+    }
+
+    // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
+    // next write will be scheduled either in connectHandler or in asyncWriteHandler
+  }
+
+  void
+  handleAsyncWrite(const boost::system::error_code& error,
+                   TransmissionQueue::iterator queueItem)
+  {
+    if (error)
+      {
+        if (error == boost::system::errc::operation_canceled) {
+          // async receive has been explicitly cancelled (e.g., socket close)
+          return;
+        }
+
+        m_transport.close();
+        throw Transport::Error(error, "error while sending data to socket");
       }
+
+    m_transmissionQueue.erase(queueItem);
+
+    if (!m_transmissionQueue.empty()) {
+      boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+                               bind(&impl::handleAsyncWrite, this, _1,
+                                    m_transmissionQueue.begin()));
+    }
   }
 
   inline bool
@@ -246,19 +266,6 @@
                            bind(&impl::handle_async_receive, this, _1, _2));
   }
 
-  void
-  handle_async_send(const boost::system::error_code& error, const Block& wire)
-  {
-    // pass (needed to keep data block alive during the send)
-  }
-
-  void
-  handle_async_send2(const boost::system::error_code& error,
-                     const Block& header, const Block& payload)
-  {
-    // pass (needed to keep data blocks alive during the send)
-  }
-
 protected:
   base_transport& m_transport;
 
@@ -266,8 +273,7 @@
   uint8_t m_inputBuffer[MAX_LENGTH];
   size_t m_inputBufferSize;
 
-  std::list< Block > m_sendQueue;
-  std::list< std::pair<Block, Block> > m_sendPairQueue;
+  TransmissionQueue m_transmissionQueue;
   bool m_connectionInProgress;
 
   boost::asio::deadline_timer m_connectTimer;
diff --git a/src/transport/tcp-transport.cpp b/src/transport/tcp-transport.cpp
index 6fab8e0..7ffe530 100644
--- a/src/transport/tcp-transport.cpp
+++ b/src/transport/tcp-transport.cpp
@@ -53,18 +53,21 @@
 void
 TcpTransport::send(const Block& wire)
 {
+  BOOST_ASSERT(static_cast<bool>(m_impl));
   m_impl->send(wire);
 }
 
 void
 TcpTransport::send(const Block& header, const Block& payload)
 {
+  BOOST_ASSERT(static_cast<bool>(m_impl));
   m_impl->send(header, payload);
 }
 
 void
 TcpTransport::close()
 {
+  BOOST_ASSERT(static_cast<bool>(m_impl));
   m_impl->close();
   m_impl.reset();
 }
@@ -72,12 +75,15 @@
 void
 TcpTransport::pause()
 {
-  m_impl->pause();
+  if (static_cast<bool>(m_impl)) {
+    m_impl->pause();
+  }
 }
 
 void
 TcpTransport::resume()
 {
+  BOOST_ASSERT(static_cast<bool>(m_impl));
   m_impl->resume();
 }
 
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index cbfada8..03ee8b6 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -117,8 +117,9 @@
 void
 UnixTransport::pause()
 {
-  BOOST_ASSERT(static_cast<bool>(m_impl));
-  m_impl->pause();
+  if (static_cast<bool>(m_impl)) {
+    m_impl->pause();
+  }
 }
 
 void