/** NDN-Atmos: Cataloging Service for distributed data originally developed
 *  for atmospheric science data
 *  Copyright (C) 2015 Colorado State University
 *
 *  NDN-Atmos is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  NDN-Atmos is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with NDN-Atmos.  If not, see <http://www.gnu.org/licenses/>.
**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanb25835b2015-04-28 17:09:35 -060024#include "util/config-file.hpp"
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <thread>
27
Alison Craig2a4d5282015-04-10 12:00:02 -060028#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
Alison Craig1aced7d2015-04-10 12:00:02 -060040#include <ndn-cxx/util/in-memory-storage-lru.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060041
42#include "mysql/mysql.h"
43
Alison Craig2a4d5282015-04-10 12:00:02 -060044#include <map>
Chengyu Fanb25835b2015-04-28 17:09:35 -060045#include <unordered_map>
Alison Craig2a4d5282015-04-10 12:00:02 -060046#include <memory>
47#include <mutex>
48#include <sstream>
49#include <string>
Chengyu Fan92440162015-07-09 14:43:31 -060050#include <array>
Alison Craig2a4d5282015-04-10 12:00:02 -060051
52namespace atmos {
53namespace query {
// Size threshold (in bytes) compared against the serialized JSON payload of a segment.
// @todo: derive this limit from the size of a signed, empty Data packet
static constexpr size_t PAYLOAD_LIMIT = 7000;
Alison Craig2a4d5282015-04-10 12:00:02 -060056
57/**
58 * QueryAdapter handles the Query usecases for the catalog
59 */
60template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060061class QueryAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060062public:
63 /**
64 * Constructor
65 *
Chengyu Fanb25835b2015-04-28 17:09:35 -060066 * @param face: Face that will be used for NDN communications
67 * @param keyChain: KeyChain that will be used for data signing
Alison Craig2a4d5282015-04-10 12:00:02 -060068 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060069 QueryAdapter(const std::shared_ptr<ndn::Face>& face,
70 const std::shared_ptr<ndn::KeyChain>& keyChain);
Alison Craig2a4d5282015-04-10 12:00:02 -060071
Alison Craig2a4d5282015-04-10 12:00:02 -060072 virtual
73 ~QueryAdapter();
74
75 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060076 * Helper function to specify section handler
77 */
78 void
79 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060080 const ndn::Name& prefix,
81 const std::vector<std::string>& nameFields);
Chengyu Fanb25835b2015-04-28 17:09:35 -060082
83protected:
84 /**
85 * Helper function for configuration parsing
86 */
87 void
88 onConfig(const util::ConfigSection& section,
89 bool isDryDun,
90 const std::string& fileName,
91 const ndn::Name& prefix);
92
93 /**
Alison Craig2a4d5282015-04-10 12:00:02 -060094 * Handles incoming query requests by stripping the filter off the Interest to get the
95 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
96 *
97 * @param filter: InterestFilter that caused this Interest to be routed
98 * @param interest: Interest that needs to be handled
99 */
100 virtual void
101 onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
102
103 /**
104 * Handles requests for responses to an existing query
105 *
106 * @param filter: InterestFilter that caused this Interest to be routed
107 * @param interest: Interest that needs to be handled
108 */
109 virtual void
110 onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
111
Alison Craig2a4d5282015-04-10 12:00:02 -0600112 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600113 * Helper function that makes query-results data
Alison Craig2a4d5282015-04-10 12:00:02 -0600114 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600115 * @param segmentPrefix: Name that identifies the Prefix for the Data
116 * @param value: Json::Value to be sent in the Data
117 * @param segmentNo: uint64_t the segment for this Data
118 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
119 * last entry
Alison Craig2a4d5282015-04-10 12:00:02 -0600120 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
Chengyu Fan92440162015-07-09 14:43:31 -0600121 * @param resultCount: the number of records in the query results
Alison Craig2a4d5282015-04-10 12:00:02 -0600122 */
Chengyu Fanb25835b2015-04-28 17:09:35 -0600123 std::shared_ptr<ndn::Data>
124 makeReplyData(const ndn::Name& segmentPrefix,
125 const Json::Value& value,
126 uint64_t segmentNo,
127 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -0600128 bool isAutocomplete,
129 uint64_t resultCount);
Alison Craig2a4d5282015-04-10 12:00:02 -0600130
131 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600132 * Helper function that generates query results from a Json query carried in the Interest
Alison Craig2a4d5282015-04-10 12:00:02 -0600133 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600134 * @param interest: Interest that needs to be handled
Alison Craig2a4d5282015-04-10 12:00:02 -0600135 */
136 void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600137 runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600138
Alison Craig1aced7d2015-04-10 12:00:02 -0600139 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600140 * Helper function that makes ACK data
Alison Craig1aced7d2015-04-10 12:00:02 -0600141 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600142 * @param interest: Intersts that needs to be handled
143 * @param version: Version that needs to be in the data name
144 */
145 std::shared_ptr<ndn::Data>
146 makeAckData(std::shared_ptr<const ndn::Interest> interest,
147 const ndn::Name::Component& version);
148
149 /**
Chengyu Fan92440162015-07-09 14:43:31 -0600150 * Helper function that sends NACK
151 *
152 * @param dataPrefix: prefix for the data packet
Alison Craig1aced7d2015-04-10 12:00:02 -0600153 */
154 void
Chengyu Fan92440162015-07-09 14:43:31 -0600155 sendNack(const ndn::Name& dataPrefix);
156
157 /**
158 * Helper function that generates the sqlQuery string for component-based query
159 * @param sqlQuery: stringstream to save the sqlQuery string
160 * @param jsonValue: Json value that contains the query information
161 */
162 bool
Chengyu Fanb25835b2015-04-28 17:09:35 -0600163 json2Sql(std::stringstream& sqlQuery,
Chengyu Fan92440162015-07-09 14:43:31 -0600164 Json::Value& jsonValue);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600165
166 /**
167 * Helper function that signs the data
168 */
169 void
170 signData(ndn::Data& data);
171
172 /**
173 * Helper function that publishes query-results data segments
174 */
175 virtual void
176 prepareSegments(const ndn::Name& segmentPrefix,
177 const std::string& sqlString,
178 bool autocomplete);
179
180 /**
181 * Helper function to set the DatabaseHandler
182 */
183 void
184 setDatabaseHandler(const util::ConnectionDetails& databaseId);
185
186 /**
187 * Helper function that set filters to make the adapter work
188 */
189 void
190 setFilters();
191
Chengyu Fan92440162015-07-09 14:43:31 -0600192 /**
193 * Helper function that generates the sqlQuery string for autocomplete query
194 * @param sqlQuery: stringstream to save the sqlQuery string
195 * @param jsonValue: Json value that contains the query information
196 */
197 bool
198 json2AutocompletionSql(std::stringstream& sqlQuery,
199 Json::Value& jsonValue);
200
Chengyu Fan46398212015-08-11 11:23:13 -0600201 bool
202 json2CompleteSearchSql(std::stringstream& sqlQuery,
203 Json::Value& jsonValue);
204
Chengyu Fanb25835b2015-04-28 17:09:35 -0600205protected:
206 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
207 // Handle to the Catalog's database
208 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Alison Craig1aced7d2015-04-10 12:00:02 -0600209
Alison Craig2a4d5282015-04-10 12:00:02 -0600210 // mutex to control critical sections
211 std::mutex m_mutex;
212 // @{ needs m_mutex protection
213 // The Queries we are currently writing to
214 std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
Alison Craig1aced7d2015-04-10 12:00:02 -0600215
216 ndn::util::InMemoryStorageLru m_cache;
Alison Craig2a4d5282015-04-10 12:00:02 -0600217 // @}
Chengyu Fanb25835b2015-04-28 17:09:35 -0600218 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan92440162015-07-09 14:43:31 -0600219 //std::vector<std::string> m_atmosColumns;
220 ndn::Name m_catalogId; // should be replaced with the PK digest
Alison Craig2a4d5282015-04-10 12:00:02 -0600221};
222
Alison Craig2a4d5282015-04-10 12:00:02 -0600223template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600224QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
225 const std::shared_ptr<ndn::KeyChain>& keyChain)
226 : util::CatalogAdapter(face, keyChain)
227 , m_cache(250000)
Chengyu Fan92440162015-07-09 14:43:31 -0600228 , m_catalogId("catalogIdPlaceHolder")
Alison Craig2a4d5282015-04-10 12:00:02 -0600229{
Alison Craig2a4d5282015-04-10 12:00:02 -0600230}
231
232template <typename DatabaseHandler>
233void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600234QueryAdapter<DatabaseHandler>::setFilters()
Alison Craig2a4d5282015-04-10 12:00:02 -0600235{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600236 ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
237 m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
238 bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
239 this, _1, _2),
240 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
241 this, _1),
242 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
243 this, _1, _2));
244
245 ndn::Name resultPrefix = ndn::Name(m_prefix).append("query-results");
246 m_registeredPrefixList[resultPrefix] = m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("query-results")),
247 bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
248 this, _1, _2),
249 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
250 this, _1),
251 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
252 this, _1, _2));
253}
254
255template <typename DatabaseHandler>
256void
257QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600258 const ndn::Name& prefix,
259 const std::vector<std::string>& nameFields)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600260{
Chengyu Fan92440162015-07-09 14:43:31 -0600261 m_nameFields = nameFields;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600262 config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
263 _1, _2, _3, prefix));
264}
265
266template <typename DatabaseHandler>
267void
268QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
269 bool isDryRun,
270 const std::string& filename,
271 const ndn::Name& prefix)
272{
273 using namespace util;
274 if (isDryRun) {
275 return;
276 }
277 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
278 for (auto item = section.begin();
279 item != section.end();
280 ++ item)
281 {
282 if (item->first == "signingId") {
283 signingId.assign(item->second.get_value<std::string>());
284 if (signingId.empty()) {
285 throw Error("Empty value for \"signingId\""
286 " in \"query\" section");
287 }
288 }
289 if (item->first == "database") {
290 const util::ConfigSection& dataSection = item->second;
291 for (auto subItem = dataSection.begin();
292 subItem != dataSection.end();
293 ++ subItem)
294 {
295 if (subItem->first == "dbServer") {
296 dbServer.assign(subItem->second.get_value<std::string>());
297 if (dbServer.empty()){
298 throw Error("Invalid value for \"dbServer\""
299 " in \"query\" section");
300 }
301 }
302 if (subItem->first == "dbName") {
303 dbName.assign(subItem->second.get_value<std::string>());
304 if (dbName.empty()){
305 throw Error("Invalid value for \"dbName\""
306 " in \"query\" section");
307 }
308 }
309 if (subItem->first == "dbUser") {
310 dbUser.assign(subItem->second.get_value<std::string>());
311 if (dbUser.empty()){
312 throw Error("Invalid value for \"dbUser\""
313 " in \"query\" section");
314 }
315 }
316 if (subItem->first == "dbPasswd") {
317 dbPasswd.assign(subItem->second.get_value<std::string>());
318 if (dbPasswd.empty()){
319 throw Error("Invalid value for \"dbPasswd\""
320 " in \"query\" section");
321 }
322 }
323 }
324 }
325 }
326
327 m_prefix = prefix;
328 m_signingId = ndn::Name(signingId);
329 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
330
331 setDatabaseHandler(mysqlId);
332 setFilters();
333}
334
335template <typename DatabaseHandler>
336void
337QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
338{
339 //empty
340}
341
342template <>
343void
344QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
345{
346 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
347
348 m_databaseHandler = conn;
349}
350
351template <typename DatabaseHandler>
352QueryAdapter<DatabaseHandler>::~QueryAdapter()
353{
354 for (const auto& itr : m_registeredPrefixList) {
355 if (static_cast<bool>(itr.second))
356 m_face->unsetInterestFilter(itr.second);
357 }
358}
359
360template <typename DatabaseHandler>
361void
362QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
363 const ndn::Interest& interest)
364{
365 // strictly enforce query initialization namespace.
366 // Name should be our local prefix + "query" + parameters
Alison Craig2a4d5282015-04-10 12:00:02 -0600367 if (interest.getName().size() != filter.getPrefix().size() + 1) {
368 // @todo: return a nack
369 return;
370 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600371 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
Chengyu Fan92440162015-07-09 14:43:31 -0600372
373 std::cout << "incoming query interest : " << interestPtr->getName() << std::endl;
374
Chengyu Fanb25835b2015-04-28 17:09:35 -0600375 // @todo: use thread pool
376 std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
377 this,
378 interestPtr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600379 queryThread.join();
380}
381
382template <typename DatabaseHandler>
383void
384QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
385 const ndn::Interest& interest)
386{
387 // FIXME Results are currently getting served out of the forwarder's
388 // CS so we just ignore any retrieval Interests that hit us for
389 // now. In the future, this should check some form of
390 // InMemoryStorage.
Chengyu Fan92440162015-07-09 14:43:31 -0600391
392 std::cout << "incoming query-results interest : " << interest.toUri() << std::endl;
393
Alison Craig1aced7d2015-04-10 12:00:02 -0600394 auto data = m_cache.find(interest.getName());
395 if (data) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600396 m_face->put(*data);
Alison Craig1aced7d2015-04-10 12:00:02 -0600397 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600398}
399
400template <typename DatabaseHandler>
401void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600402QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600403{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600404 if (m_signingId.empty())
405 m_keyChain->sign(data);
406 else {
407 ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
408 ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
409 m_keyChain->sign(data, certName);
Alison Craig2a4d5282015-04-10 12:00:02 -0600410 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600411}
412
413template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600414std::shared_ptr<ndn::Data>
415QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
416 const ndn::Name::Component& version)
Alison Craig2a4d5282015-04-10 12:00:02 -0600417{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600418 // JSON parsed ok, so we can acknowledge successful receipt of the query
419 ndn::Name ackName(interest->getName());
420 ackName.append(version);
Chengyu Fan92440162015-07-09 14:43:31 -0600421 ackName.append(m_catalogId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600422 ackName.append("OK");
Alison Craig2a4d5282015-04-10 12:00:02 -0600423
Chengyu Fanb25835b2015-04-28 17:09:35 -0600424 std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(ackName);
Chengyu Fan92440162015-07-09 14:43:31 -0600425 ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
426
Chengyu Fanb25835b2015-04-28 17:09:35 -0600427 signData(*ack);
Chengyu Fan92440162015-07-09 14:43:31 -0600428#ifndef NDEBUG
429 std::cout << "makeAckData : " << ackName << std::endl;
430#endif
Chengyu Fanb25835b2015-04-28 17:09:35 -0600431 return ack;
Alison Craig2a4d5282015-04-10 12:00:02 -0600432}
433
434template <typename DatabaseHandler>
435void
Chengyu Fan92440162015-07-09 14:43:31 -0600436QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600437{
Chengyu Fan92440162015-07-09 14:43:31 -0600438 uint64_t segmentNo = 0;
439
440 std::shared_ptr<ndn::Data> nack =
441 std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo));
442 nack->setFreshnessPeriod(ndn::time::milliseconds(10000));
443 nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
444
445 signData(*nack);
Chengyu Fan46398212015-08-11 11:23:13 -0600446#ifndef NDEBUG
Chengyu Fan92440162015-07-09 14:43:31 -0600447 std::cout << "make NACK : " << ndn::Name(dataPrefix).appendSegment(segmentNo) << std::endl;
Chengyu Fan46398212015-08-11 11:23:13 -0600448#endif
Chengyu Fan92440162015-07-09 14:43:31 -0600449 m_mutex.lock();
450 m_cache.insert(*nack);
451 m_mutex.unlock();
452}
453
454
455template <typename DatabaseHandler>
456bool
457QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
458 Json::Value& jsonValue)
459{
460#ifndef NDEBUG
461 std::cout << "jsonValue in json2Sql: " << jsonValue.toStyledString() << std::endl;
462#endif
463 if (jsonValue.type() != Json::objectValue) {
464 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
465 return false;
466 }
467
Chengyu Fanb25835b2015-04-28 17:09:35 -0600468 sqlQuery << "SELECT name FROM cmip5";
469 bool input = false;
470 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
471 {
472 Json::Value key = iter.key();
473 Json::Value value = (*iter);
474
Chengyu Fan92440162015-07-09 14:43:31 -0600475 if (key == Json::nullValue || value == Json::nullValue) {
476 std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
477 return false;
478 }
479
480 // cannot convert to string
481 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
482 std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
483 return false;
484 }
485
486 if (key.asString().compare("?") == 0) {
487 continue;
488 }
489
Chengyu Fanb25835b2015-04-28 17:09:35 -0600490 if (input) {
491 sqlQuery << " AND";
492 } else {
493 sqlQuery << " WHERE";
494 }
495
Chengyu Fan92440162015-07-09 14:43:31 -0600496 sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
Chengyu Fanb25835b2015-04-28 17:09:35 -0600497 input = true;
498 }
499
500 if (!input) { // Force it to be the empty set
Chengyu Fan92440162015-07-09 14:43:31 -0600501 return false;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600502 }
503 sqlQuery << ";";
Chengyu Fan92440162015-07-09 14:43:31 -0600504 return true;
505}
506
507template <typename DatabaseHandler>
508bool
509QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
510 Json::Value& jsonValue)
511{
512#ifndef NDEBUG
513 std::cout << "jsonValue in json2AutocompletionSql: " << jsonValue.toStyledString() << std::endl;
514#endif
515 if (jsonValue.type() != Json::objectValue) {
516 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
517 return false;
518 }
519
520 std::string typedString;
521 // get the string in the jsonValue
522 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
523 {
524 Json::Value key = iter.key();
525 Json::Value value = (*iter);
526
527 if (key == Json::nullValue || value == Json::nullValue) {
528 std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
529 return false;
530 }
531
532 // cannot convert to string
533 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
534 std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
535 return false;
536 }
537
538 if (key.asString().compare("?") == 0) {
539 typedString.assign(value.asString());
540 // since the front end triggers the autocompletion when users typed '/',
541 // there must be a '/' at the end, and the first char must be '/'
542 if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
543 return false;
544 break;
545 }
546 }
547
548 // 1. get the expected column number by parsing the typedString, so we can get the filed name
549 size_t pos = 0;
550 size_t start = 1; // start from the 1st char which is not '/'
551 size_t count = 0; // also the name to query for
552 std::string token;
553 std::string delimiter = "/";
554 std::map<std::string, std::string> typedComponents;
555 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
556 token = typedString.substr(start, pos - start);
557 if (count >= m_nameFields.size() - 1) {
558 return false;
559 }
560
561 // add column name and value (token) into map
562 typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
563 count ++;
564 start = pos + 1;
565 }
566
567 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
568 // return true
569 bool more = false;
Chengyu Fan46398212015-08-11 11:23:13 -0600570 sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM cmip5";
571 for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
572 it != typedComponents.end(); ++it) {
573 if (more)
574 sqlQuery << " AND";
575 else
576 sqlQuery << " WHERE";
577
578 sqlQuery << " " << it->first << "='" << it->second << "'";
579
580 more = true;
581 }
582 sqlQuery << ";";
583 return true;
584}
585
586template <typename DatabaseHandler>
587bool
588QueryAdapter<DatabaseHandler>::json2CompleteSearchSql(std::stringstream& sqlQuery,
589 Json::Value& jsonValue)
590{
591#ifndef NDEBUG
592 std::cout << "jsonValue in json2CompleteSearchSql: " << jsonValue.toStyledString() << std::endl;
593#endif
594 if (jsonValue.type() != Json::objectValue) {
595 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
596 return false;
597 }
598
599 std::string typedString;
600 // get the string in the jsonValue
601 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
602 {
603 Json::Value key = iter.key();
604 Json::Value value = (*iter);
605
606 if (key == Json::nullValue || value == Json::nullValue) {
607 std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
608 return false;
609 }
610
611 // cannot convert to string
612 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
613 std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
614 return false;
615 }
616
617 if (key.asString().compare("??") == 0) {
618 typedString.assign(value.asString());
619 // since the front end triggers the autocompletion when users typed '/',
620 // there must be a '/' at the end, and the first char must be '/'
621 if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
622 return false;
623 break;
624 }
625 }
626
627 // 1. get the expected column number by parsing the typedString, so we can get the filed name
628 size_t pos = 0;
629 size_t start = 1; // start from the 1st char which is not '/'
630 size_t count = 0; // also the name to query for
631 std::string token;
632 std::string delimiter = "/";
633 std::map<std::string, std::string> typedComponents;
634 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
635 token = typedString.substr(start, pos - start);
636 if (count >= m_nameFields.size() - 1) {
637 return false;
638 }
639
640 // add column name and value (token) into map
641 typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
642 count ++;
643 start = pos + 1;
644 }
645
646 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
647 // return true
648 bool more = false;
649 sqlQuery << "SELECT name FROM cmip5";
Chengyu Fan92440162015-07-09 14:43:31 -0600650 for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
651 it != typedComponents.end(); ++it) {
652 if (more)
653 sqlQuery << " AND";
654 else
655 sqlQuery << " WHERE";
656
657 sqlQuery << " " << it->first << "='" << it->second << "'";
658
659 more = true;
660 }
661 sqlQuery << ";";
662 return true;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600663}
664
665template <typename DatabaseHandler>
666void
667QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600668{
Alison Craig1aced7d2015-04-10 12:00:02 -0600669 // 1) Strip the prefix off the ndn::Interest's ndn::Name
670 // +1 to grab JSON component after "query" component
Alison Craig1aced7d2015-04-10 12:00:02 -0600671
Chengyu Fanb25835b2015-04-28 17:09:35 -0600672 ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
673 // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
674 const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
Alison Craig2a4d5282015-04-10 12:00:02 -0600675
Chengyu Fanb25835b2015-04-28 17:09:35 -0600676 if (jsonQuery.length() <= 0) {
Chengyu Fan92440162015-07-09 14:43:31 -0600677 // no JSON query, send Nack?
Chengyu Fanb25835b2015-04-28 17:09:35 -0600678 return;
679 }
Chengyu Fan92440162015-07-09 14:43:31 -0600680 // check if the ACK is cached, if yes, respond with ACK
681 // ?? what if the results for now it NULL, but latter exist?
Alison Craig2a4d5282015-04-10 12:00:02 -0600682 // For efficiency, do a double check. Once without the lock, then with it.
683 if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
684 m_mutex.lock();
685 { // !!! BEGIN CRITICAL SECTION !!!
686 // If this fails upon locking, we removed it during our search.
687 // An unusual race-condition case, which requires things like PIT aggregation to be off.
688 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
689 if (iter != m_activeQueryToFirstResponse.end()) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600690 m_face->put(*(iter->second));
Alison Craig2a4d5282015-04-10 12:00:02 -0600691 m_mutex.unlock(); //escape lock
692 return;
693 }
694 } // !!! END CRITICAL SECTION !!!
695 m_mutex.unlock();
696 }
697
698 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
699 Json::Value parsedFromString;
700 Json::Reader reader;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600701 if (!reader.parse(jsonQuery, parsedFromString)) {
702 // @todo: send NACK?
703 std::cout << "cannot parse the JsonQuery" << std::endl;
704 return;
Alison Craig2a4d5282015-04-10 12:00:02 -0600705 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600706
Chengyu Fan92440162015-07-09 14:43:31 -0600707 // the version should be replaced with ChronoSync state digest
Chengyu Fanb25835b2015-04-28 17:09:35 -0600708 const ndn::name::Component version
709 = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(
710 ndn::time::system_clock::now()).count());
711
712 std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
713
714 m_mutex.lock();
715 { // !!! BEGIN CRITICAL SECTION !!!
716 // An unusual race-condition case, which requires things like PIT aggregation to be off.
717 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
718 if (iter != m_activeQueryToFirstResponse.end()) {
719 m_face->put(*(iter->second));
720 m_mutex.unlock(); // escape lock
721 return;
722 }
723 // This is where things are expensive so we save them for the lock
Chengyu Fan92440162015-07-09 14:43:31 -0600724 // note that we ack the query with the cached ACK messages, but we should remove the ACKs
725 // that conatin the old version when ChronoSync is updated
Chengyu Fanb25835b2015-04-28 17:09:35 -0600726 m_activeQueryToFirstResponse.insert(std::pair<std::string,
727 std::shared_ptr<ndn::Data>>(jsonQuery, ack));
728 m_face->put(*ack);
729 } // !!! END CRITICAL SECTION !!!
730 m_mutex.unlock();
731
732 // 3) Convert the JSON Query into a MySQL one
733 bool autocomplete = false;
734 std::stringstream sqlQuery;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600735
Chengyu Fan92440162015-07-09 14:43:31 -0600736 // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
737 // for now, should be /<prefix>/query-results/<query-parameters>/<version>/, latter add catalog-id
Chengyu Fanb25835b2015-04-28 17:09:35 -0600738 ndn::Name segmentPrefix(m_prefix);
739 segmentPrefix.append("query-results");
Chengyu Fan92440162015-07-09 14:43:31 -0600740 segmentPrefix.append(jsonStr);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600741 segmentPrefix.append(version);
Chengyu Fan92440162015-07-09 14:43:31 -0600742 segmentPrefix.append(m_catalogId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600743
Chengyu Fan92440162015-07-09 14:43:31 -0600744 Json::Value tmp;
745 // expect the autocomplete and the component-based query are separate
746 // if JSON::Value contains ? as key, is autocompletion
747 if (parsedFromString.get("?", tmp) != tmp) {
748 autocomplete = true;
749 if (!json2AutocompletionSql(sqlQuery, parsedFromString)) {
750 sendNack(segmentPrefix);
751 return;
752 }
753 }
Chengyu Fan46398212015-08-11 11:23:13 -0600754 else if (parsedFromString.get("??", tmp) != tmp) {
755 if (!json2CompleteSearchSql(sqlQuery, parsedFromString)) {
756 sendNack(segmentPrefix);
757 return;
758 }
759 }
Chengyu Fan92440162015-07-09 14:43:31 -0600760 else {
761 if (!json2Sql(sqlQuery, parsedFromString)) {
762 sendNack(segmentPrefix);
763 return;
764 }
765 }
766
767 // 4) Run the Query
Chengyu Fanb25835b2015-04-28 17:09:35 -0600768 prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete);
769}
770
771template <typename DatabaseHandler>
772void
773QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
774 const std::string& sqlString,
775 bool autocomplete)
776{
777 // empty
778}
779
780// prepareSegments specilization function
781template<>
782void
783QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
784 const std::string& sqlString,
785 bool autocomplete)
786{
Chengyu Fan46398212015-08-11 11:23:13 -0600787#ifndef NDEBUG
Chengyu Fan92440162015-07-09 14:43:31 -0600788 std::cout << "prepareSegments() executes sql : " << sqlString << std::endl;
Chengyu Fan46398212015-08-11 11:23:13 -0600789#endif
790 std::string errMsg;
791 bool success;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600792 // 4) Run the Query
793 std::shared_ptr<MYSQL_RES> results
Chengyu Fan46398212015-08-11 11:23:13 -0600794 = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
795 if (!success)
796 std::cout << errMsg << std::endl;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600797
798 if (!results) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600799 std::cout << "null MYSQL_RES for query : " << sqlString << std::endl;
Chengyu Fan92440162015-07-09 14:43:31 -0600800
Chengyu Fanb25835b2015-04-28 17:09:35 -0600801 // @todo: throw runtime error or log the error message?
802 return;
803 }
804
Chengyu Fan92440162015-07-09 14:43:31 -0600805 uint64_t resultCount = mysql_num_rows(results.get());
806
Chengyu Fanb25835b2015-04-28 17:09:35 -0600807 std::cout << "Query results for \""
808 << sqlString
809 << "\" contain "
Chengyu Fan92440162015-07-09 14:43:31 -0600810 << resultCount
Chengyu Fanb25835b2015-04-28 17:09:35 -0600811 << " rows" << std::endl;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600812
813 MYSQL_ROW row;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600814 uint64_t segmentNo = 0;
Chengyu Fan46398212015-08-11 11:23:13 -0600815 Json::Value tmp;
816 Json::Value resultJson;
817 Json::FastWriter fastWriter;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600818 while ((row = mysql_fetch_row(results.get())))
819 {
Chengyu Fan46398212015-08-11 11:23:13 -0600820 tmp.append(row[0]);
821 const std::string tmpString = fastWriter.write(tmp);
822 if (tmpString.length() > PAYLOAD_LIMIT) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600823 std::shared_ptr<ndn::Data> data
Chengyu Fan46398212015-08-11 11:23:13 -0600824 = makeReplyData(segmentPrefix, resultJson, segmentNo, false, autocomplete, resultCount);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600825 m_mutex.lock();
826 m_cache.insert(*data);
827 m_mutex.unlock();
Chengyu Fan46398212015-08-11 11:23:13 -0600828 tmp.clear();
829 resultJson.clear();
830 segmentNo ++;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600831 }
Chengyu Fan46398212015-08-11 11:23:13 -0600832 resultJson.append(row[0]);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600833 }
Chengyu Fan46398212015-08-11 11:23:13 -0600834
Chengyu Fanb25835b2015-04-28 17:09:35 -0600835 std::shared_ptr<ndn::Data> data
Chengyu Fan46398212015-08-11 11:23:13 -0600836 = makeReplyData(segmentPrefix, resultJson, segmentNo, true, autocomplete, resultCount);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600837 m_mutex.lock();
838 m_cache.insert(*data);
839 m_mutex.unlock();
840}
841
842template <typename DatabaseHandler>
843std::shared_ptr<ndn::Data>
844QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
845 const Json::Value& value,
846 uint64_t segmentNo,
847 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -0600848 bool isAutocomplete,
849 uint64_t resultCount)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600850{
851 Json::Value entry;
852 Json::FastWriter fastWriter;
Chengyu Fan92440162015-07-09 14:43:31 -0600853 Json::UInt64 count(resultCount);
854 entry["resultCount"] = count;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600855 if (isAutocomplete) {
856 entry["next"] = value;
857 } else {
858 entry["results"] = value;
859 }
860 const std::string jsonMessage = fastWriter.write(entry);
861 const char* payload = jsonMessage.c_str();
862 size_t payloadLength = jsonMessage.size() + 1;
863 ndn::Name segmentName(segmentPrefix);
864 segmentName.appendSegment(segmentNo);
865
866 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
867 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
868 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
869
870 if (isFinalBlock) {
871 data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
872 }
873#ifndef NDEBUG
874 std::cout << "makeReplyData : " << segmentName << std::endl;
875#endif
876 signData(*data);
877 return data;
Alison Craig2a4d5282015-04-10 12:00:02 -0600878}
879
880} // namespace query
881} // namespace atmos
882#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP