blob: bada073ce686aeeb04b588001fa5ce464cfd2a90 [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
24
25#include <thread>
26
27
28#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
Alison Craig1aced7d2015-04-10 12:00:02 -060040#include <ndn-cxx/util/in-memory-storage-lru.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060041
42#include "mysql/mysql.h"
43
Alison Craig2a4d5282015-04-10 12:00:02 -060044#include <map>
45#include <memory>
46#include <mutex>
47#include <sstream>
48#include <string>
49
50namespace atmos {
51namespace query {
52
53static const size_t MAX_SEGMENT_SIZE = ndn::MAX_NDN_PACKET_SIZE >> 1;
54
55/**
56 * QueryAdapter handles the Query usecases for the catalog
57 */
58template <typename DatabaseHandler>
59class QueryAdapter : public atmos::util::CatalogAdapter<DatabaseHandler> {
60public:
61 /**
62 * Constructor
63 *
64 * @param face: Face that will be used for NDN communications
65 * @param keyChain: KeyChain to sign query responses and evaluate the incoming query
66 * and ChronoSync requests against
67 * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
68 * @param prefix: Name that will define the prefix to all queries requests that will be
69 * routed to this specific Catalog Instance
70 */
71 QueryAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
72 std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
73
74 /**
75 * Destructor
76 */
77 virtual
78 ~QueryAdapter();
79
80 /**
81 * Handles incoming query requests by stripping the filter off the Interest to get the
82 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
83 *
84 * @param filter: InterestFilter that caused this Interest to be routed
85 * @param interest: Interest that needs to be handled
86 */
87 virtual void
88 onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
89
90 /**
91 * Handles requests for responses to an existing query
92 *
93 * @param filter: InterestFilter that caused this Interest to be routed
94 * @param interest: Interest that needs to be handled
95 */
96 virtual void
97 onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
98
99private:
100 /**
101 * Helper function that generates query results
102 *
103 * @param face: Face that will be used for NDN communications
104 * @param keyChain: KeyChain to sign query responses and evaluate the incoming query
105 * and ChronoSync requests against
Alison Craig1aced7d2015-04-10 12:00:02 -0600106 * @param interest: Interest that needs to be handled
Alison Craig2a4d5282015-04-10 12:00:02 -0600107 * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
108 */
109 void
110 query(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
111 std::shared_ptr<const ndn::Interest> interest,
112 std::shared_ptr<DatabaseHandler> databaseHandler);
113
114 /**
115 * Helper function that publishes JSON
116 *
117 * @param face: Face that will send the Data out on
118 * @param keyChain: KeyChain to sign the Data we're creating
119 * @param segmentPrefix: Name that identifies the Prefix for the Data
120 * @param value: Json::Value to be sent in the Data
121 * @param segmentNo: uint64_t the segment for this Data
122 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the last entry
123 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
124 */
125 void
126 publishJson(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
127 const ndn::Name& segmentPrefix, const Json::Value& value,
128 uint64_t segmentNo, bool isFinalBlock, bool isAutocomplete);
129
130 /**
131 * Helper function that publishes char*
132 *
133 * @param face: Face that will send the Data out on
134 * @param keyChain: KeyChain to sign the Data we're creating
135 * @param segmentPrefix: Name that identifies the Prefix for the Data
136 * @param payload: char* to be sent in the Data
137 * @param payloadLength: size_t to indicate how long payload is
138 * @param segmentNo: uint64_t the segment for this Data
139 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the last entry
140 */
141 void
142 publishSegment(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
143 const ndn::Name& segmentPrefix, const char* payload, size_t payloadLength,
144 uint64_t segmentNo, bool isFinalBlock);
145
Alison Craig1aced7d2015-04-10 12:00:02 -0600146 /**
147 * Helper function that generates query results from a Json query
148 *
149 * @param face: Face that will be used for NDN communications
150 * @param jsonQuery: String containing the JSON query
151 * @param keyChain: KeyChain to sign query responses and evaluate the incoming query
152 * and ChronoSync requests against
153 * @param interest: Interest that needs to be handled
154 * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
155 */
156 void
157 runJsonQuery(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
158 std::shared_ptr<const ndn::Interest> interest, const std::string& jsonQuery,
159 std::shared_ptr<DatabaseHandler> databaseHandler);
160
Alison Craig2a4d5282015-04-10 12:00:02 -0600161 // mutex to control critical sections
162 std::mutex m_mutex;
163 // @{ needs m_mutex protection
164 // The Queries we are currently writing to
165 std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
Alison Craig1aced7d2015-04-10 12:00:02 -0600166
167 ndn::util::InMemoryStorageLru m_cache;
Alison Craig2a4d5282015-04-10 12:00:02 -0600168 // @}
169};
170
171
172template <typename DatabaseHandler>
173QueryAdapter<DatabaseHandler>::QueryAdapter(std::shared_ptr<ndn::Face> face,
174 std::shared_ptr<ndn::KeyChain> keyChain,
175 std::shared_ptr<DatabaseHandler> databaseHandler,
176 const ndn::Name& prefix)
177 : atmos::util::CatalogAdapter<DatabaseHandler>(face, keyChain, databaseHandler, prefix)
Alison Craig1aced7d2015-04-10 12:00:02 -0600178 , m_cache(100000000)
Alison Craig2a4d5282015-04-10 12:00:02 -0600179{
Alison Craig1aced7d2015-04-10 12:00:02 -0600180 atmos::util::CatalogAdapter<DatabaseHandler>::m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("query")),
181 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onQueryInterest,
182 this, _1, _2),
183 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
184 this, _1),
185 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
186 this, _1, _2));
Alison Craig2a4d5282015-04-10 12:00:02 -0600187
Alison Craig1aced7d2015-04-10 12:00:02 -0600188 atmos::util::CatalogAdapter<DatabaseHandler>::m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("query-results")),
189 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
190 this, _1, _2),
191 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
192 this, _1),
193 bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
194 this, _1, _2));
Alison Craig2a4d5282015-04-10 12:00:02 -0600195}
196
197template <typename DatabaseHandler>
198QueryAdapter<DatabaseHandler>::~QueryAdapter(){
199 // empty
200}
201
202template <typename DatabaseHandler>
203void
204QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
205{
206 // strictly enforce query initialization namespace. Name should be our local prefix + "query" + parameters
207 if (interest.getName().size() != filter.getPrefix().size() + 1) {
208 // @todo: return a nack
209 return;
210 }
211
212 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
213
214 std::thread queryThread(&QueryAdapter<DatabaseHandler>::query, this,
215 atmos::util::CatalogAdapter<DatabaseHandler>::m_face,
216 atmos::util::CatalogAdapter<DatabaseHandler>::m_keyChain, interestPtr,
217 atmos::util::CatalogAdapter<DatabaseHandler>::m_databaseHandler);
218 queryThread.join();
219}
220
221template <typename DatabaseHandler>
222void
223QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
224 const ndn::Interest& interest)
225{
226 // FIXME Results are currently getting served out of the forwarder's
227 // CS so we just ignore any retrieval Interests that hit us for
228 // now. In the future, this should check some form of
229 // InMemoryStorage.
Alison Craig1aced7d2015-04-10 12:00:02 -0600230 auto data = m_cache.find(interest.getName());
231 if (data) {
232 atmos::util::CatalogAdapter<DatabaseHandler>::m_face->put(*data);
233 } else {
234 // regenerate query
235 const std::string jsonQuery(reinterpret_cast<const char*>(interest.getName()[atmos::util::CatalogAdapter<DatabaseHandler>::m_prefix.size()+1].value()));
236
237 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
238 std::thread queryRegenThread(&QueryAdapter<DatabaseHandler>::runJsonQuery, this,
239 atmos::util::CatalogAdapter<DatabaseHandler>::m_face,
240 atmos::util::CatalogAdapter<DatabaseHandler>::m_keyChain, interestPtr,
241 jsonQuery,
242 atmos::util::CatalogAdapter<DatabaseHandler>::m_databaseHandler);
243 queryRegenThread.join();
244 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600245}
246
247template <typename DatabaseHandler>
248void
249QueryAdapter<DatabaseHandler>::publishJson(std::shared_ptr<ndn::Face> face,
250 std::shared_ptr<ndn::KeyChain> keyChain,
251 const ndn::Name& segmentPrefix,
252 const Json::Value& value,
253 uint64_t segmentNo, bool isFinalBlock,
254 bool isAutocomplete)
255{
256 Json::Value entry;
257 Json::FastWriter fastWriter;
258 if (isAutocomplete) {
259 entry["next"] = value;
260 } else {
261 entry["results"] = value;
262 }
263 const std::string jsonMessage = fastWriter.write(entry);
264 publishSegment(face, keyChain, segmentPrefix, jsonMessage.c_str(), jsonMessage.size() + 1,
265 segmentNo, isFinalBlock);
266}
267
268template <typename DatabaseHandler>
269void
270QueryAdapter<DatabaseHandler>::publishSegment(std::shared_ptr<ndn::Face> face,
271 std::shared_ptr<ndn::KeyChain> keyChain,
272 const ndn::Name& segmentPrefix,
273 const char* payload, size_t payloadLength,
274 uint64_t segmentNo, bool isFinalBlock)
275{
276 ndn::Name segmentName(segmentPrefix);
277 if (isFinalBlock) {
278 segmentName.append("END");
279 } else {
280 segmentName.appendSegment(segmentNo);
281 }
282
283 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
284 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
285 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
286
287 if (isFinalBlock) {
288 data->setFinalBlockId(segmentName[-1]);
289 }
290 keyChain->sign(*data);
Alison Craig1aced7d2015-04-10 12:00:02 -0600291 //face->put(*data);
292
293 m_mutex.lock();
294 m_cache.insert(*data);
295 m_mutex.unlock();
Alison Craig2a4d5282015-04-10 12:00:02 -0600296}
297
298template <typename DatabaseHandler>
299void
300QueryAdapter<DatabaseHandler>::query(std::shared_ptr<ndn::Face> face,
301 std::shared_ptr<ndn::KeyChain> keyChain,
302 std::shared_ptr<const ndn::Interest> interest,
303 std::shared_ptr<DatabaseHandler> databaseHandler)
304{
Alison Craig1aced7d2015-04-10 12:00:02 -0600305 // 1) Strip the prefix off the ndn::Interest's ndn::Name
306 // +1 to grab JSON component after "query" component
307 const std::string jsonQuery(reinterpret_cast<const char*>(interest->getName()[atmos::util::CatalogAdapter<DatabaseHandler>::m_prefix.size()+1].value()));
308 if (jsonQuery.length() > 0) {
309 runJsonQuery(face, keyChain, interest, jsonQuery, databaseHandler);
310 } // else NACK?
311}
312
313template <typename DatabaseHandler>
314void
315QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<ndn::Face> face,
316 std::shared_ptr<ndn::KeyChain> keyChain,
317 std::shared_ptr<const ndn::Interest> interest,
318 const std::string& jsonQuery,
319 std::shared_ptr<DatabaseHandler> databaseHandler)
320{
Alison Craig2a4d5282015-04-10 12:00:02 -0600321 // @todo: we should return a NACK as we have no database
322}
323
324
325template <>
326void
Alison Craig1aced7d2015-04-10 12:00:02 -0600327QueryAdapter<MYSQL>::runJsonQuery(std::shared_ptr<ndn::Face> face,
328 std::shared_ptr<ndn::KeyChain> keyChain,
329 std::shared_ptr<const ndn::Interest> interest,
330 const std::string& jsonQuery,
331 std::shared_ptr<MYSQL> databaseHandler)
Alison Craig2a4d5282015-04-10 12:00:02 -0600332{
Alison Craig2a4d5282015-04-10 12:00:02 -0600333 // For efficiency, do a double check. Once without the lock, then with it.
334 if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
335 m_mutex.lock();
336 { // !!! BEGIN CRITICAL SECTION !!!
337 // If this fails upon locking, we removed it during our search.
338 // An unusual race-condition case, which requires things like PIT aggregation to be off.
339 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
340 if (iter != m_activeQueryToFirstResponse.end()) {
341 face->put(*(iter->second));
342 m_mutex.unlock(); //escape lock
343 return;
344 }
345 } // !!! END CRITICAL SECTION !!!
346 m_mutex.unlock();
347 }
348
349 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
350 Json::Value parsedFromString;
351 Json::Reader reader;
352 if (reader.parse(jsonQuery, parsedFromString)) {
353 const ndn::name::Component version = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
354
355 // JSON parsed ok, so we can acknowledge successful receipt of the query
356 ndn::Name ackName(interest->getName());
357 ackName.append(version);
358 ackName.append("OK");
359
360 std::shared_ptr<ndn::Data> ack(std::make_shared<ndn::Data>(ackName));
361
362 m_mutex.lock();
363 { // !!! BEGIN CRITICAL SECTION !!!
364 // An unusual race-condition case, which requires things like PIT aggregation to be off.
365 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
366 if (iter != m_activeQueryToFirstResponse.end()) {
367 face->put(*(iter->second));
368 m_mutex.unlock(); // escape lock
369 return;
370 }
371 // This is where things are expensive so we save them for the lock
372 keyChain->sign(*ack);
373 face->put(*ack);
374 m_activeQueryToFirstResponse.insert(std::pair<std::string,
375 std::shared_ptr<ndn::Data>>(jsonQuery, ack));
376 } // !!! END CRITICAL SECTION !!!
377 m_mutex.unlock();
378
379 // 3) Convert the JSON Query into a MySQL one
380 bool autocomplete = false;
381 std::stringstream mysqlQuery;
382 mysqlQuery << "SELECT name FROM cmip5";
383 bool input = false;
384 for (Json::Value::iterator iter = parsedFromString.begin(); iter != parsedFromString.end(); ++iter)
385 {
386 Json::Value key = iter.key();
387 Json::Value value = (*iter);
388
389 if (input) {
390 mysqlQuery << " AND";
391 } else {
392 mysqlQuery << " WHERE";
393 }
394
395 // Auto-complete case
396 if (key.asString().compare("?") == 0) {
397 mysqlQuery << " name REGEXP '^" << value.asString() << "'";
398 autocomplete = true;
399 }
400 // Component case
401 else {
402 mysqlQuery << " " << key.asString() << "='" << value.asString() << "'";
403 }
404 input = true;
405 }
406
407 if (!input) { // Force it to be the empty set
408 mysqlQuery << " limit 0";
409 }
410 mysqlQuery << ";";
411
412 // 4) Run the Query
413 // We're assuming that databaseHandler has already been connected to the database
414 std::shared_ptr<MYSQL_RES> results = atmos::util::PerformQuery(databaseHandler, mysqlQuery.str());
415
416 MYSQL_ROW row;
Alison Craig1aced7d2015-04-10 12:00:02 -0600417 ndn::Name segmentPrefix(atmos::util::CatalogAdapter<MYSQL>::m_prefix);
Alison Craig2a4d5282015-04-10 12:00:02 -0600418 segmentPrefix.append("query-results");
419 segmentPrefix.append(version);
420
421 size_t usedBytes = 0;
422 const size_t PAYLOAD_LIMIT = 7000;
423 uint64_t segmentNo = 0;
424 Json::Value array;
425 while ((row = mysql_fetch_row(results.get())))
426 {
427 size_t size = strlen(row[0]) + 1;
428 if (usedBytes + size > PAYLOAD_LIMIT) {
429 publishJson(face, keyChain, segmentPrefix, array, segmentNo, false, autocomplete);
430 array.clear();
431 usedBytes = 0;
432 segmentNo++;
433 }
434 array.append(row[0]);
435 usedBytes += size;
436 }
437 publishJson(face, keyChain, segmentPrefix, array, segmentNo, true, autocomplete);
438 }
439}
440
441} // namespace query
442} // namespace atmos
443#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP