blob: a69dfe215725e2b5516cc4d48233ca5611448f1c [file] [log] [blame]
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyeve1e6f2a2014-04-25 11:28:12 -07003 * Copyright (c) 2014, Regents of the University of California.
4 *
5 * This file is part of NDN repo-ng (Next generation of NDN repository).
6 * See AUTHORS.md for complete list of repo-ng authors and contributors.
7 *
8 * repo-ng is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * repo-ng is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * repo-ng, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -070018 */
19
20#include "tcp-bulk-insert-handle.hpp"
21
22namespace repo {
23
24const size_t MAX_NDN_PACKET_SIZE = 8800;
25
26namespace detail {
27
28class TcpBulkInsertClient : noncopyable
29{
30public:
31 TcpBulkInsertClient(TcpBulkInsertHandle& writer,
32 const shared_ptr<boost::asio::ip::tcp::socket>& socket)
33 : m_writer(writer)
34 , m_socket(socket)
35 , m_hasStarted(false)
36 , m_inputBufferSize(0)
37 {
38 }
39
40 static void
41 startReceive(const shared_ptr<TcpBulkInsertClient>& client)
42 {
43 BOOST_ASSERT(!client->m_hasStarted);
44
45 client->m_socket->async_receive(
46 boost::asio::buffer(client->m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
47 bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
48
49 client->m_hasStarted = true;
50 }
51
52private:
53 void
54 handleReceive(const boost::system::error_code& error,
55 std::size_t nBytesReceived,
56 const shared_ptr<TcpBulkInsertClient>& client);
57
58private:
59 TcpBulkInsertHandle& m_writer;
60 shared_ptr<boost::asio::ip::tcp::socket> m_socket;
61 bool m_hasStarted;
62 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
63 std::size_t m_inputBufferSize;
64};
65
66} // namespace detail
67
68TcpBulkInsertHandle::TcpBulkInsertHandle(boost::asio::io_service& ioService,
69 StorageHandle& storageHandle)
70 : m_acceptor(ioService)
71 , m_storageHandle(storageHandle)
72{
73}
74
75void
76TcpBulkInsertHandle::listen(const std::string& host, const std::string& port)
77{
78 using namespace boost::asio;
79
80 ip::tcp::resolver resolver(m_acceptor.get_io_service());
81 ip::tcp::resolver::query query(host, port);
82
83 ip::tcp::resolver::iterator endpoint = resolver.resolve(query);
84 ip::tcp::resolver::iterator end;
85
86 if (endpoint == end)
87 throw Error("Cannot listen on [" + host + ":" + port + "]");
88
89 m_localEndpoint = *endpoint;
90 std::cerr << "Start listening on " << m_localEndpoint << std::endl;
91
92 m_acceptor.open(m_localEndpoint .protocol());
93 m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
94 if (m_localEndpoint.address().is_v6())
95 {
96 m_acceptor.set_option(ip::v6_only(true));
97 }
98 m_acceptor.bind(m_localEndpoint);
99 m_acceptor.listen(255);
100
101 shared_ptr<ip::tcp::socket> clientSocket =
102 make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
103 m_acceptor.async_accept(*clientSocket,
104 bind(&TcpBulkInsertHandle::handleAccept, this, _1,
105 clientSocket));
106}
107
108void
109TcpBulkInsertHandle::stop()
110{
111 m_acceptor.cancel();
112 m_acceptor.close();
113}
114
115void
116TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
117 const shared_ptr<boost::asio::ip::tcp::socket>& socket)
118{
119 using namespace boost::asio;
120
121 if (error) {
122 // if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
123 // return;
124 return;
125 }
126
127 std::cerr << "New connection from " << socket->remote_endpoint() << std::endl;
128
129 shared_ptr<detail::TcpBulkInsertClient> client =
130 make_shared<detail::TcpBulkInsertClient>(boost::ref(*this), socket);
131 detail::TcpBulkInsertClient::startReceive(client);
132
133 // prepare accepting the next connection
134 shared_ptr<ip::tcp::socket> clientSocket =
135 make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
136 m_acceptor.async_accept(*clientSocket,
137 bind(&TcpBulkInsertHandle::handleAccept, this, _1,
138 clientSocket));
139}
140
141void
142detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
143 std::size_t nBytesReceived,
144 const shared_ptr<detail::TcpBulkInsertClient>& client)
145{
146 if (error)
147 {
148 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
149 return;
150
151 boost::system::error_code error;
152 m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
153 m_socket->close(error);
154 return;
155 }
156
157 m_inputBufferSize += nBytesReceived;
158
159 // do magic
160
161 std::size_t offset = 0;
162
163 bool isOk = true;
164 Block element;
165 while (m_inputBufferSize - offset > 0)
166 {
167 isOk = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset, element);
168 if (!isOk)
169 break;
170
171 offset += element.size();
172 BOOST_ASSERT(offset <= m_inputBufferSize);
173
174 if (element.type() == ndn::Tlv::Data)
175 {
176 try {
177 Data data(element);
178 bool isOk = m_writer.getStorageHandle().insertData(data);
179 if (isOk)
180 std::cerr << "Successfully injected " << data.getName() << std::endl;
181 else
182 std::cerr << "FAILED to inject " << data.getName() << std::endl;
183 }
184 catch (std::runtime_error& error) {
185 /// \todo Catch specific error after determining what wireDecode() can throw
186 std::cerr << "Error decoding received Data packet" << std::endl;
187 }
188 }
189 }
190 if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
191 {
192 boost::system::error_code error;
193 m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
194 m_socket->close(error);
195 return;
196 }
197
198 if (offset > 0)
199 {
200 if (offset != m_inputBufferSize)
201 {
202 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
203 m_inputBuffer);
204 m_inputBufferSize -= offset;
205 }
206 else
207 {
208 m_inputBufferSize = 0;
209 }
210 }
211
212 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
213 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
214 bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
215}
216
217
218} // namespace repo