blob: cd263348aeac8eec9b97597a2ddccaf3b06f2871 [file] [log] [blame]
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Davide Pesaventoe18d3682019-01-24 22:10:30 -05002/*
Davide Pesaventod8521aa2023-09-17 14:21:27 -04003 * Copyright (c) 2014-2023, Regents of the University of California.
Alexander Afanasyeve1e6f2a2014-04-25 11:28:12 -07004 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070018 */
19
20#include "tcp-bulk-insert-handle.hpp"
21
Davide Pesavento3c843162019-07-07 15:06:44 -040022#include <boost/asio/ip/v6_only.hpp>
Davide Pesaventod8521aa2023-09-17 14:21:27 -040023
weijia yuan3aa8d2b2018-03-06 15:35:57 -080024#include <ndn-cxx/util/logger.hpp>
25
Davide Pesaventoe18d3682019-01-24 22:10:30 -050026NDN_LOG_INIT(repo.TcpHandle);
27
Davide Pesavento3c843162019-07-07 15:06:44 -040028namespace ip = boost::asio::ip;
29
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070030namespace repo {
Davide Pesavento164ae3b2023-11-11 22:00:23 -050031namespace {
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070032
33class TcpBulkInsertClient : noncopyable
34{
35public:
Davide Pesavento164ae3b2023-11-11 22:00:23 -050036 TcpBulkInsertClient(TcpBulkInsertHandle& writer, ip::tcp::socket socket)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070037 : m_writer(writer)
Davide Pesavento3c843162019-07-07 15:06:44 -040038 , m_socket(std::move(socket))
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070039 {
40 }
41
42 static void
Davide Pesavento164ae3b2023-11-11 22:00:23 -050043 startReceive(TcpBulkInsertHandle& writer, ip::tcp::socket socket)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070044 {
Davide Pesavento3c843162019-07-07 15:06:44 -040045 auto client = std::make_shared<TcpBulkInsertClient>(writer, std::move(socket));
Davide Pesavento164ae3b2023-11-11 22:00:23 -050046 client->m_socket.async_receive(
Davide Pesaventoe18d3682019-01-24 22:10:30 -050047 boost::asio::buffer(client->m_inputBuffer, ndn::MAX_NDN_PACKET_SIZE), 0,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080048 std::bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070049 }
50
51private:
52 void
Davide Pesavento164ae3b2023-11-11 22:00:23 -050053 handleReceive(const boost::system::error_code& error, std::size_t nBytesReceived,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080054 const std::shared_ptr<TcpBulkInsertClient>& client);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070055
56private:
57 TcpBulkInsertHandle& m_writer;
Davide Pesavento164ae3b2023-11-11 22:00:23 -050058 ip::tcp::socket m_socket;
Davide Pesaventoe18d3682019-01-24 22:10:30 -050059 uint8_t m_inputBuffer[ndn::MAX_NDN_PACKET_SIZE];
Davide Pesavento3c843162019-07-07 15:06:44 -040060 std::size_t m_inputBufferSize = 0;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070061};
62
Davide Pesavento164ae3b2023-11-11 22:00:23 -050063} // namespace
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070064
Davide Pesaventod8521aa2023-09-17 14:21:27 -040065TcpBulkInsertHandle::TcpBulkInsertHandle(boost::asio::io_context& io,
Weiqi Shif0330d52014-07-09 10:54:27 -070066 RepoStorage& storageHandle)
Davide Pesaventod8521aa2023-09-17 14:21:27 -040067 : m_acceptor(io)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070068 , m_storageHandle(storageHandle)
69{
70}
71
72void
73TcpBulkInsertHandle::listen(const std::string& host, const std::string& port)
74{
Davide Pesaventod8521aa2023-09-17 14:21:27 -040075 ip::tcp::resolver resolver(m_acceptor.get_executor());
Davide Pesavento164ae3b2023-11-11 22:00:23 -050076 boost::system::error_code ec;
77 auto results = resolver.resolve(host, port, ec);
78 if (ec)
79 NDN_THROW(Error("Cannot resolve " + host + ":" + port + " (" + ec.message() + ")"));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070080
Davide Pesavento164ae3b2023-11-11 22:00:23 -050081 m_localEndpoint = *results.begin();
weijia yuan3aa8d2b2018-03-06 15:35:57 -080082 NDN_LOG_DEBUG("Start listening on " << m_localEndpoint);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070083
Davide Pesavento3c843162019-07-07 15:06:44 -040084 m_acceptor.open(m_localEndpoint.protocol());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070085 m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
weijia yuan3aa8d2b2018-03-06 15:35:57 -080086 if (m_localEndpoint.address().is_v6()) {
87 m_acceptor.set_option(ip::v6_only(true));
88 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070089 m_acceptor.bind(m_localEndpoint);
Davide Pesavento3c843162019-07-07 15:06:44 -040090 m_acceptor.listen();
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070091
Davide Pesavento3c843162019-07-07 15:06:44 -040092 asyncAccept();
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070093}
94
95void
96TcpBulkInsertHandle::stop()
97{
98 m_acceptor.cancel();
99 m_acceptor.close();
100}
101
102void
Davide Pesavento3c843162019-07-07 15:06:44 -0400103TcpBulkInsertHandle::asyncAccept()
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700104{
Davide Pesavento164ae3b2023-11-11 22:00:23 -0500105 m_acceptor.async_accept([this] (const auto& error, ip::tcp::socket socket) {
106 if (error) {
107 return;
108 }
109
110 NDN_LOG_DEBUG("New connection from " << socket.remote_endpoint());
111 TcpBulkInsertClient::startReceive(*this, std::move(socket));
112
113 // prepare accepting the next connection
114 asyncAccept();
115 });
Davide Pesavento3c843162019-07-07 15:06:44 -0400116}
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700117
Davide Pesavento3c843162019-07-07 15:06:44 -0400118void
Davide Pesavento164ae3b2023-11-11 22:00:23 -0500119TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
120 std::size_t nBytesReceived,
121 const std::shared_ptr<TcpBulkInsertClient>& client)
Davide Pesavento3c843162019-07-07 15:06:44 -0400122{
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700123 if (error) {
Davide Pesavento164ae3b2023-11-11 22:00:23 -0500124 if (error == boost::asio::error::operation_aborted) // when socket is closed by someone
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700125 return;
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800126
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500127 boost::system::error_code ec;
Davide Pesavento164ae3b2023-11-11 22:00:23 -0500128 m_socket.shutdown(ip::tcp::socket::shutdown_both, ec);
129 m_socket.close(ec);
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800130 return;
131 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700132
133 m_inputBufferSize += nBytesReceived;
134
135 // do magic
136
Davide Pesavento9c0bd8d2022-03-14 16:48:12 -0400137 auto bufferView = ndn::make_span(m_inputBuffer, m_inputBufferSize);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700138 std::size_t offset = 0;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700139 bool isOk = true;
Davide Pesavento9c0bd8d2022-03-14 16:48:12 -0400140 while (offset < bufferView.size()) {
141 Block element;
142 std::tie(isOk, element) = Block::fromBuffer(bufferView.subspan(offset));
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700143 if (!isOk)
144 break;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700145
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700146 offset += element.size();
Davide Pesavento9c0bd8d2022-03-14 16:48:12 -0400147 BOOST_ASSERT(offset <= bufferView.size());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700148
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700149 if (element.type() == ndn::tlv::Data) {
150 try {
151 Data data(element);
152 bool isInserted = m_writer.getStorageHandle().insertData(data);
153 if (isInserted)
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800154 NDN_LOG_DEBUG("Successfully injected " << data.getName());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700155 else
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800156 NDN_LOG_DEBUG("FAILED to inject " << data.getName());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700157 }
Davide Pesavento9c0bd8d2022-03-14 16:48:12 -0400158 catch (const std::runtime_error& e) {
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700159 /// \todo Catch specific error after determining what wireDecode() can throw
Davide Pesavento9c0bd8d2022-03-14 16:48:12 -0400160 NDN_LOG_ERROR("Error decoding received Data packet: " << e.what());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700161 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700162 }
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700163 }
164
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500165 if (!isOk && m_inputBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
166 boost::system::error_code ec;
Davide Pesavento164ae3b2023-11-11 22:00:23 -0500167 m_socket.shutdown(ip::tcp::socket::shutdown_both, ec);
168 m_socket.close(ec);
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800169 return;
170 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700171
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800172 if (offset > 0) {
173 if (offset != m_inputBufferSize) {
174 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
175 m_inputBufferSize -= offset;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700176 }
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800177 else {
178 m_inputBufferSize = 0;
179 }
180 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700181
Davide Pesavento164ae3b2023-11-11 22:00:23 -0500182 m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
183 ndn::MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
184 std::bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700185}
186
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700187} // namespace repo