face/tcp: Fix TcpChannel and TcpFace implementation and enable basic logging in both
Change-Id: If9a00c13caccbeb117204655ced293d49860d6e4
refs: #1134, #1133
diff --git a/daemon/face/stream-face.hpp b/daemon/face/stream-face.hpp
index a764a07..529e3ef 100644
--- a/daemon/face/stream-face.hpp
+++ b/daemon/face/stream-face.hpp
@@ -34,8 +34,12 @@
private:
uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
std::size_t m_inputBufferSize;
+
+ NFD_LOG_INCLASS_DECLARE();
};
+NFD_LOG_INCLASS_TEMPLATE_DEFINE(StreamFace, "StreamFace");
+
template <class T>
inline
StreamFace<T>::StreamFace(const shared_ptr<typename StreamFace::protocol::socket>& socket)
@@ -55,11 +59,20 @@
if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
return;
- onFail("Send operation failed: " + error.category().message(error.value()));
+ NFD_LOG_WARN("[id:" << this->getId()
+ << ",endpoint:" << m_socket->local_endpoint()
+ << "] Send operation failed, closing socket: "
+ << error.category().message(error.value()));
+
+ onFail("Send operation failed, closing socket: " +
+ error.category().message(error.value()));
m_socket->close();
return;
}
+ NFD_LOG_TRACE("[id:" << this->getId()
+ << ",endpoint:" << m_socket->local_endpoint()
+ << "] Successfully sent: " << wire.size() << " bytes");
// do nothing (needed to retain validity of wire memory block
}
@@ -68,11 +81,21 @@
StreamFace<T>::handleReceive(const boost::system::error_code& error,
std::size_t bytes_recvd)
{
+ NFD_LOG_TRACE("[id:" << this->getId()
+ << ",endpoint:" << m_socket->local_endpoint()
+ << "] Received: " << bytes_recvd << " bytes");
+
if (error || bytes_recvd == 0) {
if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
return;
- onFail("Receive operation failed: " + error.category().message(error.value()));
+ NFD_LOG_WARN("[id:" << this->getId()
+ << ",endpoint:" << m_socket->local_endpoint()
+ << "] Receive operation failed: "
+ << error.category().message(error.value()));
+
+ onFail("Receive operation failed, closing socket: " +
+ error.category().message(error.value()));
m_socket->close();
return;
}
@@ -83,38 +106,50 @@
std::size_t offset = 0;
/// @todo Eliminate reliance on exceptions in this path
try {
- Block element(m_inputBuffer + offset, m_inputBufferSize - offset);
- offset += element.size();
+ while(m_inputBufferSize - offset > 0)
+ {
+ Block element(m_inputBuffer + offset, m_inputBufferSize - offset);
+ offset += element.size();
- /// @todo Ensure lazy field decoding process
- if (element.type() == tlv::Interest)
- {
- shared_ptr<Interest> i = make_shared<Interest>();
- i->wireDecode(element);
- onReceiveInterest(*i);
- }
- else if (element.type() == tlv::Data)
- {
- shared_ptr<Data> d = make_shared<Data>();
- d->wireDecode(element);
- onReceiveData(*d);
- }
- // @todo Add local header support
- // else if (element.type() == tlv::LocalHeader)
- // {
- // shared_ptr<Interest> i = make_shared<Interest>();
- // i->wireDecode(element);
- // }
- else
- {
- /// @todo Add loggin
-
- // ignore unknown packet and proceed
+ BOOST_ASSERT(offset <= m_inputBufferSize);
+
+ /// @todo Ensure lazy field decoding process
+ if (element.type() == tlv::Interest)
+ {
+ shared_ptr<Interest> i = make_shared<Interest>();
+ i->wireDecode(element);
+ onReceiveInterest(*i);
+ }
+ else if (element.type() == tlv::Data)
+ {
+ shared_ptr<Data> d = make_shared<Data>();
+ d->wireDecode(element);
+ onReceiveData(*d);
+ }
+ // @todo Add local header support
+ // else if (element.type() == tlv::LocalHeader)
+ // {
+ // shared_ptr<Interest> i = make_shared<Interest>();
+ // i->wireDecode(element);
+ // }
+ else
+ {
+ NFD_LOG_WARN("[id:" << this->getId()
+ << ",endpoint:" << m_socket->local_endpoint()
+ << "] Received unrecognized block of type ["
+ << element.type() << "]");
+ // ignore unknown packet and proceed
+ }
}
}
- catch(const tlv::Error&) {
+ catch(const tlv::Error& e) {
if (m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
{
+ NFD_LOG_WARN("[id:" << this->getId()
+ << ",endpoint:" << m_socket->local_endpoint()
+ << "] Received input is invalid or too large to process, "
+ << "closing down the face");
+
onFail("Received input is invalid or too large to process, closing down the face");
m_socket->close();
return;
diff --git a/daemon/face/tcp-channel.cpp b/daemon/face/tcp-channel.cpp
index f67e9b2..e3ad8d5 100644
--- a/daemon/face/tcp-channel.cpp
+++ b/daemon/face/tcp-channel.cpp
@@ -8,6 +8,8 @@
namespace nfd {
+NFD_LOG_INIT("TcpChannel");
+
using namespace boost::asio;
TcpChannel::TcpChannel(io_service& ioService,
@@ -33,7 +35,8 @@
m_acceptor->async_accept(*clientSocket,
bind(&TcpChannel::handleConnection, this, _1,
clientSocket,
- onFaceCreated, onAcceptFailed));
+ onFaceCreated, onAcceptFailed,
+ true));
}
void
@@ -54,9 +57,13 @@
shared_ptr<monotonic_deadline_timer> connectTimeoutTimer =
make_shared<monotonic_deadline_timer>(boost::ref(m_ioService));
- // not sure if it works. This will bind to something...
- // Do we need reuse here too?
- clientSocket->bind(m_localEndpoint);
+ clientSocket->open(m_localEndpoint.protocol());
+
+ // The commented-out bind below does not work, and CCNx likewise does not
+ // bind the local socket to a fixed port number for TCP connections
+
+ // clientSocket->set_option(ip::tcp::socket::reuse_address(true));
+ // clientSocket->bind(m_localEndpoint);
clientSocket->async_connect(remoteEndpoint,
bind(&TcpChannel::handleSuccessfulConnect, this, _1,
@@ -81,9 +88,13 @@
shared_ptr<monotonic_deadline_timer> connectTimeoutTimer =
make_shared<monotonic_deadline_timer>(boost::ref(m_ioService));
- // not sure if it works. This will bind to something...
- // Do we need reuse here too?
- clientSocket->bind(m_localEndpoint);
+ clientSocket->open(m_localEndpoint.protocol());
+
+ // The commented-out bind below does not work, and CCNx likewise does not
+ // bind the local socket to a fixed port number for TCP connections
+
+ // clientSocket->set_option(ip::tcp::socket::reuse_address(true));
+ // clientSocket->bind(m_localEndpoint);
ip::tcp::resolver::query query(remoteHost, remotePort);
shared_ptr<ip::tcp::resolver> resolver =
@@ -92,7 +103,8 @@
resolver->async_resolve(query,
bind(&TcpChannel::handleEndpointResoution, this, _1, _2,
clientSocket, connectTimeoutTimer,
- onFaceCreated, onConnectFailed));
+ onFaceCreated, onConnectFailed,
+ resolver));
connectTimeoutTimer->expires_from_now(timeout);
connectTimeoutTimer->async_wait(bind(&TcpChannel::handleFailedConnect, this, _1,
@@ -105,18 +117,35 @@
TcpChannel::handleConnection(const boost::system::error_code& error,
const shared_ptr<ip::tcp::socket>& socket,
const FaceCreatedCallback& onFaceCreated,
- const ConnectFailedCallback& onConnectFailed)
+ const ConnectFailedCallback& onConnectFailed,
+ bool remoteConnection)
{
if (error) {
if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
return;
- /// \todo Log the error
+ NFD_LOG_DEBUG("Connect to remote endpoint failed: "
+ << error.category().message(error.value()));
+
onConnectFailed("Connect to remote endpoint failed: " +
error.category().message(error.value()));
return;
}
+ if (remoteConnection)
+ {
+ NFD_LOG_DEBUG("[" << socket->local_endpoint() << "] "
+ "<< Connection from " << socket->remote_endpoint());
+ }
+ else
+ {
+ NFD_LOG_DEBUG("[" << socket->local_endpoint() << "] "
+ ">> Connection to " << socket->remote_endpoint());
+ }
+
+ /**
+ * \todo Remove FaceId from here
+ */
shared_ptr<TcpFace> face = make_shared<TcpFace>(boost::cref(socket));
onFaceCreated(face);
@@ -138,12 +167,16 @@
return;
socket->close();
+
+ NFD_LOG_DEBUG("Connect to remote endpoint failed: "
+ << error.category().message(error.value()));
+
onConnectFailed("Connect to remote endpoint failed: " +
error.category().message(error.value()));
return;
}
- handleConnection(error, socket, onFaceCreated, onConnectFailed);
+ handleConnection(error, socket, onFaceCreated, onConnectFailed, false);
}
void
@@ -156,6 +189,9 @@
return;
}
+ NFD_LOG_DEBUG("Connect to remote endpoint timed out: "
+ << error.category().message(error.value()));
+
onConnectFailed("Connect to remote endpoint timed out: " +
error.category().message(error.value()));
socket->close(); // abort the connection
@@ -167,7 +203,8 @@
const shared_ptr<boost::asio::ip::tcp::socket>& socket,
const shared_ptr<boost::asio::monotonic_deadline_timer>& timer,
const FaceCreatedCallback& onFaceCreated,
- const ConnectFailedCallback& onConnectFailed)
+ const ConnectFailedCallback& onConnectFailed,
+ const shared_ptr<ip::tcp::resolver>& resolver)
{
if (error ||
remoteEndpoint == ip::tcp::resolver::iterator())
@@ -177,6 +214,10 @@
socket->close();
timer->cancel();
+
+ NFD_LOG_DEBUG("Remote endpoint hostname or port cannot be resolved: "
+ << error.category().message(error.value()));
+
onConnectFailed("Remote endpoint hostname or port cannot be resolved: " +
error.category().message(error.value()));
return;
diff --git a/daemon/face/tcp-channel.hpp b/daemon/face/tcp-channel.hpp
index bd0388a..3e3d956 100644
--- a/daemon/face/tcp-channel.hpp
+++ b/daemon/face/tcp-channel.hpp
@@ -91,7 +91,8 @@
handleConnection(const boost::system::error_code& error,
const shared_ptr<boost::asio::ip::tcp::socket>& socket,
const FaceCreatedCallback& onFaceCreated,
- const ConnectFailedCallback& onConnectFailed);
+ const ConnectFailedCallback& onConnectFailed,
+ bool remoteConnection);
void
handleSuccessfulConnect(const boost::system::error_code& error,
@@ -112,7 +113,8 @@
const shared_ptr<boost::asio::ip::tcp::socket>& socket,
const shared_ptr<boost::asio::monotonic_deadline_timer>& timer,
const FaceCreatedCallback& onFaceCreated,
- const ConnectFailedCallback& onConnectFailed);
+ const ConnectFailedCallback& onConnectFailed,
+ const shared_ptr<boost::asio::ip::tcp::resolver>& resolver);
private:
boost::asio::io_service& m_ioService;
diff --git a/daemon/face/tcp-face.cpp b/daemon/face/tcp-face.cpp
index 4a295ff..a609725 100644
--- a/daemon/face/tcp-face.cpp
+++ b/daemon/face/tcp-face.cpp
@@ -8,6 +8,8 @@
namespace nfd {
+NFD_LOG_INCLASS_TEMPLATE_SPECIALIZATION_DEFINE(StreamFace, TcpFace::protocol, "TcpFace");
+
TcpFace::TcpFace(const shared_ptr<TcpFace::protocol::socket>& socket)
: StreamFace<protocol>(socket)
{