blob: f27d1735b66233f4459f13f8318adf654d0b53fe [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2014-2018, Regents of the University of California.
 *
 * This file is part of NDN repo-ng (Next generation of NDN repository).
 * See AUTHORS.md for complete list of repo-ng authors and contributors.
 *
 * repo-ng is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 */
19
20#include "tcp-bulk-insert-handle.hpp"
21
weijia yuan3aa8d2b2018-03-06 15:35:57 -080022#include <ndn-cxx/util/logger.hpp>
23
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070024namespace repo {
25
weijia yuan3aa8d2b2018-03-06 15:35:57 -080026NDN_LOG_INIT(repo.tcpHandle);
27
/// Capacity, in bytes, of each client's receive buffer.
constexpr size_t MAX_NDN_PACKET_SIZE = 8800;
29
30namespace detail {
31
32class TcpBulkInsertClient : noncopyable
33{
34public:
35 TcpBulkInsertClient(TcpBulkInsertHandle& writer,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080036 const std::shared_ptr<boost::asio::ip::tcp::socket>& socket)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070037 : m_writer(writer)
38 , m_socket(socket)
39 , m_hasStarted(false)
40 , m_inputBufferSize(0)
41 {
42 }
43
44 static void
weijia yuan3aa8d2b2018-03-06 15:35:57 -080045 startReceive(const std::shared_ptr<TcpBulkInsertClient>& client)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070046 {
47 BOOST_ASSERT(!client->m_hasStarted);
48
49 client->m_socket->async_receive(
50 boost::asio::buffer(client->m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080051 std::bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070052
53 client->m_hasStarted = true;
54 }
55
56private:
57 void
58 handleReceive(const boost::system::error_code& error,
59 std::size_t nBytesReceived,
weijia yuan3aa8d2b2018-03-06 15:35:57 -080060 const std::shared_ptr<TcpBulkInsertClient>& client);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070061
62private:
63 TcpBulkInsertHandle& m_writer;
weijia yuan3aa8d2b2018-03-06 15:35:57 -080064 std::shared_ptr<boost::asio::ip::tcp::socket> m_socket;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070065 bool m_hasStarted;
66 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
67 std::size_t m_inputBufferSize;
68};
69
70} // namespace detail
71
72TcpBulkInsertHandle::TcpBulkInsertHandle(boost::asio::io_service& ioService,
Weiqi Shif0330d52014-07-09 10:54:27 -070073 RepoStorage& storageHandle)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070074 : m_acceptor(ioService)
75 , m_storageHandle(storageHandle)
76{
77}
78
79void
80TcpBulkInsertHandle::listen(const std::string& host, const std::string& port)
81{
82 using namespace boost::asio;
83
84 ip::tcp::resolver resolver(m_acceptor.get_io_service());
85 ip::tcp::resolver::query query(host, port);
86
87 ip::tcp::resolver::iterator endpoint = resolver.resolve(query);
88 ip::tcp::resolver::iterator end;
89
90 if (endpoint == end)
Alexander Afanasyev42290b22017-03-09 12:58:29 -080091 BOOST_THROW_EXCEPTION(Error("Cannot listen on [" + host + ":" + port + "]"));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070092
93 m_localEndpoint = *endpoint;
weijia yuan3aa8d2b2018-03-06 15:35:57 -080094 NDN_LOG_DEBUG("Start listening on " << m_localEndpoint);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070095
96 m_acceptor.open(m_localEndpoint .protocol());
97 m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
weijia yuan3aa8d2b2018-03-06 15:35:57 -080098 if (m_localEndpoint.address().is_v6()) {
99 m_acceptor.set_option(ip::v6_only(true));
100 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700101 m_acceptor.bind(m_localEndpoint);
102 m_acceptor.listen(255);
103
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800104 auto clientSocket = std::make_shared<ip::tcp::socket>(m_acceptor.get_io_service());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700105 m_acceptor.async_accept(*clientSocket,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800106 std::bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700107}
108
109void
110TcpBulkInsertHandle::stop()
111{
112 m_acceptor.cancel();
113 m_acceptor.close();
114}
115
116void
117TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800118 const std::shared_ptr<boost::asio::ip::tcp::socket>& socket)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700119{
120 using namespace boost::asio;
121
122 if (error) {
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700123 return;
124 }
125
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800126 NDN_LOG_DEBUG("New connection from " << socket->remote_endpoint());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700127
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800128 std::shared_ptr<detail::TcpBulkInsertClient> client =
129 std::make_shared<detail::TcpBulkInsertClient>(*this, socket);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700130 detail::TcpBulkInsertClient::startReceive(client);
131
132 // prepare accepting the next connection
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800133 auto clientSocket = std::make_shared<ip::tcp::socket>(m_acceptor.get_io_service());
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700134 m_acceptor.async_accept(*clientSocket,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800135 std::bind(&TcpBulkInsertHandle::handleAccept, this, _1, clientSocket));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700136}
137
138void
139detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
140 std::size_t nBytesReceived,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800141 const std::shared_ptr<detail::TcpBulkInsertClient>& client)
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700142{
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800143 if (error) {
144 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700145 return;
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800146
147 boost::system::error_code error;
148 m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
149 m_socket->close(error);
150 return;
151 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700152
153 m_inputBufferSize += nBytesReceived;
154
155 // do magic
156
157 std::size_t offset = 0;
158
159 bool isOk = true;
160 Block element;
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700161 while (m_inputBufferSize - offset > 0) {
162 std::tie(isOk, element) = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset);
163 if (!isOk)
164 break;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700165
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700166 offset += element.size();
167 BOOST_ASSERT(offset <= m_inputBufferSize);
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700168
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700169 if (element.type() == ndn::tlv::Data) {
170 try {
171 Data data(element);
172 bool isInserted = m_writer.getStorageHandle().insertData(data);
173 if (isInserted)
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800174 NDN_LOG_DEBUG("Successfully injected " << data.getName());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700175 else
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800176 NDN_LOG_DEBUG("FAILED to inject " << data.getName());
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700177 }
Alexander Afanasyev42290b22017-03-09 12:58:29 -0800178 catch (const std::runtime_error& error) {
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700179 /// \todo Catch specific error after determining what wireDecode() can throw
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800180 NDN_LOG_ERROR("Error decoding received Data packet");
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700181 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700182 }
Junxiao Shi1e39ddd2015-02-28 23:01:27 -0700183 }
184
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800185 if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
186 boost::system::error_code error;
187 m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
188 m_socket->close(error);
189 return;
190 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700191
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800192 if (offset > 0) {
193 if (offset != m_inputBufferSize) {
194 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
195 m_inputBufferSize -= offset;
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700196 }
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800197 else {
198 m_inputBufferSize = 0;
199 }
200 }
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700201
202 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
203 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
weijia yuan3aa8d2b2018-03-06 15:35:57 -0800204 std::bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -0700205}
206
207
208} // namespace repo