transport: minor refactoring

Duplicate code in StreamTransportImpl<BaseTransport, Protocol>::send
overloads is split to another internal function.

This commit also corrects code style in transport directory.

refs #3136

Change-Id: I47958c4117b6d2c7dde356430e405da2505f942a
diff --git a/src/transport/stream-transport-impl.hpp b/src/transport/stream-transport-impl.hpp
new file mode 100644
index 0000000..963293e
--- /dev/null
+++ b/src/transport/stream-transport-impl.hpp
@@ -0,0 +1,274 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2013-2016 Regents of the University of California.
+ *
+ * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
+ *
+ * ndn-cxx library is free software: you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
+ *
+ * You should have received copies of the GNU General Public License and GNU Lesser
+ * General Public License along with ndn-cxx, e.g., in COPYING.md file.  If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ */
+
+#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
+#define NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
+
+#include "transport.hpp"
+
+#include <boost/asio.hpp>
+#include <list>
+
+namespace ndn {
+
+/** \brief implementation detail of a Boost.Asio-based stream-oriented transport
+ *  \tparam BaseTransport a subclass of Transport
+ *  \tparam Protocol a Boost.Asio stream-oriented protocol, including boost::asio::ip::tcp
+ *                   and boost::asio::local::stream_protocol
+ */
+template<typename BaseTransport, typename Protocol>
+class StreamTransportImpl
+{
+public:
+  typedef StreamTransportImpl<BaseTransport,Protocol> Impl;
+  typedef std::list<Block> BlockSequence;
+  typedef std::list<BlockSequence> TransmissionQueue;
+
+  StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
+    : m_transport(transport)
+    , m_socket(ioService)
+    , m_inputBufferSize(0)
+    , m_isConnecting(false)
+    , m_connectTimer(ioService)
+  {
+  }
+
+  void
+  connect(const typename Protocol::endpoint& endpoint)
+  {
+    if (!m_isConnecting) {
+      m_isConnecting = true;
+
+      // Wait at most 4 seconds to connect
+      /// @todo Decide whether this number should be configurable
+      m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
+      m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
+
+      m_socket.open();
+      m_socket.async_connect(endpoint, bind(&Impl::connectHandler, this, _1));
+    }
+  }
+
+  void
+  close()
+  {
+    m_isConnecting = false;
+
+    boost::system::error_code error; // to silently ignore all errors
+    m_connectTimer.cancel(error);
+    m_socket.cancel(error);
+    m_socket.close(error);
+
+    m_transport.m_isConnected = false;
+    m_transport.m_isReceiving = false;
+    m_transmissionQueue.clear();
+  }
+
+  void
+  pause()
+  {
+    if (m_isConnecting)
+      return;
+
+    if (m_transport.m_isReceiving) {
+      m_transport.m_isReceiving = false;
+      m_socket.cancel();
+    }
+  }
+
+  void
+  resume()
+  {
+    if (m_isConnecting)
+      return;
+
+    if (!m_transport.m_isReceiving) {
+      m_transport.m_isReceiving = true;
+      m_inputBufferSize = 0;
+      m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
+                             bind(&Impl::handleAsyncReceive, this, _1, _2));
+    }
+  }
+
+  void
+  send(const Block& wire)
+  {
+    BlockSequence sequence;
+    sequence.push_back(wire);
+    send(std::move(sequence));
+  }
+
+  void
+  send(const Block& header, const Block& payload)
+  {
+    BlockSequence sequence;
+    sequence.push_back(header);
+    sequence.push_back(payload);
+    send(std::move(sequence));
+  }
+
+protected:
+  void
+  connectHandler(const boost::system::error_code& error)
+  {
+    m_isConnecting = false;
+    m_connectTimer.cancel();
+
+    if (!error) {
+      resume();
+      m_transport.m_isConnected = true;
+
+      if (!m_transmissionQueue.empty()) {
+        boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+                                 bind(&Impl::handleAsyncWrite, this, _1,
+                                      m_transmissionQueue.begin()));
+      }
+    }
+    else {
+      m_transport.m_isConnected = false;
+      m_transport.close();
+      BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
+    }
+  }
+
+  void
+  connectTimeoutHandler(const boost::system::error_code& error)
+  {
+    if (error) // e.g., cancelled timer
+      return;
+
+    m_transport.close();
+    BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
+  }
+
+  void
+  send(BlockSequence&& sequence)
+  {
+    m_transmissionQueue.emplace_back(sequence);
+
+    if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
+      boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+                               bind(&Impl::handleAsyncWrite, this, _1,
+                                    m_transmissionQueue.begin()));
+    }
+
+    // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
+    // next write will be scheduled either in connectHandler or in asyncWriteHandler
+  }
+
+  void
+  handleAsyncWrite(const boost::system::error_code& error, TransmissionQueue::iterator queueItem)
+  {
+    if (error) {
+      if (error == boost::system::errc::operation_canceled) {
+        // async receive has been explicitly cancelled (e.g., socket close)
+        return;
+      }
+
+      m_transport.close();
+      BOOST_THROW_EXCEPTION(Transport::Error(error, "error while sending data to socket"));
+    }
+
+    if (!m_transport.m_isConnected) {
+      return; // queue has been already cleared
+    }
+
+    m_transmissionQueue.erase(queueItem);
+
+    if (!m_transmissionQueue.empty()) {
+      boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+                               bind(&Impl::handleAsyncWrite, this, _1,
+                                    m_transmissionQueue.begin()));
+    }
+  }
+
+  void
+  handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
+  {
+    if (error) {
+      if (error == boost::system::errc::operation_canceled) {
+        // async receive has been explicitly cancelled (e.g., socket close)
+        return;
+      }
+
+      m_transport.close();
+      BOOST_THROW_EXCEPTION(Transport::Error(error, "error while receiving data from socket"));
+    }
+
+    m_inputBufferSize += nBytesRecvd;
+    // do magic
+
+    std::size_t offset = 0;
+    bool hasProcessedSome = processAllReceived(m_inputBuffer, offset, m_inputBufferSize);
+    if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
+      m_transport.close();
+      BOOST_THROW_EXCEPTION(Transport::Error(boost::system::error_code(),
+                                             "input buffer full, but a valid TLV cannot be "
+                                             "decoded"));
+    }
+
+    if (offset > 0) {
+      if (offset != m_inputBufferSize) {
+        std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
+        m_inputBufferSize -= offset;
+      }
+      else {
+        m_inputBufferSize = 0;
+      }
+    }
+
+    m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
+                                               MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
+                           bind(&Impl::handleAsyncReceive, this, _1, _2));
+  }
+
+  bool
+  processAllReceived(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
+  {
+    while (offset < nBytesAvailable) {
+      bool isOk = false;
+      Block element;
+      std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
+      if (!isOk)
+        return false;
+
+      m_transport.receive(element);
+      offset += element.size();
+    }
+    return true;
+  }
+
+protected:
+  BaseTransport& m_transport;
+
+  typename Protocol::socket m_socket;
+  uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
+  size_t m_inputBufferSize;
+
+  TransmissionQueue m_transmissionQueue;
+  bool m_isConnecting;
+
+  boost::asio::deadline_timer m_connectTimer;
+};
+
+} // namespace ndn
+
+#endif // NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP