blob: ffe002bbd1795dcded80dd81d1a119e6f447f074 [file] [log] [blame]
Alexander Afanasyevb0c78ea2014-04-15 18:12:04 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (C) 2014 Regents of the University of California.
4 * See COPYING for copyright and distribution information.
5 */
6
7#include "../ndn-handle/tcp-bulk-insert-handle.hpp"
8
9#include "sqlite-fixture.hpp"
10#include "dataset-fixtures.hpp"
11
12#include <boost/test/unit_test.hpp>
13
14namespace repo {
15namespace tests {
16
17BOOST_AUTO_TEST_SUITE(TcpBulkInsertHandle)
18
19class TcpClient
20{
21public:
22 TcpClient()
23 : socket(ioService)
24 {
25 }
26
27 void
28 start(const std::string& host, const std::string& port)
29 {
30 using namespace boost::asio;
31
32 ip::tcp::resolver resolver(ioService);
33 ip::tcp::resolver::query query(host, port);
34
35 ip::tcp::resolver::iterator endpoint = resolver.resolve(query);
36 ip::tcp::resolver::iterator end;
37
38 if (endpoint == end)
39 BOOST_FAIL("Cannot resolve [" + host + ":" + port + "]");
40
41 ip::tcp::endpoint serverEndpoint = *endpoint;
42
43 socket.async_connect(serverEndpoint,
44 bind(&TcpClient::onSuccessfullConnect, this, _1));
45 }
46
47 virtual void
48 onSuccessfullConnect(const boost::system::error_code& error)
49 {
50 if (error)
51 {
52 BOOST_FAIL("TCP connection aborted");
53 return;
54 }
55 }
56
57public:
58 boost::asio::io_service ioService;
59 boost::asio::ip::tcp::socket socket;
60};
61
62template<class Dataset>
63class TcpBulkInsertFixture : public TcpClient,
64 public SqliteFixture,
65 public Dataset
66{
67public:
68 TcpBulkInsertFixture()
69 : scheduler(ioService)
70 , bulkInserter(ioService, *handle)
71 {
72 guardEvent = scheduler.scheduleEvent(ndn::time::seconds(2),
73 bind(&TcpBulkInsertFixture::fail, this, "Test timed out"));
74 }
75
76 virtual void
77 onSuccessfullConnect(const boost::system::error_code& error)
78 {
79 TcpClient::onSuccessfullConnect(error);
80
81 // This value may need to be adjusted if some dataset exceeds 100k
82 socket.set_option(boost::asio::socket_base::send_buffer_size(100000));
83
84 // Initially I wrote the following to use scatter-gather approach (using
85 // std::vector<const_buffer> and a single socket.async_send operation). Unfortunately, as
86 // described in http://www.boost.org/doc/libs/1_48_0/doc/html/boost_asio/overview/implementation.html,
87 // scatter-gather is limited to at most `min(64,IOV_MAX)` buffers to be transmitted
88 // in a single operation
89 for (typename Dataset::DataContainer::iterator i = this->data.begin();
90 i != this->data.end(); ++i) {
91
92 socket.async_send(boost::asio::buffer((*i)->wireEncode().wire(), (*i)->wireEncode().size()),
93 bind(&TcpBulkInsertFixture::onSendFinished, this, _1, false));
94 }
95
96 socket.async_send(boost::asio::buffer(static_cast<const uint8_t*>(0), 0),
97 bind(&TcpBulkInsertFixture::onSendFinished, this, _1, true));
98 }
99
100 void
101 onSendFinished(const boost::system::error_code& error, bool isFinal)
102 {
103 if (error) {
104 BOOST_FAIL("TCP connection aborted");
105 return;
106 }
107
108 if (isFinal) {
109 scheduler.cancelEvent(guardEvent);
110
111 // In case there are some outstanding handlers
112 // ioService.post(bind(&TcpBulkInsertFixture::stop, this));
113 scheduler.scheduleEvent(ndn::time::seconds(1),
114 bind(&TcpBulkInsertFixture::stop, this));
115 }
116 }
117
118 void
119 fail(const std::string& info)
120 {
121 ioService.stop();
122 BOOST_FAIL(info);
123 }
124
125 void
126 stop()
127 {
128 // Terminate test
129 socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both);
130 socket.close();
131
132 bulkInserter.stop();
133 // may be ioService.stop() as well
134 }
135
136public:
137 Scheduler scheduler;
138 ndn::EventId guardEvent;
139 repo::TcpBulkInsertHandle bulkInserter;
140};
141
142
143BOOST_FIXTURE_TEST_CASE_TEMPLATE(BulkInsertAndRead, T, DatasetFixtures, TcpBulkInsertFixture<T>)
144{
145 BOOST_TEST_MESSAGE(T::getName());
146 // BOOST_CHECK_EQUAL(this->handle->size(), 1);
147
148 // start bulk inserter
149 this->bulkInserter.listen("localhost", "17376");
150
151 // start test
152 this->start("localhost", "17376");
153
154 // actually run the test
155 this->ioService.run();
156
157 BOOST_CHECK_EQUAL(this->handle->size(), this->data.size());
158
159 // Read (all items should exist)
160 for (typename T::InterestContainer::iterator i = this->interests.begin();
161 i != this->interests.end(); ++i) {
162 ndn::Data retrievedData;
163 BOOST_REQUIRE_EQUAL(this->handle->readData(i->first, retrievedData), true);
164 BOOST_CHECK_EQUAL(retrievedData, *i->second);
165 }
166}
167
168
169BOOST_AUTO_TEST_SUITE_END()
170
171} // namespace repo
172} // namespace tests