blob: 2713d032cb80586e28eaae55386bbf32be0c2ea1 [file] [log] [blame]
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Davide Pesaventoe18d3682019-01-24 22:10:30 -05002/*
Davide Pesaventod8521aa2023-09-17 14:21:27 -04003 * Copyright (c) 2014-2023, Regents of the University of California.
Alexander Afanasyeve1e6f2a2014-04-25 11:28:12 -07004 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070018 */
19
20#include "tcp-bulk-insert-handle.hpp"
21
Davide Pesavento3c843162019-07-07 15:06:44 -040022#include <boost/asio/ip/v6_only.hpp>
Davide Pesaventod8521aa2023-09-17 14:21:27 -040023
weijia yuan3aa8d2b2018-03-06 15:35:57 -080024#include <ndn-cxx/util/logger.hpp>
25
Davide Pesaventoe18d3682019-01-24 22:10:30 -050026NDN_LOG_INIT(repo.TcpHandle);
27
Davide Pesavento3c843162019-07-07 15:06:44 -040028namespace ip = boost::asio::ip;
29
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070030namespace repo {
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070031namespace detail {
32
33class TcpBulkInsertClient : noncopyable
34{
35public:
Davide Pesavento3c843162019-07-07 15:06:44 -040036 TcpBulkInsertClient(TcpBulkInsertHandle& writer, std::shared_ptr<ip::tcp::socket> socket)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070037 : m_writer(writer)
Davide Pesavento3c843162019-07-07 15:06:44 -040038 , m_socket(std::move(socket))
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070039 {
40 }
41
42 static void
Davide Pesavento3c843162019-07-07 15:06:44 -040043 startReceive(TcpBulkInsertHandle& writer, std::shared_ptr<ip::tcp::socket> socket)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070044 {
Davide Pesavento3c843162019-07-07 15:06:44 -040045 auto client = std::make_shared<TcpBulkInsertClient>(writer, std::move(socket));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070046 client->m_socket->async_receive(
Davide Pesaventoe18d3682019-01-24 22:10:30 -050047 boost::asio::buffer(client->m_inputBuffer, ndn::MAX_NDN_PACKET_SIZE), 0,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080048 std::bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070049 }
50
51private:
52 void
53 handleReceive(const boost::system::error_code& error,
54 std::size_t nBytesReceived,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080055 const std::shared_ptr<TcpBulkInsertClient>& client);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070056
57private:
58 TcpBulkInsertHandle& m_writer;
Davide Pesavento3c843162019-07-07 15:06:44 -040059 std::shared_ptr<ip::tcp::socket> m_socket;
Davide Pesaventoe18d3682019-01-24 22:10:30 -050060 uint8_t m_inputBuffer[ndn::MAX_NDN_PACKET_SIZE];
Davide Pesavento3c843162019-07-07 15:06:44 -040061 std::size_t m_inputBufferSize = 0;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070062};
63
64} // namespace detail
65
Davide Pesaventod8521aa2023-09-17 14:21:27 -040066TcpBulkInsertHandle::TcpBulkInsertHandle(boost::asio::io_context& io,
Weiqi Shif0330d52014-07-09 10:54:27 -070067 RepoStorage& storageHandle)
Davide Pesaventod8521aa2023-09-17 14:21:27 -040068 : m_acceptor(io)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070069 , m_storageHandle(storageHandle)
70{
71}
72
73void
74TcpBulkInsertHandle::listen(const std::string& host, const std::string& port)
75{
Davide Pesaventod8521aa2023-09-17 14:21:27 -040076 ip::tcp::resolver resolver(m_acceptor.get_executor());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070077 ip::tcp::resolver::query query(host, port);
78
79 ip::tcp::resolver::iterator endpoint = resolver.resolve(query);
80 ip::tcp::resolver::iterator end;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070081 if (endpoint == end)
Davide Pesavento9c0bd8d2022-03-14 16:48:12 -040082 NDN_THROW(Error("Cannot listen on " + host + " port " + port));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070083
84 m_localEndpoint = *endpoint;
weijia yuan3aa8d2b2018-03-06 15:35:57 -080085 NDN_LOG_DEBUG("Start listening on " << m_localEndpoint);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070086
Davide Pesavento3c843162019-07-07 15:06:44 -040087 m_acceptor.open(m_localEndpoint.protocol());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070088 m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
weijia yuan3aa8d2b2018-03-06 15:35:57 -080089 if (m_localEndpoint.address().is_v6()) {
90 m_acceptor.set_option(ip::v6_only(true));
91 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070092 m_acceptor.bind(m_localEndpoint);
Davide Pesavento3c843162019-07-07 15:06:44 -040093 m_acceptor.listen();
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070094
Davide Pesavento3c843162019-07-07 15:06:44 -040095 asyncAccept();
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070096}
97
98void
99TcpBulkInsertHandle::stop()
100{
101 m_acceptor.cancel();
102 m_acceptor.close();
103}
104
105void
Davide Pesavento3c843162019-07-07 15:06:44 -0400106TcpBulkInsertHandle::asyncAccept()
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700107{
Davide Pesaventod8521aa2023-09-17 14:21:27 -0400108 auto clientSocket = std::make_shared<ip::tcp::socket>(m_acceptor.get_executor());
Davide Pesavento3c843162019-07-07 15:06:44 -0400109 m_acceptor.async_accept(*clientSocket,
110 std::bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
111}
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700112
Davide Pesavento3c843162019-07-07 15:06:44 -0400113void
114TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
115 const std::shared_ptr<ip::tcp::socket>& socket)
116{
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700117 if (error) {
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700118 return;
119 }
120
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800121 NDN_LOG_DEBUG("New connection from " << socket->remote_endpoint());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700122
Davide Pesavento3c843162019-07-07 15:06:44 -0400123 detail::TcpBulkInsertClient::startReceive(*this, socket);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700124
125 // prepare accepting the next connection
Davide Pesavento3c843162019-07-07 15:06:44 -0400126 asyncAccept();
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700127}
128
129void
130detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
131 std::size_t nBytesReceived,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800132 const std::shared_ptr<detail::TcpBulkInsertClient>& client)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700133{
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800134 if (error) {
135 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700136 return;
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800137
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500138 boost::system::error_code ec;
Davide Pesavento3c843162019-07-07 15:06:44 -0400139 m_socket->shutdown(ip::tcp::socket::shutdown_both, ec);
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500140 m_socket->close(ec);
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800141 return;
142 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700143
144 m_inputBufferSize += nBytesReceived;
145
146 // do magic
147
Davide Pesavento9c0bd8d2022-03-14 16:48:12 -0400148 auto bufferView = ndn::make_span(m_inputBuffer, m_inputBufferSize);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700149 std::size_t offset = 0;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700150 bool isOk = true;
Davide Pesavento9c0bd8d2022-03-14 16:48:12 -0400151 while (offset < bufferView.size()) {
152 Block element;
153 std::tie(isOk, element) = Block::fromBuffer(bufferView.subspan(offset));
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700154 if (!isOk)
155 break;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700156
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700157 offset += element.size();
Davide Pesavento9c0bd8d2022-03-14 16:48:12 -0400158 BOOST_ASSERT(offset <= bufferView.size());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700159
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700160 if (element.type() == ndn::tlv::Data) {
161 try {
162 Data data(element);
163 bool isInserted = m_writer.getStorageHandle().insertData(data);
164 if (isInserted)
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800165 NDN_LOG_DEBUG("Successfully injected " << data.getName());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700166 else
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800167 NDN_LOG_DEBUG("FAILED to inject " << data.getName());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700168 }
Davide Pesavento9c0bd8d2022-03-14 16:48:12 -0400169 catch (const std::runtime_error& e) {
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700170 /// \todo Catch specific error after determining what wireDecode() can throw
Davide Pesavento9c0bd8d2022-03-14 16:48:12 -0400171 NDN_LOG_ERROR("Error decoding received Data packet: " << e.what());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700172 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700173 }
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700174 }
175
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500176 if (!isOk && m_inputBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
177 boost::system::error_code ec;
Davide Pesavento3c843162019-07-07 15:06:44 -0400178 m_socket->shutdown(ip::tcp::socket::shutdown_both, ec);
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500179 m_socket->close(ec);
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800180 return;
181 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700182
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800183 if (offset > 0) {
184 if (offset != m_inputBufferSize) {
185 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
186 m_inputBufferSize -= offset;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700187 }
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800188 else {
189 m_inputBufferSize = 0;
190 }
191 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700192
193 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500194 ndn::MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800195 std::bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700196}
197
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700198} // namespace repo