blob: 2adb161581c4c39d7a4d03293efd5424460ac904 [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanb25835b2015-04-28 17:09:35 -060024#include "util/config-file.hpp"
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <thread>
27
Alison Craig2a4d5282015-04-10 12:00:02 -060028#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
Alison Craig1aced7d2015-04-10 12:00:02 -060040#include <ndn-cxx/util/in-memory-storage-lru.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060041
42#include "mysql/mysql.h"
43
Alison Craig2a4d5282015-04-10 12:00:02 -060044#include <map>
Chengyu Fanb25835b2015-04-28 17:09:35 -060045#include <unordered_map>
Alison Craig2a4d5282015-04-10 12:00:02 -060046#include <memory>
47#include <mutex>
48#include <sstream>
49#include <string>
Chengyu Fan92440162015-07-09 14:43:31 -060050#include <array>
Alison Craig2a4d5282015-04-10 12:00:02 -060051
52namespace atmos {
53namespace query {
Chengyu Fan92440162015-07-09 14:43:31 -060054// todo: calculate payload limit by get the size of a signed empty Data packet
55static const size_t PAYLOAD_LIMIT = 7000;
Alison Craig2a4d5282015-04-10 12:00:02 -060056
57/**
58 * QueryAdapter handles the Query usecases for the catalog
59 */
60template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060061class QueryAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060062public:
63 /**
64 * Constructor
65 *
Chengyu Fanb25835b2015-04-28 17:09:35 -060066 * @param face: Face that will be used for NDN communications
67 * @param keyChain: KeyChain that will be used for data signing
Alison Craig2a4d5282015-04-10 12:00:02 -060068 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060069 QueryAdapter(const std::shared_ptr<ndn::Face>& face,
70 const std::shared_ptr<ndn::KeyChain>& keyChain);
Alison Craig2a4d5282015-04-10 12:00:02 -060071
Alison Craig2a4d5282015-04-10 12:00:02 -060072 virtual
73 ~QueryAdapter();
74
75 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060076 * Helper function to specify section handler
77 */
78 void
79 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060080 const ndn::Name& prefix,
81 const std::vector<std::string>& nameFields);
Chengyu Fanb25835b2015-04-28 17:09:35 -060082
83protected:
84 /**
85 * Helper function for configuration parsing
86 */
87 void
88 onConfig(const util::ConfigSection& section,
89 bool isDryDun,
90 const std::string& fileName,
91 const ndn::Name& prefix);
92
93 /**
Alison Craig2a4d5282015-04-10 12:00:02 -060094 * Handles incoming query requests by stripping the filter off the Interest to get the
95 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
96 *
97 * @param filter: InterestFilter that caused this Interest to be routed
98 * @param interest: Interest that needs to be handled
99 */
100 virtual void
101 onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
102
103 /**
104 * Handles requests for responses to an existing query
105 *
106 * @param filter: InterestFilter that caused this Interest to be routed
107 * @param interest: Interest that needs to be handled
108 */
109 virtual void
110 onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
111
Alison Craig2a4d5282015-04-10 12:00:02 -0600112 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600113 * Helper function that makes query-results data
Alison Craig2a4d5282015-04-10 12:00:02 -0600114 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600115 * @param segmentPrefix: Name that identifies the Prefix for the Data
116 * @param value: Json::Value to be sent in the Data
117 * @param segmentNo: uint64_t the segment for this Data
118 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
119 * last entry
Alison Craig2a4d5282015-04-10 12:00:02 -0600120 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
Chengyu Fan92440162015-07-09 14:43:31 -0600121 * @param resultCount: the number of records in the query results
Alison Craig2a4d5282015-04-10 12:00:02 -0600122 */
Chengyu Fanb25835b2015-04-28 17:09:35 -0600123 std::shared_ptr<ndn::Data>
124 makeReplyData(const ndn::Name& segmentPrefix,
125 const Json::Value& value,
126 uint64_t segmentNo,
127 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -0600128 bool isAutocomplete,
129 uint64_t resultCount);
Alison Craig2a4d5282015-04-10 12:00:02 -0600130
131 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600132 * Helper function that generates query results from a Json query carried in the Interest
Alison Craig2a4d5282015-04-10 12:00:02 -0600133 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600134 * @param interest: Interest that needs to be handled
Alison Craig2a4d5282015-04-10 12:00:02 -0600135 */
136 void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600137 runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600138
Alison Craig1aced7d2015-04-10 12:00:02 -0600139 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600140 * Helper function that makes ACK data
Alison Craig1aced7d2015-04-10 12:00:02 -0600141 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600142 * @param interest: Intersts that needs to be handled
143 * @param version: Version that needs to be in the data name
144 */
145 std::shared_ptr<ndn::Data>
146 makeAckData(std::shared_ptr<const ndn::Interest> interest,
147 const ndn::Name::Component& version);
148
149 /**
Chengyu Fan92440162015-07-09 14:43:31 -0600150 * Helper function that sends NACK
151 *
152 * @param dataPrefix: prefix for the data packet
Alison Craig1aced7d2015-04-10 12:00:02 -0600153 */
154 void
Chengyu Fan92440162015-07-09 14:43:31 -0600155 sendNack(const ndn::Name& dataPrefix);
156
157 /**
158 * Helper function that generates the sqlQuery string for component-based query
159 * @param sqlQuery: stringstream to save the sqlQuery string
160 * @param jsonValue: Json value that contains the query information
161 */
162 bool
Chengyu Fanb25835b2015-04-28 17:09:35 -0600163 json2Sql(std::stringstream& sqlQuery,
Chengyu Fan92440162015-07-09 14:43:31 -0600164 Json::Value& jsonValue);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600165
166 /**
167 * Helper function that signs the data
168 */
169 void
170 signData(ndn::Data& data);
171
172 /**
173 * Helper function that publishes query-results data segments
174 */
175 virtual void
176 prepareSegments(const ndn::Name& segmentPrefix,
177 const std::string& sqlString,
178 bool autocomplete);
179
180 /**
181 * Helper function to set the DatabaseHandler
182 */
183 void
184 setDatabaseHandler(const util::ConnectionDetails& databaseId);
185
186 /**
187 * Helper function that set filters to make the adapter work
188 */
189 void
190 setFilters();
191
Chengyu Fan92440162015-07-09 14:43:31 -0600192 /**
193 * Helper function that generates the sqlQuery string for autocomplete query
194 * @param sqlQuery: stringstream to save the sqlQuery string
195 * @param jsonValue: Json value that contains the query information
196 */
197 bool
198 json2AutocompletionSql(std::stringstream& sqlQuery,
199 Json::Value& jsonValue);
200
Chengyu Fanb25835b2015-04-28 17:09:35 -0600201protected:
202 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
203 // Handle to the Catalog's database
204 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Alison Craig1aced7d2015-04-10 12:00:02 -0600205
Alison Craig2a4d5282015-04-10 12:00:02 -0600206 // mutex to control critical sections
207 std::mutex m_mutex;
208 // @{ needs m_mutex protection
209 // The Queries we are currently writing to
210 std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
Alison Craig1aced7d2015-04-10 12:00:02 -0600211
212 ndn::util::InMemoryStorageLru m_cache;
Alison Craig2a4d5282015-04-10 12:00:02 -0600213 // @}
Chengyu Fanb25835b2015-04-28 17:09:35 -0600214 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan92440162015-07-09 14:43:31 -0600215 //std::vector<std::string> m_atmosColumns;
216 ndn::Name m_catalogId; // should be replaced with the PK digest
Alison Craig2a4d5282015-04-10 12:00:02 -0600217};
218
Alison Craig2a4d5282015-04-10 12:00:02 -0600219template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600220QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
221 const std::shared_ptr<ndn::KeyChain>& keyChain)
222 : util::CatalogAdapter(face, keyChain)
223 , m_cache(250000)
Chengyu Fan92440162015-07-09 14:43:31 -0600224 , m_catalogId("catalogIdPlaceHolder")
Alison Craig2a4d5282015-04-10 12:00:02 -0600225{
Alison Craig2a4d5282015-04-10 12:00:02 -0600226}
227
228template <typename DatabaseHandler>
229void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600230QueryAdapter<DatabaseHandler>::setFilters()
Alison Craig2a4d5282015-04-10 12:00:02 -0600231{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600232 ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
233 m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
234 bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
235 this, _1, _2),
236 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
237 this, _1),
238 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
239 this, _1, _2));
240
241 ndn::Name resultPrefix = ndn::Name(m_prefix).append("query-results");
242 m_registeredPrefixList[resultPrefix] = m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("query-results")),
243 bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
244 this, _1, _2),
245 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
246 this, _1),
247 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
248 this, _1, _2));
249}
250
251template <typename DatabaseHandler>
252void
253QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600254 const ndn::Name& prefix,
255 const std::vector<std::string>& nameFields)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600256{
Chengyu Fan92440162015-07-09 14:43:31 -0600257 m_nameFields = nameFields;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600258 config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
259 _1, _2, _3, prefix));
260}
261
262template <typename DatabaseHandler>
263void
264QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
265 bool isDryRun,
266 const std::string& filename,
267 const ndn::Name& prefix)
268{
269 using namespace util;
270 if (isDryRun) {
271 return;
272 }
273 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
274 for (auto item = section.begin();
275 item != section.end();
276 ++ item)
277 {
278 if (item->first == "signingId") {
279 signingId.assign(item->second.get_value<std::string>());
280 if (signingId.empty()) {
281 throw Error("Empty value for \"signingId\""
282 " in \"query\" section");
283 }
284 }
285 if (item->first == "database") {
286 const util::ConfigSection& dataSection = item->second;
287 for (auto subItem = dataSection.begin();
288 subItem != dataSection.end();
289 ++ subItem)
290 {
291 if (subItem->first == "dbServer") {
292 dbServer.assign(subItem->second.get_value<std::string>());
293 if (dbServer.empty()){
294 throw Error("Invalid value for \"dbServer\""
295 " in \"query\" section");
296 }
297 }
298 if (subItem->first == "dbName") {
299 dbName.assign(subItem->second.get_value<std::string>());
300 if (dbName.empty()){
301 throw Error("Invalid value for \"dbName\""
302 " in \"query\" section");
303 }
304 }
305 if (subItem->first == "dbUser") {
306 dbUser.assign(subItem->second.get_value<std::string>());
307 if (dbUser.empty()){
308 throw Error("Invalid value for \"dbUser\""
309 " in \"query\" section");
310 }
311 }
312 if (subItem->first == "dbPasswd") {
313 dbPasswd.assign(subItem->second.get_value<std::string>());
314 if (dbPasswd.empty()){
315 throw Error("Invalid value for \"dbPasswd\""
316 " in \"query\" section");
317 }
318 }
319 }
320 }
321 }
322
323 m_prefix = prefix;
324 m_signingId = ndn::Name(signingId);
325 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
326
327 setDatabaseHandler(mysqlId);
328 setFilters();
329}
330
331template <typename DatabaseHandler>
332void
333QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
334{
335 //empty
336}
337
338template <>
339void
340QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
341{
342 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
343
344 m_databaseHandler = conn;
345}
346
347template <typename DatabaseHandler>
348QueryAdapter<DatabaseHandler>::~QueryAdapter()
349{
350 for (const auto& itr : m_registeredPrefixList) {
351 if (static_cast<bool>(itr.second))
352 m_face->unsetInterestFilter(itr.second);
353 }
354}
355
356template <typename DatabaseHandler>
357void
358QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
359 const ndn::Interest& interest)
360{
361 // strictly enforce query initialization namespace.
362 // Name should be our local prefix + "query" + parameters
Alison Craig2a4d5282015-04-10 12:00:02 -0600363 if (interest.getName().size() != filter.getPrefix().size() + 1) {
364 // @todo: return a nack
365 return;
366 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600367 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
Chengyu Fan92440162015-07-09 14:43:31 -0600368
369 std::cout << "incoming query interest : " << interestPtr->getName() << std::endl;
370
Chengyu Fanb25835b2015-04-28 17:09:35 -0600371 // @todo: use thread pool
372 std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
373 this,
374 interestPtr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600375 queryThread.join();
376}
377
378template <typename DatabaseHandler>
379void
380QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
381 const ndn::Interest& interest)
382{
383 // FIXME Results are currently getting served out of the forwarder's
384 // CS so we just ignore any retrieval Interests that hit us for
385 // now. In the future, this should check some form of
386 // InMemoryStorage.
Chengyu Fan92440162015-07-09 14:43:31 -0600387
388 std::cout << "incoming query-results interest : " << interest.toUri() << std::endl;
389
Alison Craig1aced7d2015-04-10 12:00:02 -0600390 auto data = m_cache.find(interest.getName());
391 if (data) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600392 m_face->put(*data);
Alison Craig1aced7d2015-04-10 12:00:02 -0600393 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600394}
395
396template <typename DatabaseHandler>
397void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600398QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600399{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600400 if (m_signingId.empty())
401 m_keyChain->sign(data);
402 else {
403 ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
404 ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
405 m_keyChain->sign(data, certName);
Alison Craig2a4d5282015-04-10 12:00:02 -0600406 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600407}
408
409template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600410std::shared_ptr<ndn::Data>
411QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
412 const ndn::Name::Component& version)
Alison Craig2a4d5282015-04-10 12:00:02 -0600413{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600414 // JSON parsed ok, so we can acknowledge successful receipt of the query
415 ndn::Name ackName(interest->getName());
416 ackName.append(version);
Chengyu Fan92440162015-07-09 14:43:31 -0600417 ackName.append(m_catalogId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600418 ackName.append("OK");
Alison Craig2a4d5282015-04-10 12:00:02 -0600419
Chengyu Fanb25835b2015-04-28 17:09:35 -0600420 std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(ackName);
Chengyu Fan92440162015-07-09 14:43:31 -0600421 ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
422
Chengyu Fanb25835b2015-04-28 17:09:35 -0600423 signData(*ack);
Chengyu Fan92440162015-07-09 14:43:31 -0600424#ifndef NDEBUG
425 std::cout << "makeAckData : " << ackName << std::endl;
426#endif
Chengyu Fanb25835b2015-04-28 17:09:35 -0600427 return ack;
Alison Craig2a4d5282015-04-10 12:00:02 -0600428}
429
430template <typename DatabaseHandler>
431void
Chengyu Fan92440162015-07-09 14:43:31 -0600432QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600433{
Chengyu Fan92440162015-07-09 14:43:31 -0600434 uint64_t segmentNo = 0;
435
436 std::shared_ptr<ndn::Data> nack =
437 std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo));
438 nack->setFreshnessPeriod(ndn::time::milliseconds(10000));
439 nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
440
441 signData(*nack);
442
443 std::cout << "make NACK : " << ndn::Name(dataPrefix).appendSegment(segmentNo) << std::endl;
444
445 m_mutex.lock();
446 m_cache.insert(*nack);
447 m_mutex.unlock();
448}
449
450
451template <typename DatabaseHandler>
452bool
453QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
454 Json::Value& jsonValue)
455{
456#ifndef NDEBUG
457 std::cout << "jsonValue in json2Sql: " << jsonValue.toStyledString() << std::endl;
458#endif
459 if (jsonValue.type() != Json::objectValue) {
460 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
461 return false;
462 }
463
Chengyu Fanb25835b2015-04-28 17:09:35 -0600464 sqlQuery << "SELECT name FROM cmip5";
465 bool input = false;
466 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
467 {
468 Json::Value key = iter.key();
469 Json::Value value = (*iter);
470
Chengyu Fan92440162015-07-09 14:43:31 -0600471 if (key == Json::nullValue || value == Json::nullValue) {
472 std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
473 return false;
474 }
475
476 // cannot convert to string
477 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
478 std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
479 return false;
480 }
481
482 if (key.asString().compare("?") == 0) {
483 continue;
484 }
485
Chengyu Fanb25835b2015-04-28 17:09:35 -0600486 if (input) {
487 sqlQuery << " AND";
488 } else {
489 sqlQuery << " WHERE";
490 }
491
Chengyu Fan92440162015-07-09 14:43:31 -0600492 sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
Chengyu Fanb25835b2015-04-28 17:09:35 -0600493 input = true;
494 }
495
496 if (!input) { // Force it to be the empty set
Chengyu Fan92440162015-07-09 14:43:31 -0600497 return false;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600498 }
499 sqlQuery << ";";
Chengyu Fan92440162015-07-09 14:43:31 -0600500 return true;
501}
502
503template <typename DatabaseHandler>
504bool
505QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
506 Json::Value& jsonValue)
507{
508#ifndef NDEBUG
509 std::cout << "jsonValue in json2AutocompletionSql: " << jsonValue.toStyledString() << std::endl;
510#endif
511 if (jsonValue.type() != Json::objectValue) {
512 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
513 return false;
514 }
515
516 std::string typedString;
517 // get the string in the jsonValue
518 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
519 {
520 Json::Value key = iter.key();
521 Json::Value value = (*iter);
522
523 if (key == Json::nullValue || value == Json::nullValue) {
524 std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
525 return false;
526 }
527
528 // cannot convert to string
529 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
530 std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
531 return false;
532 }
533
534 if (key.asString().compare("?") == 0) {
535 typedString.assign(value.asString());
536 // since the front end triggers the autocompletion when users typed '/',
537 // there must be a '/' at the end, and the first char must be '/'
538 if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
539 return false;
540 break;
541 }
542 }
543
544 // 1. get the expected column number by parsing the typedString, so we can get the filed name
545 size_t pos = 0;
546 size_t start = 1; // start from the 1st char which is not '/'
547 size_t count = 0; // also the name to query for
548 std::string token;
549 std::string delimiter = "/";
550 std::map<std::string, std::string> typedComponents;
551 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
552 token = typedString.substr(start, pos - start);
553 if (count >= m_nameFields.size() - 1) {
554 return false;
555 }
556
557 // add column name and value (token) into map
558 typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
559 count ++;
560 start = pos + 1;
561 }
562
563 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
564 // return true
565 bool more = false;
566 sqlQuery << "SELECT " << m_nameFields[count] << " FROM cmip5";
567 for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
568 it != typedComponents.end(); ++it) {
569 if (more)
570 sqlQuery << " AND";
571 else
572 sqlQuery << " WHERE";
573
574 sqlQuery << " " << it->first << "='" << it->second << "'";
575
576 more = true;
577 }
578 sqlQuery << ";";
579 return true;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600580}
581
582template <typename DatabaseHandler>
583void
584QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600585{
Alison Craig1aced7d2015-04-10 12:00:02 -0600586 // 1) Strip the prefix off the ndn::Interest's ndn::Name
587 // +1 to grab JSON component after "query" component
Alison Craig1aced7d2015-04-10 12:00:02 -0600588
Chengyu Fanb25835b2015-04-28 17:09:35 -0600589 ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
590 // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
591 const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
Alison Craig2a4d5282015-04-10 12:00:02 -0600592
Chengyu Fanb25835b2015-04-28 17:09:35 -0600593 if (jsonQuery.length() <= 0) {
Chengyu Fan92440162015-07-09 14:43:31 -0600594 // no JSON query, send Nack?
Chengyu Fanb25835b2015-04-28 17:09:35 -0600595 return;
596 }
Chengyu Fan92440162015-07-09 14:43:31 -0600597 // check if the ACK is cached, if yes, respond with ACK
598 // ?? what if the results for now it NULL, but latter exist?
Alison Craig2a4d5282015-04-10 12:00:02 -0600599 // For efficiency, do a double check. Once without the lock, then with it.
600 if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
601 m_mutex.lock();
602 { // !!! BEGIN CRITICAL SECTION !!!
603 // If this fails upon locking, we removed it during our search.
604 // An unusual race-condition case, which requires things like PIT aggregation to be off.
605 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
606 if (iter != m_activeQueryToFirstResponse.end()) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600607 m_face->put(*(iter->second));
Alison Craig2a4d5282015-04-10 12:00:02 -0600608 m_mutex.unlock(); //escape lock
609 return;
610 }
611 } // !!! END CRITICAL SECTION !!!
612 m_mutex.unlock();
613 }
614
615 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
616 Json::Value parsedFromString;
617 Json::Reader reader;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600618 if (!reader.parse(jsonQuery, parsedFromString)) {
619 // @todo: send NACK?
620 std::cout << "cannot parse the JsonQuery" << std::endl;
621 return;
Alison Craig2a4d5282015-04-10 12:00:02 -0600622 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600623
Chengyu Fan92440162015-07-09 14:43:31 -0600624 // the version should be replaced with ChronoSync state digest
Chengyu Fanb25835b2015-04-28 17:09:35 -0600625 const ndn::name::Component version
626 = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(
627 ndn::time::system_clock::now()).count());
628
629 std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
630
631 m_mutex.lock();
632 { // !!! BEGIN CRITICAL SECTION !!!
633 // An unusual race-condition case, which requires things like PIT aggregation to be off.
634 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
635 if (iter != m_activeQueryToFirstResponse.end()) {
636 m_face->put(*(iter->second));
637 m_mutex.unlock(); // escape lock
638 return;
639 }
640 // This is where things are expensive so we save them for the lock
Chengyu Fan92440162015-07-09 14:43:31 -0600641 // note that we ack the query with the cached ACK messages, but we should remove the ACKs
642 // that conatin the old version when ChronoSync is updated
Chengyu Fanb25835b2015-04-28 17:09:35 -0600643 m_activeQueryToFirstResponse.insert(std::pair<std::string,
644 std::shared_ptr<ndn::Data>>(jsonQuery, ack));
645 m_face->put(*ack);
646 } // !!! END CRITICAL SECTION !!!
647 m_mutex.unlock();
648
649 // 3) Convert the JSON Query into a MySQL one
650 bool autocomplete = false;
651 std::stringstream sqlQuery;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600652
Chengyu Fan92440162015-07-09 14:43:31 -0600653 // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
654 // for now, should be /<prefix>/query-results/<query-parameters>/<version>/, latter add catalog-id
Chengyu Fanb25835b2015-04-28 17:09:35 -0600655 ndn::Name segmentPrefix(m_prefix);
656 segmentPrefix.append("query-results");
Chengyu Fan92440162015-07-09 14:43:31 -0600657 segmentPrefix.append(jsonStr);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600658 segmentPrefix.append(version);
Chengyu Fan92440162015-07-09 14:43:31 -0600659 segmentPrefix.append(m_catalogId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600660
Chengyu Fan92440162015-07-09 14:43:31 -0600661 Json::Value tmp;
662 // expect the autocomplete and the component-based query are separate
663 // if JSON::Value contains ? as key, is autocompletion
664 if (parsedFromString.get("?", tmp) != tmp) {
665 autocomplete = true;
666 if (!json2AutocompletionSql(sqlQuery, parsedFromString)) {
667 sendNack(segmentPrefix);
668 return;
669 }
670 }
671 else {
672 if (!json2Sql(sqlQuery, parsedFromString)) {
673 sendNack(segmentPrefix);
674 return;
675 }
676 }
677
678 // 4) Run the Query
Chengyu Fanb25835b2015-04-28 17:09:35 -0600679 prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete);
680}
681
682template <typename DatabaseHandler>
683void
684QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
685 const std::string& sqlString,
686 bool autocomplete)
687{
688 // empty
689}
690
691// prepareSegments specilization function
692template<>
693void
694QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
695 const std::string& sqlString,
696 bool autocomplete)
697{
Chengyu Fan92440162015-07-09 14:43:31 -0600698 std::cout << "prepareSegments() executes sql : " << sqlString << std::endl;
699
Chengyu Fanb25835b2015-04-28 17:09:35 -0600700 // 4) Run the Query
701 std::shared_ptr<MYSQL_RES> results
702 = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString);
703
704 if (!results) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600705 std::cout << "null MYSQL_RES for query : " << sqlString << std::endl;
Chengyu Fan92440162015-07-09 14:43:31 -0600706
Chengyu Fanb25835b2015-04-28 17:09:35 -0600707 // @todo: throw runtime error or log the error message?
708 return;
709 }
710
Chengyu Fan92440162015-07-09 14:43:31 -0600711 uint64_t resultCount = mysql_num_rows(results.get());
712
Chengyu Fanb25835b2015-04-28 17:09:35 -0600713 std::cout << "Query results for \""
714 << sqlString
715 << "\" contain "
Chengyu Fan92440162015-07-09 14:43:31 -0600716 << resultCount
Chengyu Fanb25835b2015-04-28 17:09:35 -0600717 << " rows" << std::endl;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600718
719 MYSQL_ROW row;
720 size_t usedBytes = 0;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600721 uint64_t segmentNo = 0;
722 Json::Value array;
723 while ((row = mysql_fetch_row(results.get())))
724 {
725 size_t size = strlen(row[0]) + 1;
726 if (usedBytes + size > PAYLOAD_LIMIT) {
727 std::shared_ptr<ndn::Data> data
Chengyu Fan92440162015-07-09 14:43:31 -0600728 = makeReplyData(segmentPrefix, array, segmentNo, false, autocomplete, resultCount);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600729 m_mutex.lock();
730 m_cache.insert(*data);
731 m_mutex.unlock();
732 array.clear();
733 usedBytes = 0;
734 segmentNo++;
735 }
736 array.append(row[0]);
737 usedBytes += size;
738 }
739 std::shared_ptr<ndn::Data> data
Chengyu Fan92440162015-07-09 14:43:31 -0600740 = makeReplyData(segmentPrefix, array, segmentNo, true, autocomplete, resultCount);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600741 m_mutex.lock();
742 m_cache.insert(*data);
743 m_mutex.unlock();
744}
745
746template <typename DatabaseHandler>
747std::shared_ptr<ndn::Data>
748QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
749 const Json::Value& value,
750 uint64_t segmentNo,
751 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -0600752 bool isAutocomplete,
753 uint64_t resultCount)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600754{
755 Json::Value entry;
756 Json::FastWriter fastWriter;
Chengyu Fan92440162015-07-09 14:43:31 -0600757 Json::UInt64 count(resultCount);
758 entry["resultCount"] = count;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600759 if (isAutocomplete) {
760 entry["next"] = value;
761 } else {
762 entry["results"] = value;
763 }
764 const std::string jsonMessage = fastWriter.write(entry);
765 const char* payload = jsonMessage.c_str();
766 size_t payloadLength = jsonMessage.size() + 1;
767 ndn::Name segmentName(segmentPrefix);
768 segmentName.appendSegment(segmentNo);
769
770 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
771 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
772 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
773
774 if (isFinalBlock) {
775 data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
776 }
777#ifndef NDEBUG
778 std::cout << "makeReplyData : " << segmentName << std::endl;
779#endif
780 signData(*data);
781 return data;
Alison Craig2a4d5282015-04-10 12:00:02 -0600782}
783
784} // namespace query
785} // namespace atmos
786#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP