face: Serializing StreamFace::send(Interest|Data) operations using queue

Change-Id: I254d5e3030c94fac224e81e66bcbc58ffa752c0e
Refs: #1777
diff --git a/daemon/face/stream-face.hpp b/daemon/face/stream-face.hpp
index f9b7cd9..ef28c25 100644
--- a/daemon/face/stream-face.hpp
+++ b/daemon/face/stream-face.hpp
@@ -66,15 +66,15 @@
   processErrorCode(const boost::system::error_code& error);
 
   void
-  handleSend(const boost::system::error_code& error,
-             const Block& header, const Block& payload);
+  sendFromQueue();
+
   void
   handleSend(const boost::system::error_code& error,
-             const Block& wire);
+             std::size_t nBytesTransferred);
 
   void
   handleReceive(const boost::system::error_code& error,
-                std::size_t bytes_recvd);
+                std::size_t nBytesReceived);
 
   void
   keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face);
@@ -88,6 +88,7 @@
 private:
   uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
   std::size_t m_inputBufferSize;
+  std::queue<Block> m_sendQueue;
 
   friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
   friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
@@ -143,10 +144,11 @@
   static void
   send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
   {
-    face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
-                                                  packet.wireEncode().size()),
-                              bind(&StreamFace<Protocol, FaceBase>::handleSend,
-                                   &face, _1, packet.wireEncode()));
+    bool wasQueueEmpty = face.m_sendQueue.empty();
+    face.m_sendQueue.push(packet.wireEncode());
+
+    if (wasQueueEmpty)
+      face.sendFromQueue();
   }
 };
 
@@ -157,29 +159,16 @@
   static void
   send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
   {
-    using namespace boost::asio;
+    bool wasQueueEmpty = face.m_sendQueue.empty();
 
-    if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
+    if (!face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
       {
-        const Block& payload = packet.wireEncode();
-        face.m_socket->async_send(buffer(payload.wire(), payload.size()),
-                                  bind(&StreamFace<Protocol, LocalFace>::handleSend,
-                                       &face, _1, packet.wireEncode()));
+        face.m_sendQueue.push(face.filterAndEncodeLocalControlHeader(packet));
       }
-    else
-      {
-        Block header = face.filterAndEncodeLocalControlHeader(packet);
-        const Block& payload = packet.wireEncode();
+    face.m_sendQueue.push(packet.wireEncode());
 
-        std::vector<const_buffer> buffers;
-        buffers.reserve(2);
-        buffers.push_back(buffer(header.wire(),  header.size()));
-        buffers.push_back(buffer(payload.wire(), payload.size()));
-
-        face.m_socket->async_send(buffers,
-                                  bind(&StreamFace<Protocol, LocalFace>::handleSend,
-                                       &face, _1, header, payload));
-      }
+    if (wasQueueEmpty)
+      face.sendFromQueue();
   }
 };
 
@@ -202,6 +191,15 @@
 
 template<class T, class U>
 inline void
+StreamFace<T, U>::sendFromQueue()
+{
+  const Block& block = this->m_sendQueue.front();
+  boost::asio::async_write(*this->m_socket, boost::asio::buffer(block),
+                           bind(&StreamFace<T, U>::handleSend, this, _1, _2));
+}
+
+template<class T, class U>
+inline void
 StreamFace<T, U>::close()
 {
   if (!m_socket->is_open())
@@ -259,42 +257,35 @@
 template<class T, class U>
 inline void
 StreamFace<T, U>::handleSend(const boost::system::error_code& error,
-                             const Block& wire)
+                             size_t nBytesTransferred)
 {
   if (error)
     return processErrorCode(error);
 
-  NFD_LOG_TRACE("[id:" << this->getId()
-                << ",uri:" << this->getRemoteUri()
-                << "] Successfully sent: " << wire.size() << " bytes");
-}
-
-template<class T, class U>
-inline void
-StreamFace<T, U>::handleSend(const boost::system::error_code& error,
-                             const Block& header, const Block& payload)
-{
-  if (error)
-    return processErrorCode(error);
+  BOOST_ASSERT(!m_sendQueue.empty());
 
   NFD_LOG_TRACE("[id:" << this->getId()
                 << ",uri:" << this->getRemoteUri()
-                << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
+                << "] Successfully sent: " << nBytesTransferred << " bytes");
+
+  m_sendQueue.pop();
+  if (!m_sendQueue.empty())
+    sendFromQueue();
 }
 
 template<class T, class U>
 inline void
 StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
-                             std::size_t bytes_recvd)
+                                std::size_t nBytesReceived)
 {
   if (error)
     return processErrorCode(error);
 
   NFD_LOG_TRACE("[id:" << this->getId()
                 << ",uri:" << this->getRemoteUri()
-                << "] Received: " << bytes_recvd << " bytes");
+                << "] Received: " << nBytesReceived << " bytes");
 
-  m_inputBufferSize += bytes_recvd;
+  m_inputBufferSize += nBytesReceived;
   // do magic
 
   std::size_t offset = 0;
@@ -374,6 +365,10 @@
   // handlers are dispatched
   io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
                this, this->shared_from_this()));
+
+  // clear send queue
+  std::queue<Block> emptyQueue;
+  std::swap(emptyQueue, m_sendQueue);
 }
 
 } // namespace nfd