/** NDN-Atmos: Cataloging Service for distributed data originally developed
 *  for atmospheric science data
 *  Copyright (C) 2015 Colorado State University
 *
 *  NDN-Atmos is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  NDN-Atmos is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with NDN-Atmos.  If not, see <http://www.gnu.org/licenses/>.
**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
24
25#include <thread>
26
27
28#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
40
41#include "mysql/mysql.h"
42
43#include <iostream>
44
45#include <map>
46#include <memory>
47#include <mutex>
48#include <sstream>
49#include <string>
50
51namespace atmos {
52namespace query {
53
54static const size_t MAX_SEGMENT_SIZE = ndn::MAX_NDN_PACKET_SIZE >> 1;
55
56/**
57 * QueryAdapter handles the Query usecases for the catalog
58 */
59template <typename DatabaseHandler>
60class QueryAdapter : public atmos::util::CatalogAdapter<DatabaseHandler> {
61public:
62 /**
63 * Constructor
64 *
65 * @param face: Face that will be used for NDN communications
66 * @param keyChain: KeyChain to sign query responses and evaluate the incoming query
67 * and ChronoSync requests against
68 * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
69 * @param prefix: Name that will define the prefix to all queries requests that will be
70 * routed to this specific Catalog Instance
71 */
72 QueryAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
73 std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
74
75 /**
76 * Destructor
77 */
78 virtual
79 ~QueryAdapter();
80
81 /**
82 * Handles incoming query requests by stripping the filter off the Interest to get the
83 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
84 *
85 * @param filter: InterestFilter that caused this Interest to be routed
86 * @param interest: Interest that needs to be handled
87 */
88 virtual void
89 onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
90
91 /**
92 * Handles requests for responses to an existing query
93 *
94 * @param filter: InterestFilter that caused this Interest to be routed
95 * @param interest: Interest that needs to be handled
96 */
97 virtual void
98 onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
99
100private:
101 /**
102 * Helper function that generates query results
103 *
104 * @param face: Face that will be used for NDN communications
105 * @param keyChain: KeyChain to sign query responses and evaluate the incoming query
106 * and ChronoSync requests against
107 * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
108 */
109 void
110 query(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
111 std::shared_ptr<const ndn::Interest> interest,
112 std::shared_ptr<DatabaseHandler> databaseHandler);
113
114 /**
115 * Helper function that publishes JSON
116 *
117 * @param face: Face that will send the Data out on
118 * @param keyChain: KeyChain to sign the Data we're creating
119 * @param segmentPrefix: Name that identifies the Prefix for the Data
120 * @param value: Json::Value to be sent in the Data
121 * @param segmentNo: uint64_t the segment for this Data
122 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the last entry
123 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
124 */
125 void
126 publishJson(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
127 const ndn::Name& segmentPrefix, const Json::Value& value,
128 uint64_t segmentNo, bool isFinalBlock, bool isAutocomplete);
129
130 /**
131 * Helper function that publishes char*
132 *
133 * @param face: Face that will send the Data out on
134 * @param keyChain: KeyChain to sign the Data we're creating
135 * @param segmentPrefix: Name that identifies the Prefix for the Data
136 * @param payload: char* to be sent in the Data
137 * @param payloadLength: size_t to indicate how long payload is
138 * @param segmentNo: uint64_t the segment for this Data
139 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the last entry
140 */
141 void
142 publishSegment(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
143 const ndn::Name& segmentPrefix, const char* payload, size_t payloadLength,
144 uint64_t segmentNo, bool isFinalBlock);
145
146 // mutex to control critical sections
147 std::mutex m_mutex;
148 // @{ needs m_mutex protection
149 // The Queries we are currently writing to
150 std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
151 // @}
152};
153
154
155template <typename DatabaseHandler>
156QueryAdapter<DatabaseHandler>::QueryAdapter(std::shared_ptr<ndn::Face> face,
157 std::shared_ptr<ndn::KeyChain> keyChain,
158 std::shared_ptr<DatabaseHandler> databaseHandler,
159 const ndn::Name& prefix)
160 : atmos::util::CatalogAdapter<DatabaseHandler>(face, keyChain, databaseHandler, prefix)
161{
162 face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("query")),
163 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onQueryInterest,
164 this, _1, _2),
165 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
166 this, _1),
167 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
168 this, _1, _2));
169
170 face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("query-results")),
171 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
172 this, _1, _2),
173 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
174 this, _1),
175 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
176 this, _1, _2));
177}
178
179template <typename DatabaseHandler>
180QueryAdapter<DatabaseHandler>::~QueryAdapter(){
181 // empty
182}
183
184template <typename DatabaseHandler>
185void
186QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
187{
188 // strictly enforce query initialization namespace. Name should be our local prefix + "query" + parameters
189 if (interest.getName().size() != filter.getPrefix().size() + 1) {
190 // @todo: return a nack
191 return;
192 }
193
194 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
195
196 std::thread queryThread(&QueryAdapter<DatabaseHandler>::query, this,
197 atmos::util::CatalogAdapter<DatabaseHandler>::m_face,
198 atmos::util::CatalogAdapter<DatabaseHandler>::m_keyChain, interestPtr,
199 atmos::util::CatalogAdapter<DatabaseHandler>::m_databaseHandler);
200 queryThread.join();
201}
202
203template <typename DatabaseHandler>
204void
205QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
206 const ndn::Interest& interest)
207{
208 // FIXME Results are currently getting served out of the forwarder's
209 // CS so we just ignore any retrieval Interests that hit us for
210 // now. In the future, this should check some form of
211 // InMemoryStorage.
212 std::cout << "Got to query result" << std::endl;
213}
214
215template <typename DatabaseHandler>
216void
217QueryAdapter<DatabaseHandler>::publishJson(std::shared_ptr<ndn::Face> face,
218 std::shared_ptr<ndn::KeyChain> keyChain,
219 const ndn::Name& segmentPrefix,
220 const Json::Value& value,
221 uint64_t segmentNo, bool isFinalBlock,
222 bool isAutocomplete)
223{
224 Json::Value entry;
225 Json::FastWriter fastWriter;
226 if (isAutocomplete) {
227 entry["next"] = value;
228 } else {
229 entry["results"] = value;
230 }
231 const std::string jsonMessage = fastWriter.write(entry);
232 publishSegment(face, keyChain, segmentPrefix, jsonMessage.c_str(), jsonMessage.size() + 1,
233 segmentNo, isFinalBlock);
234}
235
236template <typename DatabaseHandler>
237void
238QueryAdapter<DatabaseHandler>::publishSegment(std::shared_ptr<ndn::Face> face,
239 std::shared_ptr<ndn::KeyChain> keyChain,
240 const ndn::Name& segmentPrefix,
241 const char* payload, size_t payloadLength,
242 uint64_t segmentNo, bool isFinalBlock)
243{
244 ndn::Name segmentName(segmentPrefix);
245 if (isFinalBlock) {
246 segmentName.append("END");
247 } else {
248 segmentName.appendSegment(segmentNo);
249 }
250
251 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
252 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
253 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
254
255 if (isFinalBlock) {
256 data->setFinalBlockId(segmentName[-1]);
257 }
258 keyChain->sign(*data);
259 face->put(*data);
260}
261
262template <typename DatabaseHandler>
263void
264QueryAdapter<DatabaseHandler>::query(std::shared_ptr<ndn::Face> face,
265 std::shared_ptr<ndn::KeyChain> keyChain,
266 std::shared_ptr<const ndn::Interest> interest,
267 std::shared_ptr<DatabaseHandler> databaseHandler)
268{
269 // @todo: we should return a NACK as we have no database
270}
271
272
273template <>
274void
275QueryAdapter<MYSQL>::query(std::shared_ptr<ndn::Face> face,
276 std::shared_ptr<ndn::KeyChain> keyChain,
277 std::shared_ptr<const ndn::Interest> interest,
278 std::shared_ptr<MYSQL> databaseHandler)
279{
280 std::cout << "Running query" << std::endl;
281 // 1) Strip the prefix off the ndn::Interest's ndn::Name
282 // +1 to grab JSON component after "query" component
283 const std::string jsonQuery(reinterpret_cast<const char*>(interest->getName()[m_prefix.size()+1].value()));
284
285 // For efficiency, do a double check. Once without the lock, then with it.
286 if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
287 m_mutex.lock();
288 { // !!! BEGIN CRITICAL SECTION !!!
289 // If this fails upon locking, we removed it during our search.
290 // An unusual race-condition case, which requires things like PIT aggregation to be off.
291 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
292 if (iter != m_activeQueryToFirstResponse.end()) {
293 face->put(*(iter->second));
294 m_mutex.unlock(); //escape lock
295 return;
296 }
297 } // !!! END CRITICAL SECTION !!!
298 m_mutex.unlock();
299 }
300
301 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
302 Json::Value parsedFromString;
303 Json::Reader reader;
304 if (reader.parse(jsonQuery, parsedFromString)) {
305 const ndn::name::Component version = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
306
307 // JSON parsed ok, so we can acknowledge successful receipt of the query
308 ndn::Name ackName(interest->getName());
309 ackName.append(version);
310 ackName.append("OK");
311
312 std::shared_ptr<ndn::Data> ack(std::make_shared<ndn::Data>(ackName));
313
314 m_mutex.lock();
315 { // !!! BEGIN CRITICAL SECTION !!!
316 // An unusual race-condition case, which requires things like PIT aggregation to be off.
317 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
318 if (iter != m_activeQueryToFirstResponse.end()) {
319 face->put(*(iter->second));
320 m_mutex.unlock(); // escape lock
321 return;
322 }
323 // This is where things are expensive so we save them for the lock
324 keyChain->sign(*ack);
325 face->put(*ack);
326 m_activeQueryToFirstResponse.insert(std::pair<std::string,
327 std::shared_ptr<ndn::Data>>(jsonQuery, ack));
328 } // !!! END CRITICAL SECTION !!!
329 m_mutex.unlock();
330
331 // 3) Convert the JSON Query into a MySQL one
332 bool autocomplete = false;
333 std::stringstream mysqlQuery;
334 mysqlQuery << "SELECT name FROM cmip5";
335 bool input = false;
336 for (Json::Value::iterator iter = parsedFromString.begin(); iter != parsedFromString.end(); ++iter)
337 {
338 Json::Value key = iter.key();
339 Json::Value value = (*iter);
340
341 if (input) {
342 mysqlQuery << " AND";
343 } else {
344 mysqlQuery << " WHERE";
345 }
346
347 // Auto-complete case
348 if (key.asString().compare("?") == 0) {
349 mysqlQuery << " name REGEXP '^" << value.asString() << "'";
350 autocomplete = true;
351 }
352 // Component case
353 else {
354 mysqlQuery << " " << key.asString() << "='" << value.asString() << "'";
355 }
356 input = true;
357 }
358
359 if (!input) { // Force it to be the empty set
360 mysqlQuery << " limit 0";
361 }
362 mysqlQuery << ";";
363
364 // 4) Run the Query
365 // We're assuming that databaseHandler has already been connected to the database
366 std::shared_ptr<MYSQL_RES> results = atmos::util::PerformQuery(databaseHandler, mysqlQuery.str());
367
368 MYSQL_ROW row;
369 ndn::Name segmentPrefix(m_prefix);
370 segmentPrefix.append("query-results");
371 segmentPrefix.append(version);
372
373 size_t usedBytes = 0;
374 const size_t PAYLOAD_LIMIT = 7000;
375 uint64_t segmentNo = 0;
376 Json::Value array;
377 while ((row = mysql_fetch_row(results.get())))
378 {
379 size_t size = strlen(row[0]) + 1;
380 if (usedBytes + size > PAYLOAD_LIMIT) {
381 publishJson(face, keyChain, segmentPrefix, array, segmentNo, false, autocomplete);
382 array.clear();
383 usedBytes = 0;
384 segmentNo++;
385 }
386 array.append(row[0]);
387 usedBytes += size;
388 }
389 publishJson(face, keyChain, segmentPrefix, array, segmentNo, true, autocomplete);
390 }
391}
392
393} // namespace query
394} // namespace atmos
395#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP