blob: 9c3f68091e504276237e2f482a1816df9c6b0f15 [file] [log] [blame]
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Davide Pesaventoe18d3682019-01-24 22:10:30 -05002/*
3 * Copyright (c) 2014-2019, Regents of the University of California.
Alexander Afanasyeve1e6f2a2014-04-25 11:28:12 -07004 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070018 */
19
20#include "tcp-bulk-insert-handle.hpp"
21
Davide Pesavento3c843162019-07-07 15:06:44 -040022#include <boost/asio/ip/v6_only.hpp>
weijia yuan3aa8d2b2018-03-06 15:35:57 -080023#include <ndn-cxx/util/logger.hpp>
24
Davide Pesaventoe18d3682019-01-24 22:10:30 -050025NDN_LOG_INIT(repo.TcpHandle);
26
Davide Pesavento3c843162019-07-07 15:06:44 -040027namespace ip = boost::asio::ip;
28
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070029namespace repo {
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070030namespace detail {
31
32class TcpBulkInsertClient : noncopyable
33{
34public:
Davide Pesavento3c843162019-07-07 15:06:44 -040035 TcpBulkInsertClient(TcpBulkInsertHandle& writer, std::shared_ptr<ip::tcp::socket> socket)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070036 : m_writer(writer)
Davide Pesavento3c843162019-07-07 15:06:44 -040037 , m_socket(std::move(socket))
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070038 {
39 }
40
41 static void
Davide Pesavento3c843162019-07-07 15:06:44 -040042 startReceive(TcpBulkInsertHandle& writer, std::shared_ptr<ip::tcp::socket> socket)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070043 {
Davide Pesavento3c843162019-07-07 15:06:44 -040044 auto client = std::make_shared<TcpBulkInsertClient>(writer, std::move(socket));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070045 client->m_socket->async_receive(
Davide Pesaventoe18d3682019-01-24 22:10:30 -050046 boost::asio::buffer(client->m_inputBuffer, ndn::MAX_NDN_PACKET_SIZE), 0,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080047 std::bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070048 }
49
50private:
51 void
52 handleReceive(const boost::system::error_code& error,
53 std::size_t nBytesReceived,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080054 const std::shared_ptr<TcpBulkInsertClient>& client);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070055
56private:
57 TcpBulkInsertHandle& m_writer;
Davide Pesavento3c843162019-07-07 15:06:44 -040058 std::shared_ptr<ip::tcp::socket> m_socket;
Davide Pesaventoe18d3682019-01-24 22:10:30 -050059 uint8_t m_inputBuffer[ndn::MAX_NDN_PACKET_SIZE];
Davide Pesavento3c843162019-07-07 15:06:44 -040060 std::size_t m_inputBufferSize = 0;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070061};
62
63} // namespace detail
64
65TcpBulkInsertHandle::TcpBulkInsertHandle(boost::asio::io_service& ioService,
Weiqi Shif0330d52014-07-09 10:54:27 -070066 RepoStorage& storageHandle)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070067 : m_acceptor(ioService)
68 , m_storageHandle(storageHandle)
69{
70}
71
72void
73TcpBulkInsertHandle::listen(const std::string& host, const std::string& port)
74{
Davide Pesavento3c843162019-07-07 15:06:44 -040075 ip::tcp::resolver resolver(m_acceptor
76#if BOOST_VERSION >= 107000
77 .get_executor()
78#else
79 .get_io_service()
80#endif
81 );
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070082 ip::tcp::resolver::query query(host, port);
83
84 ip::tcp::resolver::iterator endpoint = resolver.resolve(query);
85 ip::tcp::resolver::iterator end;
86
87 if (endpoint == end)
Davide Pesavento3c843162019-07-07 15:06:44 -040088 BOOST_THROW_EXCEPTION(Error("Cannot listen on " + host + " port " + port));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070089
90 m_localEndpoint = *endpoint;
weijia yuan3aa8d2b2018-03-06 15:35:57 -080091 NDN_LOG_DEBUG("Start listening on " << m_localEndpoint);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070092
Davide Pesavento3c843162019-07-07 15:06:44 -040093 m_acceptor.open(m_localEndpoint.protocol());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070094 m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
weijia yuan3aa8d2b2018-03-06 15:35:57 -080095 if (m_localEndpoint.address().is_v6()) {
96 m_acceptor.set_option(ip::v6_only(true));
97 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070098 m_acceptor.bind(m_localEndpoint);
Davide Pesavento3c843162019-07-07 15:06:44 -040099 m_acceptor.listen();
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700100
Davide Pesavento3c843162019-07-07 15:06:44 -0400101 asyncAccept();
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700102}
103
104void
105TcpBulkInsertHandle::stop()
106{
107 m_acceptor.cancel();
108 m_acceptor.close();
109}
110
111void
Davide Pesavento3c843162019-07-07 15:06:44 -0400112TcpBulkInsertHandle::asyncAccept()
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700113{
Davide Pesavento3c843162019-07-07 15:06:44 -0400114 auto clientSocket = std::make_shared<ip::tcp::socket>(m_acceptor
115#if BOOST_VERSION >= 107000
116 .get_executor()
117#else
118 .get_io_service()
119#endif
120 );
121 m_acceptor.async_accept(*clientSocket,
122 std::bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
123}
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700124
Davide Pesavento3c843162019-07-07 15:06:44 -0400125void
126TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
127 const std::shared_ptr<ip::tcp::socket>& socket)
128{
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700129 if (error) {
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700130 return;
131 }
132
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800133 NDN_LOG_DEBUG("New connection from " << socket->remote_endpoint());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700134
Davide Pesavento3c843162019-07-07 15:06:44 -0400135 detail::TcpBulkInsertClient::startReceive(*this, socket);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700136
137 // prepare accepting the next connection
Davide Pesavento3c843162019-07-07 15:06:44 -0400138 asyncAccept();
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700139}
140
141void
142detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
143 std::size_t nBytesReceived,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800144 const std::shared_ptr<detail::TcpBulkInsertClient>& client)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700145{
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800146 if (error) {
147 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700148 return;
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800149
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500150 boost::system::error_code ec;
Davide Pesavento3c843162019-07-07 15:06:44 -0400151 m_socket->shutdown(ip::tcp::socket::shutdown_both, ec);
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500152 m_socket->close(ec);
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800153 return;
154 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700155
156 m_inputBufferSize += nBytesReceived;
157
158 // do magic
159
160 std::size_t offset = 0;
161
162 bool isOk = true;
163 Block element;
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700164 while (m_inputBufferSize - offset > 0) {
165 std::tie(isOk, element) = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset);
166 if (!isOk)
167 break;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700168
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700169 offset += element.size();
170 BOOST_ASSERT(offset <= m_inputBufferSize);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700171
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700172 if (element.type() == ndn::tlv::Data) {
173 try {
174 Data data(element);
175 bool isInserted = m_writer.getStorageHandle().insertData(data);
176 if (isInserted)
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800177 NDN_LOG_DEBUG("Successfully injected " << data.getName());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700178 else
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800179 NDN_LOG_DEBUG("FAILED to inject " << data.getName());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700180 }
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500181 catch (const std::runtime_error&) {
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700182 /// \todo Catch specific error after determining what wireDecode() can throw
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800183 NDN_LOG_ERROR("Error decoding received Data packet");
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700184 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700185 }
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700186 }
187
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500188 if (!isOk && m_inputBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
189 boost::system::error_code ec;
Davide Pesavento3c843162019-07-07 15:06:44 -0400190 m_socket->shutdown(ip::tcp::socket::shutdown_both, ec);
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500191 m_socket->close(ec);
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800192 return;
193 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700194
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800195 if (offset > 0) {
196 if (offset != m_inputBufferSize) {
197 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
198 m_inputBufferSize -= offset;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700199 }
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800200 else {
201 m_inputBufferSize = 0;
202 }
203 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700204
205 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500206 ndn::MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800207 std::bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700208}
209
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700210} // namespace repo