blob: 598e9663205b93b525544c9798141595b857d664 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2019, Regents of the University of California.
 *
 * This file is part of NDN repo-ng (Next generation of NDN repository).
 * See AUTHORS.md for complete list of repo-ng authors and contributors.
 *
 * repo-ng is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 */
19
20#include "tcp-bulk-insert-handle.hpp"
21
weijia yuan3aa8d2b2018-03-06 15:35:57 -080022#include <ndn-cxx/util/logger.hpp>
23
Davide Pesaventoe18d3682019-01-24 22:10:30 -050024NDN_LOG_INIT(repo.TcpHandle);
25
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070026namespace repo {
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070027namespace detail {
28
29class TcpBulkInsertClient : noncopyable
30{
31public:
32 TcpBulkInsertClient(TcpBulkInsertHandle& writer,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080033 const std::shared_ptr<boost::asio::ip::tcp::socket>& socket)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070034 : m_writer(writer)
35 , m_socket(socket)
36 , m_hasStarted(false)
37 , m_inputBufferSize(0)
38 {
39 }
40
41 static void
weijia yuan3aa8d2b2018-03-06 15:35:57 -080042 startReceive(const std::shared_ptr<TcpBulkInsertClient>& client)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070043 {
44 BOOST_ASSERT(!client->m_hasStarted);
45
46 client->m_socket->async_receive(
Davide Pesaventoe18d3682019-01-24 22:10:30 -050047 boost::asio::buffer(client->m_inputBuffer, ndn::MAX_NDN_PACKET_SIZE), 0,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080048 std::bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070049
50 client->m_hasStarted = true;
51 }
52
53private:
54 void
55 handleReceive(const boost::system::error_code& error,
56 std::size_t nBytesReceived,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080057 const std::shared_ptr<TcpBulkInsertClient>& client);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070058
59private:
60 TcpBulkInsertHandle& m_writer;
weijia yuan3aa8d2b2018-03-06 15:35:57 -080061 std::shared_ptr<boost::asio::ip::tcp::socket> m_socket;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070062 bool m_hasStarted;
Davide Pesaventoe18d3682019-01-24 22:10:30 -050063 uint8_t m_inputBuffer[ndn::MAX_NDN_PACKET_SIZE];
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070064 std::size_t m_inputBufferSize;
65};
66
67} // namespace detail
68
69TcpBulkInsertHandle::TcpBulkInsertHandle(boost::asio::io_service& ioService,
Weiqi Shif0330d52014-07-09 10:54:27 -070070 RepoStorage& storageHandle)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070071 : m_acceptor(ioService)
72 , m_storageHandle(storageHandle)
73{
74}
75
76void
77TcpBulkInsertHandle::listen(const std::string& host, const std::string& port)
78{
79 using namespace boost::asio;
80
81 ip::tcp::resolver resolver(m_acceptor.get_io_service());
82 ip::tcp::resolver::query query(host, port);
83
84 ip::tcp::resolver::iterator endpoint = resolver.resolve(query);
85 ip::tcp::resolver::iterator end;
86
87 if (endpoint == end)
Alexander Afanasyev42290b22017-03-09 12:58:29 -080088 BOOST_THROW_EXCEPTION(Error("Cannot listen on [" + host + ":" + port + "]"));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070089
90 m_localEndpoint = *endpoint;
weijia yuan3aa8d2b2018-03-06 15:35:57 -080091 NDN_LOG_DEBUG("Start listening on " << m_localEndpoint);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070092
93 m_acceptor.open(m_localEndpoint .protocol());
94 m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
weijia yuan3aa8d2b2018-03-06 15:35:57 -080095 if (m_localEndpoint.address().is_v6()) {
96 m_acceptor.set_option(ip::v6_only(true));
97 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070098 m_acceptor.bind(m_localEndpoint);
99 m_acceptor.listen(255);
100
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800101 auto clientSocket = std::make_shared<ip::tcp::socket>(m_acceptor.get_io_service());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700102 m_acceptor.async_accept(*clientSocket,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800103 std::bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700104}
105
106void
107TcpBulkInsertHandle::stop()
108{
109 m_acceptor.cancel();
110 m_acceptor.close();
111}
112
113void
114TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800115 const std::shared_ptr<boost::asio::ip::tcp::socket>& socket)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700116{
117 using namespace boost::asio;
118
119 if (error) {
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700120 return;
121 }
122
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800123 NDN_LOG_DEBUG("New connection from " << socket->remote_endpoint());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700124
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800125 std::shared_ptr<detail::TcpBulkInsertClient> client =
126 std::make_shared<detail::TcpBulkInsertClient>(*this, socket);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700127 detail::TcpBulkInsertClient::startReceive(client);
128
129 // prepare accepting the next connection
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800130 auto clientSocket = std::make_shared<ip::tcp::socket>(m_acceptor.get_io_service());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700131 m_acceptor.async_accept(*clientSocket,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800132 std::bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700133}
134
135void
136detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
137 std::size_t nBytesReceived,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800138 const std::shared_ptr<detail::TcpBulkInsertClient>& client)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700139{
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800140 if (error) {
141 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700142 return;
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800143
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500144 boost::system::error_code ec;
145 m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
146 m_socket->close(ec);
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800147 return;
148 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700149
150 m_inputBufferSize += nBytesReceived;
151
152 // do magic
153
154 std::size_t offset = 0;
155
156 bool isOk = true;
157 Block element;
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700158 while (m_inputBufferSize - offset > 0) {
159 std::tie(isOk, element) = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset);
160 if (!isOk)
161 break;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700162
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700163 offset += element.size();
164 BOOST_ASSERT(offset <= m_inputBufferSize);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700165
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700166 if (element.type() == ndn::tlv::Data) {
167 try {
168 Data data(element);
169 bool isInserted = m_writer.getStorageHandle().insertData(data);
170 if (isInserted)
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800171 NDN_LOG_DEBUG("Successfully injected " << data.getName());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700172 else
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800173 NDN_LOG_DEBUG("FAILED to inject " << data.getName());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700174 }
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500175 catch (const std::runtime_error&) {
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700176 /// \todo Catch specific error after determining what wireDecode() can throw
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800177 NDN_LOG_ERROR("Error decoding received Data packet");
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700178 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700179 }
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700180 }
181
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500182 if (!isOk && m_inputBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
183 boost::system::error_code ec;
184 m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
185 m_socket->close(ec);
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800186 return;
187 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700188
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800189 if (offset > 0) {
190 if (offset != m_inputBufferSize) {
191 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
192 m_inputBufferSize -= offset;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700193 }
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800194 else {
195 m_inputBufferSize = 0;
196 }
197 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700198
199 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
Davide Pesaventoe18d3682019-01-24 22:10:30 -0500200 ndn::MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800201 std::bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700202}
203
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700204} // namespace repo