face: Implementation of encode/decode of LocalControlHeader
LocalControlHeader can only be used on faces that are derived from
LocalFace. UnixStreamFace is directly inherited from LocalFace,
TCP face has two specializations: generic TcpFace (strictly not local),
and LocalTcpFace.
refs #1213
Change-Id: I8a158c3bc4bb929eedd15757cfddecc0d1049f9f
diff --git a/daemon/face/stream-face.hpp b/daemon/face/stream-face.hpp
index ac08421..9baef1d 100644
--- a/daemon/face/stream-face.hpp
+++ b/daemon/face/stream-face.hpp
@@ -8,14 +8,18 @@
#define NFD_FACE_STREAM_FACE_HPP
#include "face.hpp"
+#include "local-face.hpp"
namespace nfd {
-template <class T>
-class StreamFace : public Face
+// forward declaration
+template<class T, class U, class V> struct StreamFaceSenderImpl;
+
+template<class Protocol, class FaceBase = Face>
+class StreamFace : public FaceBase
{
public:
- typedef T protocol;
+ typedef Protocol protocol;
/**
* \brief Create instance of StreamFace
@@ -37,6 +41,12 @@
protected:
void
+ processErrorCode(const boost::system::error_code& error);
+
+ void
+ handleSend(const boost::system::error_code& error,
+ const Block& header, const Block& payload);
+ void
handleSend(const boost::system::error_code& error,
const Block& wire);
@@ -56,159 +66,201 @@
private:
uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
std::size_t m_inputBufferSize;
-
+
+ friend struct StreamFaceSenderImpl<Protocol, FaceBase, Interest>;
+ friend struct StreamFaceSenderImpl<Protocol, FaceBase, Data>;
+
NFD_LOG_INCLASS_DECLARE();
};
// All inherited classes must use
// NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, <specialization-parameter>, "Name");
-template <class T>
+
+/** \brief Class allowing validation of the StreamFace use
+ *
+ * For example, partial specialization based on boost::asio::ip::tcp should check
+ * that local endpoint is loopback
+ *
+ * @throws Face::Error if validation failed
+ */
+template<class Protocol, class U>
+struct StreamFaceValidator
+{
+ static void
+ validateSocket(typename Protocol::socket& socket)
+ {
+ }
+};
+
+
+template<class T, class U>
inline
-StreamFace<T>::StreamFace(const shared_ptr<typename StreamFace::protocol::socket>& socket)
+StreamFace<T, U>::StreamFace(const shared_ptr<typename StreamFace::protocol::socket>& socket)
: m_socket(socket)
, m_inputBufferSize(0)
{
+ StreamFaceValidator<T, U>::validateSocket(*socket);
m_socket->async_receive(boost::asio::buffer(m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
- bind(&StreamFace<T>::handleReceive, this, _1, _2));
+ bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
}
-template <class T>
+template<class T, class U>
inline
-StreamFace<T>::~StreamFace()
+StreamFace<T, U>::~StreamFace()
{
}
-
-template <class T>
-inline void
-StreamFace<T>::sendInterest(const Interest& interest)
+template<class Protocol, class FaceBase, class Packet>
+struct StreamFaceSenderImpl
{
- m_socket->async_send(boost::asio::buffer(interest.wireEncode().wire(),
- interest.wireEncode().size()),
- bind(&StreamFace<T>::handleSend, this, _1, interest.wireEncode()));
-
- // anything else should be done here?
+ static void
+ send(StreamFace<Protocol, FaceBase>& face, const Packet& packet)
+ {
+ face.m_socket->async_send(boost::asio::buffer(packet.wireEncode().wire(),
+ packet.wireEncode().size()),
+ bind(&StreamFace<Protocol, FaceBase>::handleSend,
+ &face, _1, packet.wireEncode()));
+ }
+};
+
+// partial specialization (only classes can be partially specialized)
+template<class Protocol, class Packet>
+struct StreamFaceSenderImpl<Protocol, LocalFace, Packet>
+{
+ static void
+ send(StreamFace<Protocol, LocalFace>& face, const Packet& packet)
+ {
+ using namespace boost::asio;
+
+ if (face.isEmptyFilteredLocalControlHeader(packet.getLocalControlHeader()))
+ {
+ const Block& payload = packet.wireEncode();
+ face.m_socket->async_send(buffer(payload.wire(), payload.size()),
+ bind(&StreamFace<Protocol, LocalFace>::handleSend,
+ &face, _1, packet.wireEncode()));
+ }
+ else
+ {
+ Block header = face.filterAndEncodeLocalControlHeader(packet);
+ const Block& payload = packet.wireEncode();
+
+ std::vector<const_buffer> buffers;
+ buffers.reserve(2);
+ buffers.push_back(buffer(header.wire(), header.size()));
+ buffers.push_back(buffer(payload.wire(), payload.size()));
+
+ face.m_socket->async_send(buffers,
+ bind(&StreamFace<Protocol, LocalFace>::handleSend,
+ &face, _1, header, payload));
+ }
+ }
+};
+
+
+template<class T, class U>
+inline void
+StreamFace<T, U>::sendInterest(const Interest& interest)
+{
+ StreamFaceSenderImpl<T, U, Interest>::send(*this, interest);
}
-template <class T>
+template<class T, class U>
inline void
-StreamFace<T>::sendData(const Data& data)
+StreamFace<T, U>::sendData(const Data& data)
{
- m_socket->async_send(boost::asio::buffer(data.wireEncode().wire(),
- data.wireEncode().size()),
- bind(&StreamFace<T>::handleSend, this, _1, data.wireEncode()));
-
- // anything else should be done here?
+ StreamFaceSenderImpl<T, U, Data>::send(*this, data);
}
-template <class T>
+template<class T, class U>
inline void
-StreamFace<T>::close()
+StreamFace<T, U>::close()
{
if (!m_socket->is_open())
return;
-
+
NFD_LOG_INFO("[id:" << this->getId()
<< ",endpoint:" << m_socket->local_endpoint()
<< "] Close connection");
closeSocket();
- onFail("Close connection");
+ this->onFail("Close connection");
}
-template <class T>
+template<class T, class U>
inline void
-StreamFace<T>::handleSend(const boost::system::error_code& error,
- const Block& wire)
+StreamFace<T, U>::processErrorCode(const boost::system::error_code& error)
{
- if (error) {
- if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
- return;
-
- if (!m_socket->is_open())
- {
- onFail("Connection closed");
- return;
- }
-
- if (error == boost::asio::error::eof)
- {
- NFD_LOG_INFO("[id:" << this->getId()
- << ",endpoint:" << m_socket->local_endpoint()
- << "] Connection closed");
- }
- else
- {
- NFD_LOG_WARN("[id:" << this->getId()
- << ",endpoint:" << m_socket->local_endpoint()
- << "] Send operation failed, closing socket: "
- << error.category().message(error.value()));
- }
-
- closeSocket();
-
- if (error == boost::asio::error::eof)
- {
- onFail("Connection closed");
- }
- else
- {
- onFail("Send operation failed, closing socket: " +
- error.category().message(error.value()));
- }
+ if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
return;
- }
+
+ if (!m_socket->is_open())
+ {
+ this->onFail("Connection closed");
+ return;
+ }
+
+ if (error == boost::asio::error::eof)
+ {
+ NFD_LOG_INFO("[id:" << this->getId()
+ << ",endpoint:" << m_socket->local_endpoint()
+ << "] Connection closed");
+ }
+ else
+ {
+ NFD_LOG_WARN("[id:" << this->getId()
+ << ",endpoint:" << m_socket->local_endpoint()
+ << "] Send or receive operation failed, closing socket: "
+ << error.category().message(error.value()));
+ }
+
+ closeSocket();
+
+ if (error == boost::asio::error::eof)
+ {
+ this->onFail("Connection closed");
+ }
+ else
+ {
+ this->onFail("Send or receive operation failed, closing socket: " +
+ error.category().message(error.value()));
+ }
+}
+
+
+template<class T, class U>
+inline void
+StreamFace<T, U>::handleSend(const boost::system::error_code& error,
+ const Block& wire)
+{
+ if (error)
+ return processErrorCode(error);
NFD_LOG_TRACE("[id:" << this->getId()
<< ",endpoint:" << m_socket->local_endpoint()
<< "] Successfully sent: " << wire.size() << " bytes");
- // do nothing (needed to retain validity of wire memory block
}
-template <class T>
+template<class T, class U>
inline void
-StreamFace<T>::handleReceive(const boost::system::error_code& error,
+StreamFace<T, U>::handleSend(const boost::system::error_code& error,
+ const Block& header, const Block& payload)
+{
+ if (error)
+ return processErrorCode(error);
+
+ NFD_LOG_TRACE("[id:" << this->getId()
+ << ",endpoint:" << m_socket->local_endpoint()
+ << "] Successfully sent: " << (header.size()+payload.size()) << " bytes");
+}
+
+template<class T, class U>
+inline void
+StreamFace<T, U>::handleReceive(const boost::system::error_code& error,
std::size_t bytes_recvd)
{
- if (error || bytes_recvd == 0) {
- if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
- return;
-
- // this should be unnecessary, but just in case
- if (!m_socket->is_open())
- {
- onFail("Connection closed");
- return;
- }
-
- if (error == boost::asio::error::eof)
- {
- NFD_LOG_INFO("[id:" << this->getId()
- << ",endpoint:" << m_socket->local_endpoint()
- << "] Connection closed");
- }
- else
- {
- NFD_LOG_WARN("[id:" << this->getId()
- << ",endpoint:" << m_socket->local_endpoint()
- << "] Receive operation failed: "
- << error.category().message(error.value()));
- }
-
- closeSocket();
-
- if (error == boost::asio::error::eof)
- {
- onFail("Connection closed");
- }
- else
- {
- onFail("Receive operation failed, closing socket: " +
- error.category().message(error.value()));
- }
- return;
- }
+ if (error)
+ return processErrorCode(error);
NFD_LOG_TRACE("[id:" << this->getId()
<< ",endpoint:" << m_socket->local_endpoint()
@@ -226,27 +278,8 @@
offset += element.size();
BOOST_ASSERT(offset <= m_inputBufferSize);
-
- /// @todo Ensure lazy field decoding process
- if (element.type() == tlv::Interest)
- {
- shared_ptr<Interest> i = make_shared<Interest>();
- i->wireDecode(element);
- onReceiveInterest(*i);
- }
- else if (element.type() == tlv::Data)
- {
- shared_ptr<Data> d = make_shared<Data>();
- d->wireDecode(element);
- onReceiveData(*d);
- }
- // @todo Add local header support
- // else if (element.type() == tlv::LocalHeader)
- // {
- // shared_ptr<Interest> i = make_shared<Interest>();
- // i->wireDecode(element);
- // }
- else
+
+ if (!this->decodeAndDispatchInput(element))
{
NFD_LOG_WARN("[id:" << this->getId()
<< ",endpoint:" << m_socket->local_endpoint()
@@ -265,7 +298,7 @@
<< "closing down the face");
closeSocket();
- onFail("Received input is invalid or too large to process, closing down the face");
+ this->onFail("Received input is invalid or too large to process, closing down the face");
return;
}
}
@@ -286,18 +319,18 @@
m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
- bind(&StreamFace<T>::handleReceive, this, _1, _2));
+ bind(&StreamFace<T, U>::handleReceive, this, _1, _2));
}
-template <class T>
+template<class T, class U>
inline void
-StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
+StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted(const shared_ptr<Face>& face)
{
}
-template <class T>
+template<class T, class U>
inline void
-StreamFace<T>::closeSocket()
+StreamFace<T, U>::closeSocket()
{
boost::asio::io_service& io = m_socket->get_io_service();
@@ -309,7 +342,7 @@
// ensure that the Face object is alive at least until all pending
// handlers are dispatched
- io.post(bind(&StreamFace<T>::keepFaceAliveUntilAllHandlersExecuted,
+ io.post(bind(&StreamFace<T, U>::keepFaceAliveUntilAllHandlersExecuted,
this, this->shared_from_this()));
}