face: TcpTransport
Change-Id: I3ff898225ad1b0c1178490f389048944e24a9f1b
Refs: #3166
diff --git a/daemon/face/stream-face.hpp b/daemon/face/stream-face.hpp
deleted file mode 100644
index 5580fe4..0000000
--- a/daemon/face/stream-face.hpp
+++ /dev/null
@@ -1,363 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2015, Regents of the University of California,
- * Arizona Board of Regents,
- * Colorado State University,
- * University Pierre & Marie Curie, Sorbonne University,
- * Washington University in St. Louis,
- * Beijing Institute of Technology,
- * The University of Memphis.
- *
- * This file is part of NFD (Named Data Networking Forwarding Daemon).
- * See AUTHORS.md for complete list of NFD authors and contributors.
- *
- * NFD is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef NFD_DAEMON_FACE_STREAM_FACE_HPP
-#define NFD_DAEMON_FACE_STREAM_FACE_HPP
-
-#include "face.hpp"
-#include "local-face.hpp"
-#include "core/global-io.hpp"
-
-#include <queue>
-
-namespace nfd {
-
-// forward declaration
-template<class T, class U, class V> struct StreamFaceSenderImpl;
-
-template<class Protocol, class FaceBase = Face>
-class StreamFace : public FaceBase
-{
-public:
- typedef Protocol protocol;
-
- StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
- typename protocol::socket socket, bool isOnDemand);
-
- // from FaceBase
- void
- sendInterest(const Interest& interest) DECL_OVERRIDE;
-
- void
- sendData(const Data& data) DECL_OVERRIDE;
-
- void
- close() DECL_OVERRIDE;
-
-protected:
- void
- processErrorCode(const boost::system::error_code& error);
-
- void
- sendFromQueue();
-
- void
- handleSend(const boost::system::error_code& error,
- size_t nBytesSent);
-
- void
- handleReceive(const boost::system::error_code& error,
- size_t nBytesReceived);
-
- void
- shutdownSocket();
-
- void
- deferredClose(const shared_ptr<Face>& face);
-
-protected:
- typename protocol::socket m_socket;
-
- NFD_LOG_INCLASS_DECLARE();
-
-private:
- uint8_t m_inputBuffer[ndn::MAX_NDN_PACKET_SIZE];
- size_t m_inputBufferSize;
- std::queue<Block> m_sendQueue;
-
- friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
- friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
-};
-
-// All inherited classes must use
-// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
-
-
-/** \brief Class allowing validation of the StreamFace use
- *
- * For example, partial specialization based on boost::asio::ip::tcp should check
- * that local endpoint is loopback
- *
- * @throws Face::Error if validation failed
- */
-template<class Protocol, class U>
-struct StreamFaceValidator
-{
- static void
- validateSocket(const typename Protocol::socket& socket)
- {
- }
-};
-
-
-template<class T, class FaceBase>
-inline
-StreamFace<T, FaceBase>::StreamFace(const FaceUri& remoteUri, const FaceUri& localUri,
- typename StreamFace::protocol::socket socket, bool isOnDemand)
- : FaceBase(remoteUri, localUri)
- , m_socket(std::move(socket))
- , m_inputBufferSize(0)
-{
- NFD_LOG_FACE_INFO("Creating face");
-
- this->setPersistency(isOnDemand ? ndn::nfd::FACE_PERSISTENCY_ON_DEMAND : ndn::nfd::FACE_PERSISTENCY_PERSISTENT);
- StreamFaceValidator<T, FaceBase>::validateSocket(m_socket);
-
- m_socket.async_receive(boost::asio::buffer(m_inputBuffer, ndn::MAX_NDN_PACKET_SIZE),
- bind(&StreamFace<T, FaceBase>::handleReceive, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
-}
-
-
-template<class Protocol, class FaceBase, class Packet>
-struct StreamFaceSenderImpl
-{
- static void
- send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
- {
- bool wasQueueEmpty = face.m_sendQueue.empty();
- face.m_sendQueue.push(packet.wireEncode());
-
- if (wasQueueEmpty)
- face.sendFromQueue();
- }
-};
-
-// partial specialization (only classes can be partially specialized)
-template<class Protocol, class Packet>
-struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
-{
- static void
- send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
- {
- bool wasQueueEmpty = face.m_sendQueue.empty();
-
- if (!face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
- {
- face.m_sendQueue.push(face.filterAndEncodeLocalControlHeader(packet));
- }
- face.m_sendQueue.push(packet.wireEncode());
-
- if (wasQueueEmpty)
- face.sendFromQueue();
- }
-};
-
-
-template<class T, class U>
-inline void
-StreamFace<T, U>::sendInterest(const Interest& interest)
-{
- NFD_LOG_FACE_TRACE(__func__);
- this->emitSignal(onSendInterest, interest);
- StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
-}
-
-template<class T, class U>
-inline void
-StreamFace<T, U>::sendData(const Data& data)
-{
- NFD_LOG_FACE_TRACE(__func__);
- this->emitSignal(onSendData, data);
- StreamFaceSenderImpl<T, U, Data>::send(*this, data);
-}
-
-template<class T, class U>
-inline void
-StreamFace<T, U>::close()
-{
- if (!m_socket.is_open())
- return;
-
- NFD_LOG_FACE_INFO("Closing face");
-
- shutdownSocket();
- this->fail("Face closed");
-}
-
-template<class T, class U>
-inline void
-StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
-{
- if (error == boost::asio::error::operation_aborted || // when cancel() is called
- error == boost::asio::error::shut_down) // after shutdown() is called
- return;
-
- if (!m_socket.is_open())
- {
- this->fail("Connection closed");
- return;
- }
-
- if (error != boost::asio::error::eof)
- NFD_LOG_FACE_WARN("Send or receive operation failed: " << error.message());
-
- shutdownSocket();
-
- if (error == boost::asio::error::eof)
- this->fail("Connection closed");
- else
- this->fail(error.message());
-}
-
-template<class T, class U>
-inline void
-StreamFace<T, U>::sendFromQueue()
-{
- boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
- bind(&StreamFace<T, U>::handleSend, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
-}
-
-template<class T, class U>
-inline void
-StreamFace<T, U>::handleSend(const boost::system::error_code& error,
- size_t nBytesSent)
-{
- if (error)
- return processErrorCode(error);
-
- BOOST_ASSERT(!m_sendQueue.empty());
-
- NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
- this->getMutableCounters().getNOutBytes() += nBytesSent;
-
- m_sendQueue.pop();
- if (!m_sendQueue.empty())
- sendFromQueue();
-}
-
-template<class T, class U>
-inline void
-StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
- size_t nBytesReceived)
-{
- if (error)
- return processErrorCode(error);
-
- NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
- this->getMutableCounters().getNInBytes() += nBytesReceived;
-
- m_inputBufferSize += nBytesReceived;
-
- size_t offset = 0;
-
- bool isOk = true;
- Block element;
- while (m_inputBufferSize - offset > 0) {
- std::tie(isOk, element) = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset);
- if (!isOk)
- break;
-
- offset += element.size();
-
- BOOST_ASSERT(offset <= m_inputBufferSize);
-
- if (!this->decodeAndDispatchInput(element)) {
- NFD_LOG_FACE_WARN("Received unrecognized TLV block of type " << element.type());
- // ignore unknown packet and proceed
- }
- }
-
- if (!isOk && m_inputBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0)
- {
- NFD_LOG_FACE_WARN("Failed to parse incoming packet or packet too large to process");
- shutdownSocket();
- this->fail("Failed to parse incoming packet or packet too large to process");
- return;
- }
-
- if (offset > 0)
- {
- if (offset != m_inputBufferSize)
- {
- std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
- m_inputBuffer);
- m_inputBufferSize -= offset;
- }
- else
- {
- m_inputBufferSize = 0;
- }
- }
-
- m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
- ndn::MAX_NDN_PACKET_SIZE - m_inputBufferSize),
- bind(&StreamFace<T, U>::handleReceive, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
-}
-
-template<class T, class U>
-inline void
-StreamFace<T, U>::shutdownSocket()
-{
- NFD_LOG_FACE_TRACE(__func__);
-
- // Cancel all outstanding operations and shutdown the socket
- // so that no further sends or receives are possible.
- // Use the non-throwing variants and ignore errors, if any.
- boost::system::error_code error;
- m_socket.cancel(error);
- m_socket.shutdown(protocol::socket::shutdown_both, error);
-
- // ensure that the Face object is alive at least until all pending
- // handlers are dispatched
- getGlobalIoService().post(bind(&StreamFace<T, U>::deferredClose,
- this, this->shared_from_this()));
-
- // Some bug or feature of Boost.Asio (see http://redmine.named-data.net/issues/1856):
- //
- // When shutdownSocket is called from within a socket event (e.g., from handleReceive),
- // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
- // Instead, handleSend is invoked as nothing bad happened.
- //
- // In order to prevent the assertion in handleSend from failing, we clear the queue
- // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
- // this point have been executed. If more send operations are scheduled after this
- // point, they will fail because the socket has been shutdown, and their callbacks
- // will be invoked with error code == asio::error::shut_down.
-}
-
-template<class T, class U>
-inline void
-StreamFace<T, U>::deferredClose(const shared_ptr<Face>& face)
-{
- NFD_LOG_FACE_TRACE(__func__);
-
- // clear send queue
- std::queue<Block> emptyQueue;
- std::swap(emptyQueue, m_sendQueue);
-
- // use the non-throwing variant and ignore errors, if any
- boost::system::error_code error;
- m_socket.close(error);
-}
-
-} // namespace nfd
-
-#endif // NFD_DAEMON_FACE_STREAM_FACE_HPP
diff --git a/daemon/face/tcp-channel.cpp b/daemon/face/tcp-channel.cpp
index b46e92c..2549c66 100644
--- a/daemon/face/tcp-channel.cpp
+++ b/daemon/face/tcp-channel.cpp
@@ -24,14 +24,15 @@
*/
#include "tcp-channel.hpp"
-#include "tcp-face.hpp"
+#include "generic-link-service.hpp"
+#include "tcp-transport.hpp"
#include "core/global-io.hpp"
namespace nfd {
NFD_LOG_INIT("TcpChannel");
-using namespace boost::asio;
+namespace ip = boost::asio::ip;
TcpChannel::TcpChannel(const tcp::Endpoint& localEndpoint)
: m_localEndpoint(localEndpoint)
@@ -94,24 +95,21 @@
}
void
-TcpChannel::createFace(ip::tcp::socket socket,
+TcpChannel::createFace(ip::tcp::socket&& socket,
const FaceCreatedCallback& onFaceCreated,
bool isOnDemand)
{
- shared_ptr<Face> face;
+ shared_ptr<face::LpFaceWrapper> face;
tcp::Endpoint remoteEndpoint = socket.remote_endpoint();
auto it = m_channelFaces.find(remoteEndpoint);
if (it == m_channelFaces.end()) {
- tcp::Endpoint localEndpoint = socket.local_endpoint();
-
- if (localEndpoint.address().is_loopback() &&
- remoteEndpoint.address().is_loopback())
- face = make_shared<TcpLocalFace>(FaceUri(remoteEndpoint), FaceUri(localEndpoint),
- std::move(socket), isOnDemand);
- else
- face = make_shared<TcpFace>(FaceUri(remoteEndpoint), FaceUri(localEndpoint),
- std::move(socket), isOnDemand);
+ auto persistency = isOnDemand ? ndn::nfd::FACE_PERSISTENCY_ON_DEMAND
+ : ndn::nfd::FACE_PERSISTENCY_PERSISTENT;
+ auto linkService = make_unique<face::GenericLinkService>();
+ auto transport = make_unique<face::TcpTransport>(std::move(socket), persistency);
+ auto lpFace = make_unique<face::LpFace>(std::move(linkService), std::move(transport));
+ face = make_shared<face::LpFaceWrapper>(std::move(lpFace));
face->onFail.connectSingleShot([this, remoteEndpoint] (const std::string&) {
NFD_LOG_TRACE("Erasing " << remoteEndpoint << " from channel face map");
diff --git a/daemon/face/tcp-channel.hpp b/daemon/face/tcp-channel.hpp
index 5b5505a..7d9994a 100644
--- a/daemon/face/tcp-channel.hpp
+++ b/daemon/face/tcp-channel.hpp
@@ -27,6 +27,7 @@
#define NFD_DAEMON_FACE_TCP_CHANNEL_HPP
#include "channel.hpp"
+#include "lp-face-wrapper.hpp"
#include "core/scheduler.hpp"
namespace nfd {
@@ -88,7 +89,7 @@
private:
void
- createFace(boost::asio::ip::tcp::socket socket,
+ createFace(boost::asio::ip::tcp::socket&& socket,
const FaceCreatedCallback& onFaceCreated,
bool isOnDemand);
@@ -113,7 +114,7 @@
const ConnectFailedCallback& onConnectFailed);
private:
- std::map<tcp::Endpoint, shared_ptr<Face>> m_channelFaces;
+ std::map<tcp::Endpoint, shared_ptr<face::LpFaceWrapper>> m_channelFaces;
tcp::Endpoint m_localEndpoint;
boost::asio::ip::tcp::acceptor m_acceptor;
diff --git a/daemon/face/tcp-face.hpp b/daemon/face/tcp-face.hpp
deleted file mode 100644
index 4313c06..0000000
--- a/daemon/face/tcp-face.hpp
+++ /dev/null
@@ -1,80 +0,0 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014 Regents of the University of California,
- * Arizona Board of Regents,
- * Colorado State University,
- * University Pierre & Marie Curie, Sorbonne University,
- * Washington University in St. Louis,
- * Beijing Institute of Technology
- *
- * This file is part of NFD (Named Data Networking Forwarding Daemon).
- * See AUTHORS.md for complete list of NFD authors and contributors.
- *
- * NFD is free software: you can redistribute it and/or modify it under the terms
- * of the GNU General Public License as published by the Free Software Foundation,
- * either version 3 of the License, or (at your option) any later version.
- *
- * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
- * PURPOSE. See the GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
- **/
-
-#ifndef NFD_DAEMON_FACE_TCP_FACE_HPP
-#define NFD_DAEMON_FACE_TCP_FACE_HPP
-
-#include "stream-face.hpp"
-
-namespace nfd {
-
-/**
- * \brief Implementation of Face abstraction that uses TCP
- * as underlying transport mechanism
- */
-class TcpFace : public StreamFace<boost::asio::ip::tcp>
-{
-public:
- TcpFace(const FaceUri& remoteUri, const FaceUri& localUri,
- protocol::socket socket, bool isOnDemand);
-};
-
-
-/**
- * \brief Implementation of Face abstraction that uses TCP
- * as underlying transport mechanism and is used for
- * local communication (can enable LocalControlHeader)
- */
-class TcpLocalFace : public StreamFace<boost::asio::ip::tcp, LocalFace>
-{
-public:
- TcpLocalFace(const FaceUri& remoteUri, const FaceUri& localUri,
- protocol::socket socket, bool isOnDemand);
-};
-
-
-/** \brief Class validating use of TcpLocalFace
- */
-template<>
-struct StreamFaceValidator<TcpLocalFace::protocol, LocalFace>
-{
- /** Check that local endpoint is loopback
- *
- * @throws Face::Error if validation failed
- */
- static void
- validateSocket(const TcpLocalFace::protocol::socket& socket)
- {
- if (!socket.local_endpoint().address().is_loopback() ||
- !socket.remote_endpoint().address().is_loopback())
- {
- BOOST_THROW_EXCEPTION(Face::Error("TcpLocalFace can be created only on a loopback "
- "address"));
- }
- }
-};
-
-} // namespace nfd
-
-#endif // NFD_DAEMON_FACE_TCP_FACE_HPP
diff --git a/daemon/face/tcp-transport.cpp b/daemon/face/tcp-transport.cpp
new file mode 100644
index 0000000..4af12f9
--- /dev/null
+++ b/daemon/face/tcp-transport.cpp
@@ -0,0 +1,78 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014-2015, Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology,
+ * The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "tcp-transport.hpp"
+
+namespace nfd {
+namespace face {
+
+NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamTransport, TcpTransport::protocol,
+                                               "TcpTransport");
+
+/**
+ * \brief Wraps a connected TCP socket and initializes the transport's static properties
+ *
+ * \param socket connected socket; forwarded (as an rvalue) to the StreamTransport base
+ * \param persistency initial persistency, applied through setPersistency()
+ *
+ * The scope is FACE_SCOPE_LOCAL only when both the local and the remote endpoint
+ * are loopback addresses, and FACE_SCOPE_NON_LOCAL otherwise.
+ */
+TcpTransport::TcpTransport(protocol::socket&& socket, ndn::nfd::FacePersistency persistency)
+  : StreamTransport(std::move(socket))
+{
+  // Query each endpoint exactly once, so that the URIs and the scope are derived
+  // from the same values and the socket is not asked repeatedly.
+  const auto localEp = m_socket.local_endpoint();
+  const auto remoteEp = m_socket.remote_endpoint();
+
+  this->setLocalUri(FaceUri(localEp));
+  this->setRemoteUri(FaceUri(remoteEp));
+
+  const bool isLoopback = localEp.address().is_loopback() && remoteEp.address().is_loopback();
+  this->setScope(isLoopback ? ndn::nfd::FACE_SCOPE_LOCAL : ndn::nfd::FACE_SCOPE_NON_LOCAL);
+
+  this->setPersistency(persistency);
+  this->setLinkType(ndn::nfd::LINK_TYPE_POINT_TO_POINT);
+  this->setMtu(MTU_UNLIMITED);
+
+  NFD_LOG_FACE_INFO("Creating transport");
+}
+
+/**
+ * \brief Rejects a change to FACE_PERSISTENCY_PERMANENT; other values are accepted
+ * \throw std::invalid_argument if \p newPersistency is FACE_PERSISTENCY_PERMANENT
+ */
+void
+TcpTransport::beforeChangePersistency(ndn::nfd::FacePersistency newPersistency)
+{
+  if (newPersistency == ndn::nfd::FACE_PERSISTENCY_PERMANENT) {
+    BOOST_THROW_EXCEPTION(
+      std::invalid_argument("TcpTransport does not support FACE_PERSISTENCY_PERMANENT"));
+  }
+}
+
+} // namespace face
+} // namespace nfd
diff --git a/daemon/face/tcp-face.cpp b/daemon/face/tcp-transport.hpp
similarity index 62%
rename from daemon/face/tcp-face.cpp
rename to daemon/face/tcp-transport.hpp
index 39ebe04..654f224 100644
--- a/daemon/face/tcp-face.cpp
+++ b/daemon/face/tcp-transport.hpp
@@ -22,26 +22,29 @@
* NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
**/
-#include "tcp-face.hpp"
+#ifndef NFD_DAEMON_FACE_TCP_TRANSPORT_HPP
+#define NFD_DAEMON_FACE_TCP_TRANSPORT_HPP
+
+#include "stream-transport.hpp"
namespace nfd {
+namespace face {
-NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, TcpFace::protocol, "TcpFace");
-
-NFD_LOG_INCLASS_2TEMPLATE_SPECIALIZATION_DEFINE(StreamFace,
- TcpLocalFace::protocol, LocalFace,
- "TcpLocalFace");
-
-TcpFace::TcpFace(const FaceUri& remoteUri, const FaceUri& localUri,
- protocol::socket socket, bool isOnDemand)
- : StreamFace<protocol>(remoteUri, localUri, std::move(socket), isOnDemand)
+/**
+ * \brief A Transport that communicates on a connected TCP socket (specializes StreamTransport for boost::asio::ip::tcp)
+ */
+class TcpTransport : public StreamTransport<boost::asio::ip::tcp>
{
-}
+public:
+ TcpTransport(protocol::socket&& socket, // connected socket, handed to the StreamTransport base
+ ndn::nfd::FacePersistency persistency); // initial persistency, applied via setPersistency()
-TcpLocalFace::TcpLocalFace(const FaceUri& remoteUri, const FaceUri& localUri,
- protocol::socket socket, bool isOnDemand)
- : StreamFace<protocol, LocalFace>(remoteUri, localUri, std::move(socket), isOnDemand)
-{
-}
+protected:
+ virtual void // throws std::invalid_argument when newPersistency is FACE_PERSISTENCY_PERMANENT
+ beforeChangePersistency(ndn::nfd::FacePersistency newPersistency) DECL_OVERRIDE;
+};
+} // namespace face
} // namespace nfd
+
+#endif // NFD_DAEMON_FACE_TCP_TRANSPORT_HPP
diff --git a/daemon/face/unix-stream-transport.cpp b/daemon/face/unix-stream-transport.cpp
index e9e2eab..0e37a42 100644
--- a/daemon/face/unix-stream-transport.cpp
+++ b/daemon/face/unix-stream-transport.cpp
@@ -36,7 +36,7 @@
{
static_assert(
std::is_same<std::remove_cv<protocol::socket::native_handle_type>::type, int>::value,
- "The native handle type for UnixStreamFace sockets must be 'int'"
+ "The native handle type for UnixStreamTransport sockets must be 'int'"
);
this->setLocalUri(FaceUri(m_socket.local_endpoint()));
@@ -46,7 +46,7 @@
this->setLinkType(ndn::nfd::LINK_TYPE_POINT_TO_POINT);
this->setMtu(MTU_UNLIMITED);
- NFD_LOG_FACE_INFO("Creating Transport");
+ NFD_LOG_FACE_INFO("Creating transport");
}
} // namespace face