transport: minor refactoring
Duplicate code in StreamTransportImpl<BaseTransport, Protocol>::send
overloads is split to another internal function.
This commit also corrects code style in transport directory.
refs #3136
Change-Id: I47958c4117b6d2c7dde356430e405da2505f942a
diff --git a/src/detail/face-impl.hpp b/src/detail/face-impl.hpp
index ef49992..476e85e 100644
--- a/src/detail/face-impl.hpp
+++ b/src/detail/face-impl.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2015 Regents of the University of California.
+ * Copyright (c) 2013-2016 Regents of the University of California.
*
* This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
*
@@ -127,7 +127,7 @@
m_face.m_transport->connect(m_face.m_ioService,
bind(&Face::onReceiveElement, &m_face, _1));
- if (wantResume && !m_face.m_transport->isExpectingData())
+ if (wantResume && !m_face.m_transport->isReceiving())
m_face.m_transport->resume();
}
diff --git a/src/transport/stream-transport-impl.hpp b/src/transport/stream-transport-impl.hpp
new file mode 100644
index 0000000..963293e
--- /dev/null
+++ b/src/transport/stream-transport-impl.hpp
@@ -0,0 +1,274 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2013-2016 Regents of the University of California.
+ *
+ * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
+ *
+ * ndn-cxx library is free software: you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ *
+ * You should have received copies of the GNU General Public License and GNU Lesser
+ * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ */
+
+#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
+#define NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
+
+#include "transport.hpp"
+
+#include <boost/asio.hpp>
+#include <list>
+
+namespace ndn {
+
+/** \brief implementation detail of a Boost.Asio-based stream-oriented transport
+ * \tparam BaseTransport a subclass of Transport
+ * \tparam Protocol a Boost.Asio stream-oriented protocol, including boost::asio::ip::tcp
+ * and boost::asio::local::stream_protocol
+ */
+template<typename BaseTransport, typename Protocol>
+class StreamTransportImpl
+{
+public:
+ typedef StreamTransportImpl<BaseTransport,Protocol> Impl;
+ typedef std::list<Block> BlockSequence;
+ typedef std::list<BlockSequence> TransmissionQueue;
+
+ StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
+ : m_transport(transport)
+ , m_socket(ioService)
+ , m_inputBufferSize(0)
+ , m_isConnecting(false)
+ , m_connectTimer(ioService)
+ {
+ }
+
+ void
+ connect(const typename Protocol::endpoint& endpoint)
+ {
+ if (!m_isConnecting) {
+ m_isConnecting = true;
+
+ // Wait at most 4 seconds to connect
+ /// @todo Decide whether this number should be configurable
+ m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
+ m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
+
+ m_socket.open();
+ m_socket.async_connect(endpoint, bind(&Impl::connectHandler, this, _1));
+ }
+ }
+
+ void
+ close()
+ {
+ m_isConnecting = false;
+
+ boost::system::error_code error; // to silently ignore all errors
+ m_connectTimer.cancel(error);
+ m_socket.cancel(error);
+ m_socket.close(error);
+
+ m_transport.m_isConnected = false;
+ m_transport.m_isReceiving = false;
+ m_transmissionQueue.clear();
+ }
+
+ void
+ pause()
+ {
+ if (m_isConnecting)
+ return;
+
+ if (m_transport.m_isReceiving) {
+ m_transport.m_isReceiving = false;
+ m_socket.cancel();
+ }
+ }
+
+ void
+ resume()
+ {
+ if (m_isConnecting)
+ return;
+
+ if (!m_transport.m_isReceiving) {
+ m_transport.m_isReceiving = true;
+ m_inputBufferSize = 0;
+ m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
+ bind(&Impl::handleAsyncReceive, this, _1, _2));
+ }
+ }
+
+ void
+ send(const Block& wire)
+ {
+ BlockSequence sequence;
+ sequence.push_back(wire);
+ send(std::move(sequence));
+ }
+
+ void
+ send(const Block& header, const Block& payload)
+ {
+ BlockSequence sequence;
+ sequence.push_back(header);
+ sequence.push_back(payload);
+ send(std::move(sequence));
+ }
+
+protected:
+ void
+ connectHandler(const boost::system::error_code& error)
+ {
+ m_isConnecting = false;
+ m_connectTimer.cancel();
+
+ if (!error) {
+ resume();
+ m_transport.m_isConnected = true;
+
+ if (!m_transmissionQueue.empty()) {
+ boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+ bind(&Impl::handleAsyncWrite, this, _1,
+ m_transmissionQueue.begin()));
+ }
+ }
+ else {
+ m_transport.m_isConnected = false;
+ m_transport.close();
+ BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
+ }
+ }
+
+ void
+ connectTimeoutHandler(const boost::system::error_code& error)
+ {
+ if (error) // e.g., cancelled timer
+ return;
+
+ m_transport.close();
+ BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
+ }
+
+ void
+ send(BlockSequence&& sequence)
+ {
+ m_transmissionQueue.emplace_back(sequence);
+
+ if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
+ boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+ bind(&Impl::handleAsyncWrite, this, _1,
+ m_transmissionQueue.begin()));
+ }
+
+ // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
+ // next write will be scheduled either in connectHandler or in asyncWriteHandler
+ }
+
+ void
+ handleAsyncWrite(const boost::system::error_code& error, TransmissionQueue::iterator queueItem)
+ {
+ if (error) {
+ if (error == boost::system::errc::operation_canceled) {
+ // async receive has been explicitly cancelled (e.g., socket close)
+ return;
+ }
+
+ m_transport.close();
+ BOOST_THROW_EXCEPTION(Transport::Error(error, "error while sending data to socket"));
+ }
+
+ if (!m_transport.m_isConnected) {
+ return; // queue has been already cleared
+ }
+
+ m_transmissionQueue.erase(queueItem);
+
+ if (!m_transmissionQueue.empty()) {
+ boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
+ bind(&Impl::handleAsyncWrite, this, _1,
+ m_transmissionQueue.begin()));
+ }
+ }
+
+ void
+ handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
+ {
+ if (error) {
+ if (error == boost::system::errc::operation_canceled) {
+ // async receive has been explicitly cancelled (e.g., socket close)
+ return;
+ }
+
+ m_transport.close();
+ BOOST_THROW_EXCEPTION(Transport::Error(error, "error while receiving data from socket"));
+ }
+
+ m_inputBufferSize += nBytesRecvd;
+ // do magic
+
+ std::size_t offset = 0;
+ bool hasProcessedSome = processAllReceived(m_inputBuffer, offset, m_inputBufferSize);
+ if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
+ m_transport.close();
+ BOOST_THROW_EXCEPTION(Transport::Error(boost::system::error_code(),
+ "input buffer full, but a valid TLV cannot be "
+ "decoded"));
+ }
+
+ if (offset > 0) {
+ if (offset != m_inputBufferSize) {
+ std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
+ m_inputBufferSize -= offset;
+ }
+ else {
+ m_inputBufferSize = 0;
+ }
+ }
+
+ m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
+ MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
+ bind(&Impl::handleAsyncReceive, this, _1, _2));
+ }
+
+ bool
+ processAllReceived(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
+ {
+ while (offset < nBytesAvailable) {
+ bool isOk = false;
+ Block element;
+ std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
+ if (!isOk)
+ return false;
+
+ m_transport.receive(element);
+ offset += element.size();
+ }
+ return true;
+ }
+
+protected:
+ BaseTransport& m_transport;
+
+ typename Protocol::socket m_socket;
+ uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
+ size_t m_inputBufferSize;
+
+ TransmissionQueue m_transmissionQueue;
+ bool m_isConnecting;
+
+ boost::asio::deadline_timer m_connectTimer;
+};
+
+} // namespace ndn
+
+#endif // NDN_TRANSPORT_STREAM_TRANSPORT_IMPL_HPP
diff --git a/src/transport/stream-transport-with-resolver-impl.hpp b/src/transport/stream-transport-with-resolver-impl.hpp
new file mode 100644
index 0000000..1fa3351
--- /dev/null
+++ b/src/transport/stream-transport-with-resolver-impl.hpp
@@ -0,0 +1,88 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2013-2016 Regents of the University of California.
+ *
+ * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
+ *
+ * ndn-cxx library is free software: you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ *
+ * You should have received copies of the GNU General Public License and GNU Lesser
+ * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ */
+
+#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_WITH_RESOLVER_IMPL_HPP
+#define NDN_TRANSPORT_STREAM_TRANSPORT_WITH_RESOLVER_IMPL_HPP
+
+#include "stream-transport-impl.hpp"
+
+namespace ndn {
+
+/** \brief implementation detail of a Boost.Asio-based stream-oriented transport
+ * with resolver support
+ */
+template<typename BaseTransport, typename Protocol>
+class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
+{
+public:
+ typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> Impl;
+
+ StreamTransportWithResolverImpl(BaseTransport& transport, boost::asio::io_service& ioService)
+ : StreamTransportImpl<BaseTransport, Protocol>(transport, ioService)
+ {
+ }
+
+ void
+ connect(const typename Protocol::resolver::query& query)
+ {
+ if (this->m_isConnecting) {
+ return;
+ }
+
+ this->m_isConnecting = true;
+
+ // Wait at most 4 seconds to connect
+ /// @todo Decide whether this number should be configurable
+ this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
+ this->m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
+
+ // typename boost::asio::ip::basic_resolver< Protocol > resolver;
+ auto resolver = make_shared<typename Protocol::resolver>(ref(this->m_socket.get_io_service()));
+ resolver->async_resolve(query, bind(&Impl::resolveHandler, this, _1, _2, resolver));
+ }
+
+protected:
+ void
+ resolveHandler(const boost::system::error_code& error,
+ typename Protocol::resolver::iterator endpoint,
+ const shared_ptr<typename Protocol::resolver>&)
+ {
+ if (error) {
+ if (error == boost::system::errc::operation_canceled)
+ return;
+
+ BOOST_THROW_EXCEPTION(Transport::Error(error, "Error during resolution of host or port"));
+ }
+
+ typename Protocol::resolver::iterator end;
+ if (endpoint == end) {
+ this->m_transport.close();
+ BOOST_THROW_EXCEPTION(Transport::Error(error, "Unable to resolve because host or port"));
+ }
+
+ this->m_socket.async_connect(*endpoint, bind(&Impl::connectHandler, this, _1));
+ }
+};
+
+
+} // namespace ndn
+
+#endif // NDN_TRANSPORT_STREAM_TRANSPORT_WITH_RESOLVER_IMPL_HPP
diff --git a/src/transport/stream-transport.hpp b/src/transport/stream-transport.hpp
deleted file mode 100644
index a0c7ca3..0000000
--- a/src/transport/stream-transport.hpp
+++ /dev/null
@@ -1,346 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2013-2016 Regents of the University of California.
- *
- * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
- *
- * ndn-cxx library is free software: you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free Software
- * Foundation, either version 3 of the License, or (at your option) any later version.
- *
- * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
- *
- * You should have received copies of the GNU General Public License and GNU Lesser
- * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
- * <http://www.gnu.org/licenses/>.
- *
- * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
- */
-
-#ifndef NDN_TRANSPORT_STREAM_TRANSPORT_HPP
-#define NDN_TRANSPORT_STREAM_TRANSPORT_HPP
-
-#include "transport.hpp"
-
-#include <boost/asio.hpp>
-#include <list>
-
-namespace ndn {
-
-template<class BaseTransport, class Protocol>
-class StreamTransportImpl
-{
-public:
- typedef StreamTransportImpl<BaseTransport,Protocol> Impl;
-
- typedef std::list<Block> BlockSequence;
- typedef std::list<BlockSequence> TransmissionQueue;
-
- StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
- : m_transport(transport)
- , m_socket(ioService)
- , m_inputBufferSize(0)
- , m_connectionInProgress(false)
- , m_connectTimer(ioService)
- {
- }
-
- void
- connectHandler(const boost::system::error_code& error)
- {
- m_connectionInProgress = false;
- m_connectTimer.cancel();
-
- if (!error)
- {
- resume();
- m_transport.m_isConnected = true;
-
- if (!m_transmissionQueue.empty()) {
- boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
- bind(&Impl::handleAsyncWrite, this, _1,
- m_transmissionQueue.begin()));
- }
- }
- else
- {
- // may need to throw exception
- m_transport.m_isConnected = false;
- m_transport.close();
- BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
- }
- }
-
- void
- connectTimeoutHandler(const boost::system::error_code& error)
- {
- if (error) // e.g., cancelled timer
- return;
-
- m_transport.close();
- BOOST_THROW_EXCEPTION(Transport::Error(error, "error while connecting to the forwarder"));
- }
-
- void
- connect(const typename Protocol::endpoint& endpoint)
- {
- if (!m_connectionInProgress) {
- m_connectionInProgress = true;
-
- // Wait at most 4 seconds to connect
- /// @todo Decide whether this number should be configurable
- m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
- m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
-
- m_socket.open();
- m_socket.async_connect(endpoint,
- bind(&Impl::connectHandler, this, _1));
- }
- }
-
- void
- close()
- {
- m_connectionInProgress = false;
-
- boost::system::error_code error; // to silently ignore all errors
- m_connectTimer.cancel(error);
- m_socket.cancel(error);
- m_socket.close(error);
-
- m_transport.m_isConnected = false;
- m_transport.m_isExpectingData = false;
- m_transmissionQueue.clear();
- }
-
- void
- pause()
- {
- if (m_connectionInProgress)
- return;
-
- if (m_transport.m_isExpectingData)
- {
- m_transport.m_isExpectingData = false;
- m_socket.cancel();
- }
- }
-
- /**
- * @warning Must not be called directly or indirectly from within handleAsyncReceive invocation
- */
- void
- resume()
- {
- if (m_connectionInProgress)
- return;
-
- if (!m_transport.m_isExpectingData)
- {
- m_transport.m_isExpectingData = true;
- m_inputBufferSize = 0;
- m_socket.async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
- bind(&Impl::handleAsyncReceive, this, _1, _2));
- }
- }
-
- void
- send(const Block& wire)
- {
- BlockSequence sequence;
- sequence.push_back(wire);
- m_transmissionQueue.push_back(sequence);
-
- if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
- boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
- bind(&Impl::handleAsyncWrite, this, _1,
- m_transmissionQueue.begin()));
- }
-
- // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
- // next write will be scheduled either in connectHandler or in asyncWriteHandler
- }
-
- void
- send(const Block& header, const Block& payload)
- {
- BlockSequence sequence;
- sequence.push_back(header);
- sequence.push_back(payload);
- m_transmissionQueue.push_back(sequence);
-
- if (m_transport.m_isConnected && m_transmissionQueue.size() == 1) {
- boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
- bind(&Impl::handleAsyncWrite, this, _1,
- m_transmissionQueue.begin()));
- }
-
- // if not connected or there is transmission in progress (m_transmissionQueue.size() > 1),
- // next write will be scheduled either in connectHandler or in asyncWriteHandler
- }
-
- void
- handleAsyncWrite(const boost::system::error_code& error,
- TransmissionQueue::iterator queueItem)
- {
- if (error)
- {
- if (error == boost::system::errc::operation_canceled) {
- // async receive has been explicitly cancelled (e.g., socket close)
- return;
- }
-
- m_transport.close();
- BOOST_THROW_EXCEPTION(Transport::Error(error, "error while sending data to socket"));
- }
-
- if (!m_transport.m_isConnected) {
- return; // queue has been already cleared
- }
-
- m_transmissionQueue.erase(queueItem);
-
- if (!m_transmissionQueue.empty()) {
- boost::asio::async_write(m_socket, *m_transmissionQueue.begin(),
- bind(&Impl::handleAsyncWrite, this, _1,
- m_transmissionQueue.begin()));
- }
- }
-
- bool
- processAll(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
- {
- while (offset < nBytesAvailable) {
- bool isOk = false;
- Block element;
- std::tie(isOk, element) = Block::fromBuffer(buffer + offset, nBytesAvailable - offset);
- if (!isOk)
- return false;
-
- m_transport.receive(element);
- offset += element.size();
- }
- return true;
- }
-
- void
- handleAsyncReceive(const boost::system::error_code& error, std::size_t nBytesRecvd)
- {
- if (error)
- {
- if (error == boost::system::errc::operation_canceled) {
- // async receive has been explicitly cancelled (e.g., socket close)
- return;
- }
-
- m_transport.close();
- BOOST_THROW_EXCEPTION(Transport::Error(error, "error while receiving data from socket"));
- }
-
- m_inputBufferSize += nBytesRecvd;
- // do magic
-
- std::size_t offset = 0;
- bool hasProcessedSome = processAll(m_inputBuffer, offset, m_inputBufferSize);
- if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
- {
- m_transport.close();
- BOOST_THROW_EXCEPTION(Transport::Error(boost::system::error_code(),
- "input buffer full, but a valid TLV cannot be "
- "decoded"));
- }
-
- if (offset > 0)
- {
- if (offset != m_inputBufferSize)
- {
- std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
- m_inputBuffer);
- m_inputBufferSize -= offset;
- }
- else
- {
- m_inputBufferSize = 0;
- }
- }
-
- m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
- MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
- bind(&Impl::handleAsyncReceive, this, _1, _2));
- }
-
-protected:
- BaseTransport& m_transport;
-
- typename Protocol::socket m_socket;
- uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
- size_t m_inputBufferSize;
-
- TransmissionQueue m_transmissionQueue;
- bool m_connectionInProgress;
-
- boost::asio::deadline_timer m_connectTimer;
-};
-
-
-template<class BaseTransport, class Protocol>
-class StreamTransportWithResolverImpl : public StreamTransportImpl<BaseTransport, Protocol>
-{
-public:
- typedef StreamTransportWithResolverImpl<BaseTransport,Protocol> Impl;
-
- StreamTransportWithResolverImpl(BaseTransport& transport, boost::asio::io_service& ioService)
- : StreamTransportImpl<BaseTransport, Protocol>(transport, ioService)
- {
- }
-
- void
- resolveHandler(const boost::system::error_code& error,
- typename Protocol::resolver::iterator endpoint,
- const shared_ptr<typename Protocol::resolver>&)
- {
- if (error)
- {
- if (error == boost::system::errc::operation_canceled)
- return;
-
- BOOST_THROW_EXCEPTION(Transport::Error(error, "Error during resolution of host or port"));
- }
-
- typename Protocol::resolver::iterator end;
- if (endpoint == end)
- {
- this->m_transport.close();
- BOOST_THROW_EXCEPTION(Transport::Error(error, "Unable to resolve because host or port"));
- }
-
- this->m_socket.async_connect(*endpoint,
- bind(&Impl::connectHandler, this, _1));
- }
-
- void
- connect(const typename Protocol::resolver::query& query)
- {
- if (!this->m_connectionInProgress) {
- this->m_connectionInProgress = true;
-
- // Wait at most 4 seconds to connect
- /// @todo Decide whether this number should be configurable
- this->m_connectTimer.expires_from_now(boost::posix_time::seconds(4));
- this->m_connectTimer.async_wait(bind(&Impl::connectTimeoutHandler, this, _1));
-
- // typename boost::asio::ip::basic_resolver< Protocol > resolver;
- shared_ptr<typename Protocol::resolver> resolver =
- make_shared<typename Protocol::resolver>(ref(this->m_socket.get_io_service()));
-
- resolver->async_resolve(query, bind(&Impl::resolveHandler, this, _1, _2, resolver));
- }
- }
-};
-
-
-} // namespace ndn
-
-#endif // NDN_TRANSPORT_STREAM_TRANSPORT_HPP
diff --git a/src/transport/tcp-transport.cpp b/src/transport/tcp-transport.cpp
index 20952ab..343ddd2 100644
--- a/src/transport/tcp-transport.cpp
+++ b/src/transport/tcp-transport.cpp
@@ -19,10 +19,8 @@
* See AUTHORS.md for complete list of ndn-cxx authors and contributors.
*/
-#include "common.hpp"
-
#include "tcp-transport.hpp"
-#include "stream-transport.hpp"
+#include "stream-transport-with-resolver-impl.hpp"
#include "util/face-uri.hpp"
namespace ndn {
@@ -33,9 +31,7 @@
{
}
-TcpTransport::~TcpTransport()
-{
-}
+TcpTransport::~TcpTransport() = default;
shared_ptr<TcpTransport>
TcpTransport::create(const std::string& uri)
@@ -81,7 +77,7 @@
TcpTransport::connect(boost::asio::io_service& ioService,
const ReceiveCallback& receiveCallback)
{
- if (!static_cast<bool>(m_impl)) {
+ if (m_impl == nullptr) {
Transport::connect(ioService, receiveCallback);
m_impl = make_shared<Impl>(ref(*this), ref(ioService));
@@ -94,21 +90,21 @@
void
TcpTransport::send(const Block& wire)
{
- BOOST_ASSERT(static_cast<bool>(m_impl));
+ BOOST_ASSERT(m_impl != nullptr);
m_impl->send(wire);
}
void
TcpTransport::send(const Block& header, const Block& payload)
{
- BOOST_ASSERT(static_cast<bool>(m_impl));
+ BOOST_ASSERT(m_impl != nullptr);
m_impl->send(header, payload);
}
void
TcpTransport::close()
{
- BOOST_ASSERT(static_cast<bool>(m_impl));
+ BOOST_ASSERT(m_impl != nullptr);
m_impl->close();
m_impl.reset();
}
@@ -116,7 +112,7 @@
void
TcpTransport::pause()
{
- if (static_cast<bool>(m_impl)) {
+ if (m_impl != nullptr) {
m_impl->pause();
}
}
@@ -124,7 +120,7 @@
void
TcpTransport::resume()
{
- BOOST_ASSERT(static_cast<bool>(m_impl));
+ BOOST_ASSERT(m_impl != nullptr);
m_impl->resume();
}
diff --git a/src/transport/tcp-transport.hpp b/src/transport/tcp-transport.hpp
index b0b5124..eb7878b 100644
--- a/src/transport/tcp-transport.hpp
+++ b/src/transport/tcp-transport.hpp
@@ -22,56 +22,61 @@
#ifndef NDN_TRANSPORT_TCP_TRANSPORT_HPP
#define NDN_TRANSPORT_TCP_TRANSPORT_HPP
-#include "../common.hpp"
#include "transport.hpp"
#include "../util/config-file.hpp"
-
-// forward declaration
-namespace boost { namespace asio { namespace ip { class tcp; } } }
+namespace boost {
+namespace asio {
+namespace ip {
+class tcp;
+} // namespace ip
+} // namespace asio
+} // namespace boost
namespace ndn {
-// forward declaration
-template<class T, class U> class StreamTransportImpl;
-template<class T, class U> class StreamTransportWithResolverImpl;
+template<typename BaseTransport, typename Protocol>
+class StreamTransportImpl;
+template<typename BaseTransport, typename Protocol>
+class StreamTransportWithResolverImpl;
+
+/** \brief a transport using TCP socket
+ */
class TcpTransport : public Transport
{
public:
+ explicit
TcpTransport(const std::string& host, const std::string& port = "6363");
+
~TcpTransport();
- // from Transport
virtual void
connect(boost::asio::io_service& ioService,
- const ReceiveCallback& receiveCallback);
+ const ReceiveCallback& receiveCallback) override;
virtual void
- close();
+ close() override;
virtual void
- pause();
+ pause() override;
virtual void
- resume();
+ resume() override;
virtual void
- send(const Block& wire);
+ send(const Block& wire) override;
virtual void
- send(const Block& header, const Block& payload);
+ send(const Block& header, const Block& payload) override;
- /**
- * @brief Create transport with parameters defined in URI
- *
- * @throws Transport::Error if incorrect URI or unsupported protocol is specified
+ /** \brief Create transport with parameters defined in URI
+ * \throw Transport::Error incorrect URI or unsupported protocol is specified
*/
static shared_ptr<TcpTransport>
create(const std::string& uri);
NDN_CXX_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
-
static std::pair<std::string, std::string>
getSocketHostAndPortFromUri(const std::string& uri);
@@ -82,7 +87,7 @@
typedef StreamTransportWithResolverImpl<TcpTransport, boost::asio::ip::tcp> Impl;
friend class StreamTransportImpl<TcpTransport, boost::asio::ip::tcp>;
friend class StreamTransportWithResolverImpl<TcpTransport, boost::asio::ip::tcp>;
- shared_ptr< Impl > m_impl;
+ shared_ptr<Impl> m_impl;
};
} // namespace ndn
diff --git a/src/transport/transport.cpp b/src/transport/transport.cpp
new file mode 100644
index 0000000..0ea5bee
--- /dev/null
+++ b/src/transport/transport.cpp
@@ -0,0 +1,53 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2013-2016 Regents of the University of California.
+ *
+ * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
+ *
+ * ndn-cxx library is free software: you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free Software
+ * Foundation, either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
+ * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
+ * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
+ *
+ * You should have received copies of the GNU General Public License and GNU Lesser
+ * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
+ */
+
+#include "transport.hpp"
+
+namespace ndn {
+
+Transport::Error::Error(const boost::system::error_code& code, const std::string& msg)
+ : std::runtime_error(msg + (code.value() ? " (" + code.category().message(code.value()) + ")" : ""))
+{
+}
+
+Transport::Error::Error(const std::string& msg)
+ : std::runtime_error(msg)
+{
+}
+
+Transport::Transport()
+ : m_ioService(nullptr)
+ , m_isConnected(false)
+ , m_isReceiving(false)
+{
+}
+
+void
+Transport::connect(boost::asio::io_service& ioService,
+ const ReceiveCallback& receiveCallback)
+{
+ BOOST_ASSERT(receiveCallback != nullptr);
+
+ m_ioService = &ioService;
+ m_receiveCallback = receiveCallback;
+}
+
+} // namespace ndn
diff --git a/src/transport/transport.hpp b/src/transport/transport.hpp
index 1095f14..f656802 100644
--- a/src/transport/transport.hpp
+++ b/src/transport/transport.hpp
@@ -27,7 +27,6 @@
#include <boost/system/error_code.hpp>
-// forward declaration
namespace boost {
namespace asio {
class io_service;
@@ -36,123 +35,105 @@
namespace ndn {
+/** \brief provides TLV-block delivery service
+ */
class Transport : noncopyable
{
public:
class Error : public std::runtime_error
{
public:
- inline Error(const boost::system::error_code& code, const std::string& msg);
- inline Error(const std::string& msg);
+ Error(const boost::system::error_code& code, const std::string& msg);
+
+ explicit
+ Error(const std::string& msg);
};
- typedef function<void (const Block& wire)> ReceiveCallback;
- typedef function<void ()> ErrorCallback;
+ typedef function<void(const Block& wire)> ReceiveCallback;
+ typedef function<void()> ErrorCallback;
- inline
Transport();
- inline virtual
- ~Transport();
+ virtual
+ ~Transport() = default;
- /**
- * @brief Connect transport
- *
- * @throws boost::system::system_error if connection cannot be established
+ /** \brief asynchronously open the connection
+ * \param ioService io_service to create socket on
+ * \param receiveCallback callback function when a TLV block is received; must not be empty
+ * \throw boost::system::system_error connection cannot be established
*/
- inline virtual void
- connect(boost::asio::io_service& io_service,
- const ReceiveCallback& receiveCallback);
+ virtual void
+ connect(boost::asio::io_service& ioService, const ReceiveCallback& receiveCallback);
- /**
- * @brief Close the connection.
+ /** \brief Close the connection.
*/
virtual void
close() = 0;
- /**
- * @brief Send block of data from @p wire through the transport
- *
- * @param wire A block of data to send
+ /** \brief send a TLV block through the transport
*/
virtual void
send(const Block& wire) = 0;
- /**
- * @brief Alternative version of sending data, applying scatter/gather I/O concept
+ /** \brief send two memory blocks through the transport
*
- * Two non-consecutive memory blocks will be send out together, e.g., as part of the
- * same message in datagram-oriented transports.
+ * Scatter/gather API is utilized to send two non-consecutive memory blocks together
+ * (as part of the same message in datagram-oriented transports).
*/
virtual void
send(const Block& header, const Block& payload) = 0;
+ /** \brief pause the transport
+ * \post receiveCallback will not be invoked
+ * \note This operation has no effect if transport has been paused,
+ * or when connection is being established.
+ */
virtual void
pause() = 0;
+ /** \brief resume the transport
+ * \post receiveCallback will be invoked
+ * \note This operation has no effect if transport is not paused,
+ * or when connection is being established.
+ */
virtual void
resume() = 0;
- inline bool
- isConnected();
+ /** \retval true connection has been established
+ * \retval false connection is not yet established or has been closed
+ */
+ bool
+ isConnected() const;
- inline bool
- isExpectingData();
+ /** \retval true incoming packets are expected, receiveCallback will be invoked
+ * \retval false incoming packets are not expected, receiveCallback will not be invoked
+ */
+ bool
+ isReceiving() const;
protected:
- inline void
+ /** \brief invoke the receive callback
+ */
+ void
receive(const Block& wire);
protected:
boost::asio::io_service* m_ioService;
bool m_isConnected;
- bool m_isExpectingData;
+ bool m_isReceiving;
ReceiveCallback m_receiveCallback;
};
-inline
-Transport::Transport()
- : m_ioService(0)
- , m_isConnected(false)
- , m_isExpectingData(false)
-{
-}
-
-inline
-Transport::Error::Error(const boost::system::error_code& code, const std::string& msg)
- : std::runtime_error(msg + (code.value() ? " (" + code.category().message(code.value()) + ")" : ""))
-{
-}
-
-inline
-Transport::Error::Error(const std::string& msg)
- : std::runtime_error(msg)
-{
-}
-
-inline
-Transport::~Transport()
-{
-}
-
-inline void
-Transport::connect(boost::asio::io_service& ioService,
- const ReceiveCallback& receiveCallback)
-{
- m_ioService = &ioService;
- m_receiveCallback = receiveCallback;
-}
-
inline bool
-Transport::isConnected()
+Transport::isConnected() const
{
return m_isConnected;
}
inline bool
-Transport::isExpectingData()
+Transport::isReceiving() const
{
- return m_isExpectingData;
+ return m_isReceiving;
}
inline void
diff --git a/src/transport/unix-transport.cpp b/src/transport/unix-transport.cpp
index 0c63960..f2e44aa 100644
--- a/src/transport/unix-transport.cpp
+++ b/src/transport/unix-transport.cpp
@@ -19,10 +19,8 @@
* See AUTHORS.md for complete list of ndn-cxx authors and contributors.
*/
-#include "common.hpp"
-
#include "unix-transport.hpp"
-#include "stream-transport.hpp"
+#include "stream-transport-impl.hpp"
#include "../face.hpp"
#include "util/face-uri.hpp"
@@ -77,7 +75,7 @@
UnixTransport::connect(boost::asio::io_service& ioService,
const ReceiveCallback& receiveCallback)
{
- if (!static_cast<bool>(m_impl)) {
+ if (m_impl == nullptr) {
Transport::connect(ioService, receiveCallback);
m_impl = make_shared<Impl>(ref(*this), ref(ioService));
@@ -89,21 +87,21 @@
void
UnixTransport::send(const Block& wire)
{
- BOOST_ASSERT(static_cast<bool>(m_impl));
+ BOOST_ASSERT(m_impl != nullptr);
m_impl->send(wire);
}
void
UnixTransport::send(const Block& header, const Block& payload)
{
- BOOST_ASSERT(static_cast<bool>(m_impl));
+ BOOST_ASSERT(m_impl != nullptr);
m_impl->send(header, payload);
}
void
UnixTransport::close()
{
- BOOST_ASSERT(static_cast<bool>(m_impl));
+ BOOST_ASSERT(m_impl != nullptr);
m_impl->close();
m_impl.reset();
}
@@ -111,7 +109,7 @@
void
UnixTransport::pause()
{
- if (static_cast<bool>(m_impl)) {
+ if (m_impl != nullptr) {
m_impl->pause();
}
}
@@ -119,7 +117,7 @@
void
UnixTransport::resume()
{
- BOOST_ASSERT(static_cast<bool>(m_impl));
+ BOOST_ASSERT(m_impl != nullptr);
m_impl->resume();
}
diff --git a/src/transport/unix-transport.hpp b/src/transport/unix-transport.hpp
index 6d16c26..99d8688 100644
--- a/src/transport/unix-transport.hpp
+++ b/src/transport/unix-transport.hpp
@@ -22,63 +22,58 @@
#ifndef NDN_TRANSPORT_UNIX_TRANSPORT_HPP
#define NDN_TRANSPORT_UNIX_TRANSPORT_HPP
-#include "../common.hpp"
#include "transport.hpp"
#include "../util/config-file.hpp"
-// forward declaration
-namespace boost { namespace asio { namespace local { class stream_protocol; } } }
+namespace boost {
+namespace asio {
+namespace local {
+class stream_protocol;
+} // namespace local
+} // namespace asio
+} // namespace boost
namespace ndn {
-// forward declaration
-template<class T, class U>
+template<typename BaseTransport, typename Protocol>
class StreamTransportImpl;
+/** \brief a transport using Unix stream socket
+ */
class UnixTransport : public Transport
{
public:
-
- /**
- * Create Unix transport based on the socket specified
- * in a well-known configuration file or fallback to /var/run/nfd.sock
- *
- * @throws Throws UnixTransport::Error on failure to parse a discovered configuration file
- */
+ explicit
UnixTransport(const std::string& unixSocket);
~UnixTransport();
- // from Transport
virtual void
connect(boost::asio::io_service& ioService,
- const ReceiveCallback& receiveCallback);
+ const ReceiveCallback& receiveCallback) override;
virtual void
- close();
+ close() override;
virtual void
- pause();
+ pause() override;
virtual void
- resume();
+ resume() override;
virtual void
- send(const Block& wire);
+ send(const Block& wire) override;
virtual void
- send(const Block& header, const Block& payload);
+ send(const Block& header, const Block& payload) override;
- /**
- * @brief Create transport with parameters defined in URI
- *
- * @throws Transport::Error if incorrect URI or unsupported protocol is specified
+ /** \brief Create transport with parameters defined in URI
+ * \throw Transport::Error if incorrect URI or unsupported protocol is specified
*/
static shared_ptr<UnixTransport>
create(const std::string& uri);
NDN_CXX_PUBLIC_WITH_TESTS_ELSE_PRIVATE:
-
static std::string
getSocketNameFromUri(const std::string& uri);
@@ -87,7 +82,7 @@
typedef StreamTransportImpl<UnixTransport, boost::asio::local::stream_protocol> Impl;
friend class StreamTransportImpl<UnixTransport, boost::asio::local::stream_protocol>;
- shared_ptr< Impl > m_impl;
+ shared_ptr<Impl> m_impl;
};
} // namespace ndn