Remove dependency on Selectors and refactor codebase.

Change-Id: Ic3024b76ba0eea61f790c91c36090b4aa68702a3
Refs: #4522
diff --git a/src/handles/tcp-bulk-insert-handle.cpp b/src/handles/tcp-bulk-insert-handle.cpp
index 53a60d6..f27d173 100644
--- a/src/handles/tcp-bulk-insert-handle.cpp
+++ b/src/handles/tcp-bulk-insert-handle.cpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2014-2017, Regents of the University of California.
+ * Copyright (c) 2014-2018, Regents of the University of California.
*
* This file is part of NDN repo-ng (Next generation of NDN repository).
* See AUTHORS.md for complete list of repo-ng authors and contributors.
@@ -19,8 +19,12 @@
#include "tcp-bulk-insert-handle.hpp"
+#include <ndn-cxx/util/logger.hpp>
+
namespace repo {
+NDN_LOG_INIT(repo.tcpHandle);
+
const size_t MAX_NDN_PACKET_SIZE = 8800;
namespace detail {
@@ -29,7 +33,7 @@
{
public:
TcpBulkInsertClient(TcpBulkInsertHandle& writer,
- const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+ const std::shared_ptr<boost::asio::ip::tcp::socket>& socket)
: m_writer(writer)
, m_socket(socket)
, m_hasStarted(false)
@@ -38,13 +42,13 @@
}
static void
- startReceive(const shared_ptr<TcpBulkInsertClient>& client)
+ startReceive(const std::shared_ptr<TcpBulkInsertClient>& client)
{
BOOST_ASSERT(!client->m_hasStarted);
client->m_socket->async_receive(
boost::asio::buffer(client->m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
- bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
+ std::bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
client->m_hasStarted = true;
}
@@ -53,11 +57,11 @@
void
handleReceive(const boost::system::error_code& error,
std::size_t nBytesReceived,
- const shared_ptr<TcpBulkInsertClient>& client);
+ const std::shared_ptr<TcpBulkInsertClient>& client);
private:
TcpBulkInsertHandle& m_writer;
- shared_ptr<boost::asio::ip::tcp::socket> m_socket;
+ std::shared_ptr<boost::asio::ip::tcp::socket> m_socket;
bool m_hasStarted;
uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
std::size_t m_inputBufferSize;
@@ -87,22 +91,19 @@
BOOST_THROW_EXCEPTION(Error("Cannot listen on [" + host + ":" + port + "]"));
m_localEndpoint = *endpoint;
- std::cerr << "Start listening on " << m_localEndpoint << std::endl;
+ NDN_LOG_DEBUG("Start listening on " << m_localEndpoint);
m_acceptor.open(m_localEndpoint .protocol());
m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
- if (m_localEndpoint.address().is_v6())
- {
- m_acceptor.set_option(ip::v6_only(true));
- }
+ if (m_localEndpoint.address().is_v6()) {
+ m_acceptor.set_option(ip::v6_only(true));
+ }
m_acceptor.bind(m_localEndpoint);
m_acceptor.listen(255);
- shared_ptr<ip::tcp::socket> clientSocket =
- make_shared<ip::tcp::socket>(std::ref(m_acceptor.get_io_service()));
+ auto clientSocket = std::make_shared<ip::tcp::socket>(m_acceptor.get_io_service());
m_acceptor.async_accept(*clientSocket,
- bind(&TcpBulkInsertHandle::handleAccept, this, _1,
- clientSocket));
+ std::bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
}
void
@@ -114,45 +115,40 @@
void
TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
- const shared_ptr<boost::asio::ip::tcp::socket>& socket)
+ const std::shared_ptr<boost::asio::ip::tcp::socket>& socket)
{
using namespace boost::asio;
if (error) {
- // if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
- // return;
return;
}
- std::cerr << "New connection from " << socket->remote_endpoint() << std::endl;
+ NDN_LOG_DEBUG("New connection from " << socket->remote_endpoint());
- shared_ptr<detail::TcpBulkInsertClient> client =
- make_shared<detail::TcpBulkInsertClient>(std::ref(*this), socket);
+ std::shared_ptr<detail::TcpBulkInsertClient> client =
+ std::make_shared<detail::TcpBulkInsertClient>(*this, socket);
detail::TcpBulkInsertClient::startReceive(client);
// prepare accepting the next connection
- shared_ptr<ip::tcp::socket> clientSocket =
- make_shared<ip::tcp::socket>(std::ref(m_acceptor.get_io_service()));
+ auto clientSocket = std::make_shared<ip::tcp::socket>(m_acceptor.get_io_service());
m_acceptor.async_accept(*clientSocket,
- bind(&TcpBulkInsertHandle::handleAccept, this, _1,
- clientSocket));
+ std::bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
}
void
detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
std::size_t nBytesReceived,
- const shared_ptr<detail::TcpBulkInsertClient>& client)
+ const std::shared_ptr<detail::TcpBulkInsertClient>& client)
{
- if (error)
- {
- if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
- return;
-
- boost::system::error_code error;
- m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
- m_socket->close(error);
+ if (error) {
+ if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
return;
- }
+
+ boost::system::error_code error;
+ m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
+ m_socket->close(error);
+ return;
+ }
m_inputBufferSize += nBytesReceived;
@@ -175,42 +171,37 @@
Data data(element);
bool isInserted = m_writer.getStorageHandle().insertData(data);
if (isInserted)
- std::cerr << "Successfully injected " << data.getName() << std::endl;
+ NDN_LOG_DEBUG("Successfully injected " << data.getName());
else
- std::cerr << "FAILED to inject " << data.getName() << std::endl;
+ NDN_LOG_DEBUG("FAILED to inject " << data.getName());
}
catch (const std::runtime_error& error) {
/// \todo Catch specific error after determining what wireDecode() can throw
- std::cerr << "Error decoding received Data packet" << std::endl;
+ NDN_LOG_ERROR("Error decoding received Data packet");
}
}
}
- if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
- {
- boost::system::error_code error;
- m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
- m_socket->close(error);
- return;
- }
+ if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
+ boost::system::error_code error;
+ m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
+ m_socket->close(error);
+ return;
+ }
- if (offset > 0)
- {
- if (offset != m_inputBufferSize)
- {
- std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
- m_inputBuffer);
- m_inputBufferSize -= offset;
- }
- else
- {
- m_inputBufferSize = 0;
- }
+ if (offset > 0) {
+ if (offset != m_inputBufferSize) {
+ std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
+ m_inputBufferSize -= offset;
}
+ else {
+ m_inputBufferSize = 0;
+ }
+ }
m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
- bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
+ std::bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
}