management: Making LocalControlHeader encoding independent of Interest/Data wire
Boost.Asio supports multi-buffer (scatter-gather) send operations. This
commit uses them to prepend (potentially different) LocalControlHeaders
to the Interest/Data wire without re-encoding the wire itself.
Change-Id: I39b979f89f196d3e47d6466fb71f6d440bce74d4
refs: #1265
diff --git a/src/transport/tcp-transport.cpp b/src/transport/tcp-transport.cpp
index 3c773a8..24ec85c 100644
--- a/src/transport/tcp-transport.cpp
+++ b/src/transport/tcp-transport.cpp
@@ -1,271 +1,19 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
/**
- * Copyright (C) 2013 Regents of the University of California.
- * @author: Jeff Thompson <jefft0@remap.ucla.edu>
+ * Copyright (C) 2013-2014 Regents of the University of California.
* See COPYING for copyright and distribution information.
*/
#include "common.hpp"
#include "tcp-transport.hpp"
-
-#if NDN_CPP_HAVE_CXX11
-// In the std library, the placeholders are in a different namespace than boost.
-using namespace ndn::func_lib::placeholders;
-#endif
-
-using namespace std;
-typedef boost::asio::ip::tcp protocol;
+#include "stream-transport.hpp"
namespace ndn {
-const size_t MAX_LENGTH = 9000;
-
-class TcpTransport::Impl
-{
-public:
- Impl(TcpTransport &transport)
- : transport_(transport)
- , socket_(*transport_.ioService_)
- , partialDataSize_(0)
- , connectionInProgress_(false)
- , connectTimer_(*transport_.ioService_)
- {
- }
-
- void
- connectHandler(const boost::system::error_code& error)
- {
- connectionInProgress_ = false;
- connectTimer_.cancel();
-
- if (!error)
- {
- partialDataSize_ = 0;
- socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
- func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
-
- transport_.isConnected_ = true;
-
- for (std::list<Block>::iterator i = sendQueue_.begin(); i != sendQueue_.end(); ++i)
- socket_.async_send(boost::asio::buffer(i->wire(), i->size()),
- func_lib::bind(&Impl::handle_async_send, this, _1, *i));
-
- sendQueue_.clear();
- }
- else
- {
- // may need to throw exception
- transport_.isConnected_ = false;
- transport_.close();
- throw Transport::Error(error, "error while connecting to the forwarder");
- }
- }
-
- void
- connectTimeoutHandler(const boost::system::error_code& error)
- {
- if (error) // e.g., cancelled timer
- return;
-
- connectionInProgress_ = false;
- transport_.isConnected_ = false;
- socket_.close();
- throw Transport::Error(error, "error while connecting to the forwarder");
- }
-
- void
- resolveHandler(const boost::system::error_code& error,
- boost::asio::ip::tcp::resolver::iterator endpoint,
- const ptr_lib::shared_ptr<boost::asio::ip::tcp::resolver>&)
- {
- if (error)
- {
- if (error == boost::system::errc::operation_canceled)
- return;
-
- throw Transport::Error(error, "Error during resolution of host or port [" + transport_.host_ + ":" + transport_.port_ + "]");
- }
-
- boost::asio::ip::tcp::resolver::iterator end;
- if (endpoint == end)
- {
- connectionInProgress_ = false;
- transport_.isConnected_ = false;
- socket_.close();
- throw Transport::Error(error, "Unable to connect because host or port [" + transport_.host_ + ":" + transport_.port_ + "] cannot be resolved");
- }
-
- socket_.async_connect(*endpoint,
- func_lib::bind(&Impl::connectHandler, this, _1));
- }
-
- void
- connect()
- {
- if (!connectionInProgress_) {
- connectionInProgress_ = true;
-
- // Wait at most 4 seconds to connect
- /// @todo Decide whether this number should be configurable
- connectTimer_.expires_from_now(boost::posix_time::seconds(4));
- connectTimer_.async_wait(func_lib::bind(&Impl::connectTimeoutHandler, this, _1));
-
- using boost::asio::ip::tcp;
-
- ptr_lib::shared_ptr<tcp::resolver> resolver =
- ptr_lib::make_shared<tcp::resolver>(boost::ref(*transport_.ioService_));
-
- tcp::resolver::query query(transport_.host_, transport_.port_);
-
- resolver->async_resolve(query, func_lib::bind(&Impl::resolveHandler, this, _1, _2, resolver));
- }
- }
-
- void
- close()
- {
- connectTimer_.cancel();
- socket_.close();
- transport_.isConnected_ = false;
- }
-
- void
- send(const Block &wire)
- {
- if (!transport_.isConnected_)
- sendQueue_.push_back(wire);
- else
- socket_.async_send(boost::asio::buffer(wire.wire(), wire.size()),
- func_lib::bind(&Impl::handle_async_send, this, _1, wire));
- }
-
- inline void
- processAll(uint8_t *buffer, size_t &offset, size_t availableSize)
- {
- while(offset < availableSize)
- {
- Block element(buffer + offset, availableSize - offset);
- transport_.receive(element);
-
- offset += element.size();
- }
- }
-
- void
- handle_async_receive(const boost::system::error_code& error, std::size_t bytes_recvd)
- {
- /// @todo The socket is not datagram, so need to have internal buffer to handle partial data reception
-
- if (error)
- {
- if (error == boost::system::errc::operation_canceled) {
- // async receive has been explicitly cancelled (e.g., socket close)
- return;
- }
-
- socket_.close(); // closing at this point may not be that necessary
- transport_.isConnected_ = true;
- throw Transport::Error(error, "error while receiving data from socket");
- }
-
- if (!error && bytes_recvd > 0)
- {
- // inputBuffer_ has bytes_recvd received bytes of data
- if (partialDataSize_ > 0)
- {
- size_t newDataSize = std::min(bytes_recvd, MAX_LENGTH-partialDataSize_);
- std::copy(inputBuffer_, inputBuffer_ + newDataSize, partialData_ + partialDataSize_);
-
- partialDataSize_ += newDataSize;
-
- size_t offset = 0;
- try
- {
- processAll(partialData_, offset, partialDataSize_);
-
- // no exceptions => processed the whole thing
- if (bytes_recvd - newDataSize > 0)
- {
- // there is a little bit more data available
-
- offset = 0;
- partialDataSize_ = bytes_recvd - newDataSize;
- std::copy(inputBuffer_ + newDataSize, inputBuffer_ + newDataSize + partialDataSize_, partialData_);
-
- processAll(partialData_, offset, partialDataSize_);
-
- // no exceptions => processed the whole thing
- partialDataSize_ = 0;
- }
- else
- {
- // done processing
- partialDataSize_ = 0;
- }
- }
- catch(Tlv::Error &)
- {
- if (offset > 0)
- {
- partialDataSize_ -= offset;
- std::copy(partialData_ + offset, partialData_ + offset + partialDataSize_, partialData_);
- }
- else if (offset == 0 && partialDataSize_ == MAX_LENGTH)
- {
- // very bad... should close connection
- socket_.close();
- transport_.isConnected_ = true;
- throw Transport::Error(boost::system::error_code(), "input buffer full, but a valid TLV cannot be decoded");
- }
- }
- }
- else
- {
- size_t offset = 0;
- try
- {
- processAll(inputBuffer_, offset, bytes_recvd);
- }
- catch(Tlv::Error &error)
- {
- if (offset > 0)
- {
- partialDataSize_ = bytes_recvd - offset;
- std::copy(inputBuffer_ + offset, inputBuffer_ + offset + partialDataSize_, partialData_);
- }
- }
- }
- }
-
- socket_.async_receive(boost::asio::buffer(inputBuffer_, MAX_LENGTH), 0,
- func_lib::bind(&Impl::handle_async_receive, this, _1, _2));
- }
-
- void
- handle_async_send(const boost::system::error_code& error, const Block &wire)
- {
- // pass (needed to keep data block alive during the send)
- }
-
-private:
- TcpTransport &transport_;
-
- protocol::socket socket_;
- uint8_t inputBuffer_[MAX_LENGTH];
-
- uint8_t partialData_[MAX_LENGTH];
- size_t partialDataSize_;
-
- std::list< Block > sendQueue_;
- bool connectionInProgress_;
-
- boost::asio::deadline_timer connectTimer_;
-};
-
TcpTransport::TcpTransport(const std::string& host, const std::string& port/* = "6363"*/)
- : host_(host)
- , port_(port)
+ : m_host(host)
+ , m_port(port)
{
}
@@ -274,27 +22,36 @@
}
void
-TcpTransport::connect(boost::asio::io_service &ioService,
- const ReceiveCallback &receiveCallback)
+TcpTransport::connect(boost::asio::io_service& ioService,
+ const ReceiveCallback& receiveCallback)
{
- if (!static_cast<bool>(impl_)) {
+ if (!static_cast<bool>(m_impl)) {
Transport::connect(ioService, receiveCallback);
- impl_ = ptr_lib::make_shared<TcpTransport::Impl> (ptr_lib::ref(*this));
+ m_impl = make_shared<Impl> (boost::ref(*this),
+ boost::ref(ioService));
}
- impl_->connect();
+
+ boost::asio::ip::tcp::resolver::query query(m_host, m_port);
+ m_impl->connect(query);
}
void
-TcpTransport::send(const Block &wire)
+TcpTransport::send(const Block& wire)
{
- impl_->send(wire);
+ m_impl->send(wire);
+}
+
+void
+TcpTransport::send(const Block& header, const Block& payload)
+{
+ m_impl->send(header, payload);
}
void
TcpTransport::close()
{
- impl_->close();
+ m_impl->close();
}
-}
+} // namespace ndn