blob: 5375169b72f5cda07441a5e43ae372700821ef5e [file] [log] [blame]
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2014 Regents of the University of California.
4 * See COPYING for copyright and distribution information.
5 */
6
7#include "tcp-bulk-insert-handle.hpp"
8
9namespace repo {
10
11const size_t MAX_NDN_PACKET_SIZE = 8800;
12
13namespace detail {
14
15class TcpBulkInsertClient : noncopyable
16{
17public:
18 TcpBulkInsertClient(TcpBulkInsertHandle& writer,
19 const shared_ptr<boost::asio::ip::tcp::socket>& socket)
20 : m_writer(writer)
21 , m_socket(socket)
22 , m_hasStarted(false)
23 , m_inputBufferSize(0)
24 {
25 }
26
27 static void
28 startReceive(const shared_ptr<TcpBulkInsertClient>& client)
29 {
30 BOOST_ASSERT(!client->m_hasStarted);
31
32 client->m_socket->async_receive(
33 boost::asio::buffer(client->m_inputBuffer, MAX_NDN_PACKET_SIZE), 0,
34 bind(&TcpBulkInsertClient::handleReceive, client, _1, _2, client));
35
36 client->m_hasStarted = true;
37 }
38
39private:
40 void
41 handleReceive(const boost::system::error_code& error,
42 std::size_t nBytesReceived,
43 const shared_ptr<TcpBulkInsertClient>& client);
44
45private:
46 TcpBulkInsertHandle& m_writer;
47 shared_ptr<boost::asio::ip::tcp::socket> m_socket;
48 bool m_hasStarted;
49 uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
50 std::size_t m_inputBufferSize;
51};
52
53} // namespace detail
54
55TcpBulkInsertHandle::TcpBulkInsertHandle(boost::asio::io_service& ioService,
56 StorageHandle& storageHandle)
57 : m_acceptor(ioService)
58 , m_storageHandle(storageHandle)
59{
60}
61
62void
63TcpBulkInsertHandle::listen(const std::string& host, const std::string& port)
64{
65 using namespace boost::asio;
66
67 ip::tcp::resolver resolver(m_acceptor.get_io_service());
68 ip::tcp::resolver::query query(host, port);
69
70 ip::tcp::resolver::iterator endpoint = resolver.resolve(query);
71 ip::tcp::resolver::iterator end;
72
73 if (endpoint == end)
74 throw Error("Cannot listen on [" + host + ":" + port + "]");
75
76 m_localEndpoint = *endpoint;
77 std::cerr << "Start listening on " << m_localEndpoint << std::endl;
78
79 m_acceptor.open(m_localEndpoint .protocol());
80 m_acceptor.set_option(ip::tcp::acceptor::reuse_address(true));
81 if (m_localEndpoint.address().is_v6())
82 {
83 m_acceptor.set_option(ip::v6_only(true));
84 }
85 m_acceptor.bind(m_localEndpoint);
86 m_acceptor.listen(255);
87
88 shared_ptr<ip::tcp::socket> clientSocket =
89 make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
90 m_acceptor.async_accept(*clientSocket,
91 bind(&TcpBulkInsertHandle::handleAccept, this, _1,
92 clientSocket));
93}
94
95void
96TcpBulkInsertHandle::stop()
97{
98 m_acceptor.cancel();
99 m_acceptor.close();
100}
101
102void
103TcpBulkInsertHandle::handleAccept(const boost::system::error_code& error,
104 const shared_ptr<boost::asio::ip::tcp::socket>& socket)
105{
106 using namespace boost::asio;
107
108 if (error) {
109 // if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
110 // return;
111 return;
112 }
113
114 std::cerr << "New connection from " << socket->remote_endpoint() << std::endl;
115
116 shared_ptr<detail::TcpBulkInsertClient> client =
117 make_shared<detail::TcpBulkInsertClient>(boost::ref(*this), socket);
118 detail::TcpBulkInsertClient::startReceive(client);
119
120 // prepare accepting the next connection
121 shared_ptr<ip::tcp::socket> clientSocket =
122 make_shared<ip::tcp::socket>(boost::ref(m_acceptor.get_io_service()));
123 m_acceptor.async_accept(*clientSocket,
124 bind(&TcpBulkInsertHandle::handleAccept, this, _1,
125 clientSocket));
126}
127
128void
129detail::TcpBulkInsertClient::handleReceive(const boost::system::error_code& error,
130 std::size_t nBytesReceived,
131 const shared_ptr<detail::TcpBulkInsertClient>& client)
132{
133 if (error)
134 {
135 if (error == boost::system::errc::operation_canceled) // when socket is closed by someone
136 return;
137
138 boost::system::error_code error;
139 m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
140 m_socket->close(error);
141 return;
142 }
143
144 m_inputBufferSize += nBytesReceived;
145
146 // do magic
147
148 std::size_t offset = 0;
149
150 bool isOk = true;
151 Block element;
152 while (m_inputBufferSize - offset > 0)
153 {
154 isOk = Block::fromBuffer(m_inputBuffer + offset, m_inputBufferSize - offset, element);
155 if (!isOk)
156 break;
157
158 offset += element.size();
159 BOOST_ASSERT(offset <= m_inputBufferSize);
160
161 if (element.type() == ndn::Tlv::Data)
162 {
163 try {
164 Data data(element);
165 bool isOk = m_writer.getStorageHandle().insertData(data);
166 if (isOk)
167 std::cerr << "Successfully injected " << data.getName() << std::endl;
168 else
169 std::cerr << "FAILED to inject " << data.getName() << std::endl;
170 }
171 catch (std::runtime_error& error) {
172 /// \todo Catch specific error after determining what wireDecode() can throw
173 std::cerr << "Error decoding received Data packet" << std::endl;
174 }
175 }
176 }
177 if (!isOk && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0)
178 {
179 boost::system::error_code error;
180 m_socket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, error);
181 m_socket->close(error);
182 return;
183 }
184
185 if (offset > 0)
186 {
187 if (offset != m_inputBufferSize)
188 {
189 std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize,
190 m_inputBuffer);
191 m_inputBufferSize -= offset;
192 }
193 else
194 {
195 m_inputBufferSize = 0;
196 }
197 }
198
199 m_socket->async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
200 MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
201 bind(&TcpBulkInsertClient::handleReceive, this, _1, _2, client));
202}
203
204
205} // namespace repo