blob: ee60c34aa31b78ac4f1d6e656a2911e18ce819bd [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanb25835b2015-04-28 17:09:35 -060024#include "util/config-file.hpp"
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <thread>
27
Alison Craig2a4d5282015-04-10 12:00:02 -060028#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
Alison Craig1aced7d2015-04-10 12:00:02 -060040#include <ndn-cxx/util/in-memory-storage-lru.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060041#include <ndn-cxx/util/string-helper.hpp>
Chengyu Fancfb80c72015-10-19 16:50:04 -060042#include <ChronoSync/socket.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060043
44#include "mysql/mysql.h"
45
Alison Craig2a4d5282015-04-10 12:00:02 -060046#include <map>
Chengyu Fanb25835b2015-04-28 17:09:35 -060047#include <unordered_map>
Alison Craig2a4d5282015-04-10 12:00:02 -060048#include <memory>
49#include <mutex>
50#include <sstream>
51#include <string>
Chengyu Fan92440162015-07-09 14:43:31 -060052#include <array>
Chengyu Fan4d5fbd22015-09-18 14:34:08 -060053#include <utility>
Alison Craig2a4d5282015-04-10 12:00:02 -060054
Chengyu Fan71b712b2015-09-09 22:13:56 -060055#include "util/logger.hpp"
56
57
Alison Craig2a4d5282015-04-10 12:00:02 -060058namespace atmos {
59namespace query {
Chengyu Fan71b712b2015-09-09 22:13:56 -060060#ifdef HAVE_LOG4CXX
61 INIT_LOGGER("QueryAdapter");
62#endif
63
Chengyu Fan92440162015-07-09 14:43:31 -060064// todo: calculate payload limit by get the size of a signed empty Data packet
65static const size_t PAYLOAD_LIMIT = 7000;
Alison Craig2a4d5282015-04-10 12:00:02 -060066
67/**
68 * QueryAdapter handles the Query usecases for the catalog
69 */
70template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060071class QueryAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060072public:
73 /**
74 * Constructor
75 *
Chengyu Fancfb80c72015-10-19 16:50:04 -060076 * @param face: Face that will be used for NDN communications
77 * @param keyChain: KeyChain that will be used for data signing
78 * @param syncSocket: ChronoSync socket
Alison Craig2a4d5282015-04-10 12:00:02 -060079 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060080 QueryAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -060081 const std::shared_ptr<ndn::KeyChain>& keyChain,
82 const std::shared_ptr<chronosync::Socket>& syncSocket);
Alison Craig2a4d5282015-04-10 12:00:02 -060083
Alison Craig2a4d5282015-04-10 12:00:02 -060084 virtual
85 ~QueryAdapter();
86
87 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060088 * Helper function to specify section handler
89 */
90 void
91 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060092 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060093 const std::vector<std::string>& nameFields,
94 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060095
96protected:
97 /**
98 * Helper function for configuration parsing
99 */
100 void
101 onConfig(const util::ConfigSection& section,
102 bool isDryDun,
103 const std::string& fileName,
104 const ndn::Name& prefix);
105
106 /**
Alison Craig2a4d5282015-04-10 12:00:02 -0600107 * Handles incoming query requests by stripping the filter off the Interest to get the
108 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
109 *
110 * @param filter: InterestFilter that caused this Interest to be routed
111 * @param interest: Interest that needs to be handled
112 */
113 virtual void
Chengyu Fan7b978f82015-12-09 17:03:23 -0700114 onIncomingQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600115
Alison Craig2a4d5282015-04-10 12:00:02 -0600116 /**
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600117 * Handles requests for responses to an filter initialization request
118 *
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600119 * @param interest: Interest that needs to be handled
120 */
121 virtual void
Chengyu Fan7b978f82015-12-09 17:03:23 -0700122 onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600123
124 /**
125 * Helper function that generates query results from a Json query carried in the Interest
126 *
127 * @param interest: Interest that needs to be handled
128 */
129 void
130 populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest);
131
132 void
133 getFiltersMenu(Json::Value& value);
134
135 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600136 * Helper function that makes query-results data
Alison Craig2a4d5282015-04-10 12:00:02 -0600137 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600138 * @param segmentPrefix: Name that identifies the Prefix for the Data
139 * @param value: Json::Value to be sent in the Data
140 * @param segmentNo: uint64_t the segment for this Data
141 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
142 * last entry
Alison Craig2a4d5282015-04-10 12:00:02 -0600143 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
Chengyu Fan92440162015-07-09 14:43:31 -0600144 * @param resultCount: the number of records in the query results
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600145 * @param viewStart: the start index of the record in the query results payload
146 * @param viewEnd: the end index of the record in the query results payload
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600147 * @param lastComponent: flag to indicate the content contains the last component for
148 autocompletion query
Alison Craig2a4d5282015-04-10 12:00:02 -0600149 */
Chengyu Fanb25835b2015-04-28 17:09:35 -0600150 std::shared_ptr<ndn::Data>
151 makeReplyData(const ndn::Name& segmentPrefix,
152 const Json::Value& value,
153 uint64_t segmentNo,
154 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -0600155 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600156 uint64_t resultCount,
157 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600158 uint64_t viewEnd,
159 bool lastComponent);
Alison Craig2a4d5282015-04-10 12:00:02 -0600160
161 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600162 * Helper function that generates query results from a Json query carried in the Interest
Alison Craig2a4d5282015-04-10 12:00:02 -0600163 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600164 * @param interest: Interest that needs to be handled
Alison Craig2a4d5282015-04-10 12:00:02 -0600165 */
166 void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600167 runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600168
Alison Craig1aced7d2015-04-10 12:00:02 -0600169 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600170 * Helper function that makes ACK data
Alison Craig1aced7d2015-04-10 12:00:02 -0600171 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600172 * @param interest: Intersts that needs to be handled
173 * @param version: Version that needs to be in the data name
174 */
175 std::shared_ptr<ndn::Data>
176 makeAckData(std::shared_ptr<const ndn::Interest> interest,
177 const ndn::Name::Component& version);
178
179 /**
Chengyu Fan92440162015-07-09 14:43:31 -0600180 * Helper function that sends NACK
181 *
182 * @param dataPrefix: prefix for the data packet
Alison Craig1aced7d2015-04-10 12:00:02 -0600183 */
184 void
Chengyu Fan92440162015-07-09 14:43:31 -0600185 sendNack(const ndn::Name& dataPrefix);
186
187 /**
188 * Helper function that generates the sqlQuery string for component-based query
189 * @param sqlQuery: stringstream to save the sqlQuery string
190 * @param jsonValue: Json value that contains the query information
191 */
192 bool
Chengyu Fanb25835b2015-04-28 17:09:35 -0600193 json2Sql(std::stringstream& sqlQuery,
Chengyu Fan92440162015-07-09 14:43:31 -0600194 Json::Value& jsonValue);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600195
196 /**
197 * Helper function that signs the data
198 */
199 void
200 signData(ndn::Data& data);
201
202 /**
203 * Helper function that publishes query-results data segments
204 */
205 virtual void
206 prepareSegments(const ndn::Name& segmentPrefix,
207 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600208 bool autocomplete,
209 bool lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600210
211 /**
212 * Helper function to set the DatabaseHandler
213 */
214 void
215 setDatabaseHandler(const util::ConnectionDetails& databaseId);
216
217 /**
218 * Helper function that set filters to make the adapter work
219 */
220 void
221 setFilters();
222
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600223 void
224 setCatalogId();
225
Chengyu Fan92440162015-07-09 14:43:31 -0600226 /**
227 * Helper function that generates the sqlQuery string for autocomplete query
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600228 * @param sqlQuery: stringstream to save the sqlQuery string
229 * @param jsonValue: Json value that contains the query information
230 * @param lastComponent: Flag to mark the last component query
Chengyu Fan92440162015-07-09 14:43:31 -0600231 */
232 bool
233 json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600234 Json::Value& jsonValue,
235 bool& lastComponent);
Chengyu Fan92440162015-07-09 14:43:31 -0600236
Chengyu Fan46398212015-08-11 11:23:13 -0600237 bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600238 json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
239 Json::Value& jsonValue);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600240
241 ndn::Name
242 getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
243 const ndn::Name::Component& version);
Chengyu Fan46398212015-08-11 11:23:13 -0600244
Chengyu Fancfb80c72015-10-19 16:50:04 -0600245 std::string
246 getChronoSyncDigest();
247
Chengyu Fanb25835b2015-04-28 17:09:35 -0600248protected:
249 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
250 // Handle to the Catalog's database
251 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600252 const std::shared_ptr<chronosync::Socket>& m_socket;
Alison Craig1aced7d2015-04-10 12:00:02 -0600253
Alison Craig2a4d5282015-04-10 12:00:02 -0600254 // mutex to control critical sections
255 std::mutex m_mutex;
256 // @{ needs m_mutex protection
257 // The Queries we are currently writing to
Chengyu Fancfb80c72015-10-19 16:50:04 -0600258 //std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
259 ndn::util::InMemoryStorageLru m_activeQueryToFirstResponse;
Alison Craig1aced7d2015-04-10 12:00:02 -0600260 ndn::util::InMemoryStorageLru m_cache;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600261 std::string m_chronosyncDigest;
Alison Craig2a4d5282015-04-10 12:00:02 -0600262 // @}
Chengyu Fanb25835b2015-04-28 17:09:35 -0600263 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan92440162015-07-09 14:43:31 -0600264 ndn::Name m_catalogId; // should be replaced with the PK digest
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600265 std::vector<std::string> m_filterCategoryNames;
Alison Craig2a4d5282015-04-10 12:00:02 -0600266};
267
Alison Craig2a4d5282015-04-10 12:00:02 -0600268template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600269QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -0600270 const std::shared_ptr<ndn::KeyChain>& keyChain,
271 const std::shared_ptr<chronosync::Socket>& syncSocket)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600272 : util::CatalogAdapter(face, keyChain)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600273 , m_socket(syncSocket)
274 , m_activeQueryToFirstResponse(100000)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600275 , m_cache(250000)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600276 , m_chronosyncDigest("0")
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600277 , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
Alison Craig2a4d5282015-04-10 12:00:02 -0600278{
Alison Craig2a4d5282015-04-10 12:00:02 -0600279}
280
281template <typename DatabaseHandler>
282void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600283QueryAdapter<DatabaseHandler>::setFilters()
Alison Craig2a4d5282015-04-10 12:00:02 -0600284{
Chengyu Fan7b978f82015-12-09 17:03:23 -0700285 m_registeredPrefixList[m_prefix] = m_face->setInterestFilter(ndn::InterestFilter(m_prefix),
286 bind(&query::QueryAdapter<DatabaseHandler>::onIncomingQueryInterest,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600287 this, _1, _2),
288 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
289 this, _1),
290 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
291 this, _1, _2));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600292}
293
294template <typename DatabaseHandler>
295void
296QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600297 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600298 const std::vector<std::string>& nameFields,
299 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600300{
Chengyu Fan92440162015-07-09 14:43:31 -0600301 m_nameFields = nameFields;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600302 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600303 config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
304 _1, _2, _3, prefix));
305}
306
307template <typename DatabaseHandler>
308void
309QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
310 bool isDryRun,
311 const std::string& filename,
312 const ndn::Name& prefix)
313{
314 using namespace util;
315 if (isDryRun) {
316 return;
317 }
318 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
319 for (auto item = section.begin();
320 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600321 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600322 {
323 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600324 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600325 if (signingId.empty()) {
326 throw Error("Empty value for \"signingId\""
Chengyu Fancfb80c72015-10-19 16:50:04 -0600327 " in \"query\" section");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600328 }
329 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600330 if (item->first == "filterCategoryNames") {
331 std::istringstream ss(item->second.get_value<std::string>());
332 std::string token;
333 while(std::getline(ss, token, ',')) {
334 m_filterCategoryNames.push_back(token);
335 }
336 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600337 if (item->first == "database") {
338 const util::ConfigSection& dataSection = item->second;
339 for (auto subItem = dataSection.begin();
340 subItem != dataSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600341 ++subItem)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600342 {
343 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600344 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600345 }
346 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600347 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600348 }
349 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600350 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600351 }
352 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600353 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600354 }
355 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600356
357 if (dbServer.empty()){
358 throw Error("Invalid value for \"dbServer\""
359 " in \"query\" section");
360 }
361 if (dbName.empty()){
362 throw Error("Invalid value for \"dbName\""
363 " in \"query\" section");
364 }
365 if (dbUser.empty()){
366 throw Error("Invalid value for \"dbUser\""
367 " in \"query\" section");
368 }
369 if (dbPasswd.empty()){
370 throw Error("Invalid value for \"dbPasswd\""
371 " in \"query\" section");
372 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600373 }
374 }
375
Chengyu Fancfb80c72015-10-19 16:50:04 -0600376 if (m_filterCategoryNames.empty()) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600377 throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
378 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600379
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600380 m_prefix = prefix;
381
382 m_signingId = ndn::Name(signingId);
383 setCatalogId();
384
385 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600386 setDatabaseHandler(mysqlId);
387 setFilters();
388}
389
390template <typename DatabaseHandler>
391void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600392QueryAdapter<DatabaseHandler>::setCatalogId()
393{
394 //empty
395}
396
397template <>
398void
399QueryAdapter<MYSQL>::setCatalogId()
400{
401 // use public key digest as the catalog ID
402 ndn::Name keyId;
403 if (m_signingId.empty()) {
404 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
405 } else {
406 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
407 }
408
409 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
410 ndn::Block keyDigest = pKey->computeDigest();
411 m_catalogId.clear();
412 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
413}
414
415template <typename DatabaseHandler>
416void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600417QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
418{
419 //empty
420}
421
422template <>
423void
424QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
425{
426 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
427
428 m_databaseHandler = conn;
429}
430
431template <typename DatabaseHandler>
432QueryAdapter<DatabaseHandler>::~QueryAdapter()
433{
434 for (const auto& itr : m_registeredPrefixList) {
435 if (static_cast<bool>(itr.second))
436 m_face->unsetInterestFilter(itr.second);
437 }
438}
439
440template <typename DatabaseHandler>
441void
Chengyu Fan7b978f82015-12-09 17:03:23 -0700442QueryAdapter<DatabaseHandler>::onIncomingQueryInterest(const ndn::InterestFilter& filter,
443 const ndn::Interest& interest)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600444{
Chengyu Fan7b978f82015-12-09 17:03:23 -0700445 _LOG_DEBUG(">> QueryAdapter::onIncomingQueryInterest");
Chengyu Fan71b712b2015-09-09 22:13:56 -0600446
Chengyu Fan7b978f82015-12-09 17:03:23 -0700447 // Interest must carry component "initialization" or "query"
448 if (interest.getName().size() < filter.getPrefix().size()) {
Alison Craig2a4d5282015-04-10 12:00:02 -0600449 // @todo: return a nack
450 return;
451 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600452
Chengyu Fan7b978f82015-12-09 17:03:23 -0700453 _LOG_DEBUG("Interest : " << interest.getName());
Alison Craig2a4d5282015-04-10 12:00:02 -0600454 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
Chengyu Fan92440162015-07-09 14:43:31 -0600455
Chengyu Fan7b978f82015-12-09 17:03:23 -0700456 if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("filters-initialization")) {
457 std::thread queryThread(&QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
458 this,
459 interestPtr);
460 queryThread.join();
461 }
462 else if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("query")) {
Chengyu Fan92440162015-07-09 14:43:31 -0600463
Chengyu Fan7b978f82015-12-09 17:03:23 -0700464 auto data = m_cache.find(interest);
465 if (data) {
466 m_face->put(*data);
467 return;
468 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600469
Chengyu Fan7b978f82015-12-09 17:03:23 -0700470 // catalog must strip sequence number in an Interest for further process
471 if (interest.getName().size() > (filter.getPrefix().size() + 2)) {
472 // Interest carries sequence number, only grip the main part
473 // e.g., /hep/query/<query-params>/<version>/#seq
474 ndn::Interest queryInterest(interest.getName().getPrefix(filter.getPrefix().size() + 2));
Chengyu Fan92440162015-07-09 14:43:31 -0600475
Chengyu Fan7b978f82015-12-09 17:03:23 -0700476 auto data = m_cache.find(queryInterest);
477 if (data) {
478 // catalog has generated some data, but still working on it
479 return;
480 }
481 interestPtr = queryInterest.shared_from_this();
482 }
Chengyu Fan92440162015-07-09 14:43:31 -0600483
Chengyu Fan7b978f82015-12-09 17:03:23 -0700484 std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
485 this,
486 interestPtr);
487 queryThread.join();
Alison Craig1aced7d2015-04-10 12:00:02 -0600488 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600489
Chengyu Fan7b978f82015-12-09 17:03:23 -0700490 // ignore other Interests
Alison Craig2a4d5282015-04-10 12:00:02 -0600491}
492
493template <typename DatabaseHandler>
494void
Chengyu Fan7b978f82015-12-09 17:03:23 -0700495QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600496{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600497 _LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest");
Chengyu Fan71b712b2015-09-09 22:13:56 -0600498
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700499 if(m_socket != nullptr) {
500 const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
501 std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size());
502 _LOG_DEBUG("Original digest :" << m_chronosyncDigest);
503 _LOG_DEBUG("New digest : " << digestStr);
504 // if the m_chronosyncDigest and the rootdigest are not equal
505 if (digestStr != m_chronosyncDigest) {
506 // (1) update chronosyncDigest
507 // (2) clear all staled ACK data
508 m_mutex.lock();
509 m_chronosyncDigest = digestStr;
510 m_activeQueryToFirstResponse.erase(ndn::Name("/"));
511 m_mutex.unlock();
512 _LOG_DEBUG("Change digest to " << m_chronosyncDigest);
513 }
514 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600515
Chengyu Fan7b978f82015-12-09 17:03:23 -0700516 auto data = m_activeQueryToFirstResponse.find(*interest);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600517 if (data) {
518 m_face->put(*data);
519 }
520 else {
Chengyu Fan7b978f82015-12-09 17:03:23 -0700521 populateFiltersMenu(interest);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600522 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600523
524 _LOG_DEBUG("<< QueryAdapter::onFiltersInitializationInterest");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600525}
526
527template <typename DatabaseHandler>
528void
529QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest)
530{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600531 _LOG_DEBUG(">> QueryAdapter::populateFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600532 Json::Value filters;
533 Json::FastWriter fastWriter;
534 getFiltersMenu(filters);
535
536 const std::string filterValue = fastWriter.write(filters);
537
538 if (!filters.empty()) {
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700539 // use /<prefix>/filters-initialization/<seg> as data name
540 ndn::Name filterDataName(interest->getName().getPrefix(-1));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600541
542 const char* payload = filterValue.c_str();
543 size_t payloadLength = filterValue.size();
544 size_t startIndex = 0, seqNo = 0;
545
546 if (filterValue.length() > PAYLOAD_LIMIT) {
547 payloadLength = PAYLOAD_LIMIT;
548 ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo);
549 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName);
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700550 // freshnessPeriod 0 means permanent?
551 filterData->setFreshnessPeriod(ndn::time::milliseconds(10));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600552 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
553
554 signData(*filterData);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600555
556 _LOG_DEBUG("Populate Filter Data :" << segmentName);
557
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600558 m_mutex.lock();
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700559 // save the filter results in the activeQueryToFirstResponse structure
560 // when version changes, the activeQueryToFirstResponse should be cleaned
561 m_activeQueryToFirstResponse.insert(*filterData);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600562 try {
563 m_face->put(*filterData);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600564 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600565 catch (std::exception& e) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600566 _LOG_ERROR(e.what());
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600567 }
568 m_mutex.unlock();
569
570 seqNo++;
571 startIndex = payloadLength * seqNo + 1;
572 }
573 payloadLength = filterValue.size() - PAYLOAD_LIMIT * seqNo;
574
575 ndn::Name lastSegment = ndn::Name(filterDataName).appendSegment(seqNo);
576 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(lastSegment);
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700577 filterData->setFreshnessPeriod(ndn::time::milliseconds(10));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600578 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
579 filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo));
580
581 signData(*filterData);
582 m_mutex.lock();
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700583 m_activeQueryToFirstResponse.insert(*filterData);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600584 m_face->put(*filterData);
585 m_mutex.unlock();
586 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600587 _LOG_DEBUG("<< QueryAdapter::populateFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600588}
589
590template <typename DatabaseHandler>
591void
592QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value)
593{
594 // empty
595}
596
597// get distinct value of each column
598template <>
599void
600QueryAdapter<MYSQL>::getFiltersMenu(Json::Value& value)
601{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600602 _LOG_DEBUG(">> QueryAdapter::getFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600603 Json::Value tmp;
604
605 for (size_t i = 0; i < m_filterCategoryNames.size(); i++) {
606 std::string columnName = m_filterCategoryNames[i];
607 std::string getFilterSql("SELECT DISTINCT " + columnName +
608 " FROM " + m_databaseTable + ";");
609 std::string errMsg;
610 bool success;
611
612 std::shared_ptr<MYSQL_RES> results
613 = atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
614 util::QUERY, success, errMsg);
615 if (!success) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600616 _LOG_ERROR(errMsg);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600617 value.clear();
618 return;
619 }
620
621 while (MYSQL_ROW row = mysql_fetch_row(results.get()))
622 {
623 tmp[columnName].append(row[0]);
624 }
625 value.append(tmp);
626 tmp.clear();
627 }
628
Chengyu Fan71b712b2015-09-09 22:13:56 -0600629 _LOG_DEBUG("<< QueryAdapter::getFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600630}
631
632template <typename DatabaseHandler>
633void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600634QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600635{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600636 if (m_signingId.empty())
637 m_keyChain->sign(data);
638 else {
639 ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
640 ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
641 m_keyChain->sign(data, certName);
Alison Craig2a4d5282015-04-10 12:00:02 -0600642 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600643}
644
645template <typename DatabaseHandler>
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600646ndn::Name
647QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
648 const ndn::Name::Component& version)
649{
Chengyu Fan7b978f82015-12-09 17:03:23 -0700650 // use generic name, instead of specific one
651 ndn::Name queryResultName = interest->getName();
652 queryResultName.append(version);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600653 return queryResultName;
654}
655
656template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600657std::shared_ptr<ndn::Data>
658QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
659 const ndn::Name::Component& version)
Alison Craig2a4d5282015-04-10 12:00:02 -0600660{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600661 std::string queryResultNameStr(getQueryResultsName(interest, version).toUri());
Alison Craig2a4d5282015-04-10 12:00:02 -0600662
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600663 std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName());
664 ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()),
665 queryResultNameStr.length());
Chengyu Fan92440162015-07-09 14:43:31 -0600666 ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
667
Chengyu Fanb25835b2015-04-28 17:09:35 -0600668 signData(*ack);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600669
670 _LOG_DEBUG("Make ACK : " << queryResultNameStr);
671
Chengyu Fanb25835b2015-04-28 17:09:35 -0600672 return ack;
Alison Craig2a4d5282015-04-10 12:00:02 -0600673}
674
675template <typename DatabaseHandler>
676void
Chengyu Fan92440162015-07-09 14:43:31 -0600677QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600678{
Chengyu Fan92440162015-07-09 14:43:31 -0600679 uint64_t segmentNo = 0;
680
681 std::shared_ptr<ndn::Data> nack =
682 std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo));
683 nack->setFreshnessPeriod(ndn::time::milliseconds(10000));
684 nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
685
686 signData(*nack);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600687
688 _LOG_DEBUG("Send Nack: " << ndn::Name(dataPrefix).appendSegment(segmentNo));
689
Chengyu Fan92440162015-07-09 14:43:31 -0600690 m_mutex.lock();
691 m_cache.insert(*nack);
692 m_mutex.unlock();
693}
694
695
696template <typename DatabaseHandler>
697bool
698QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
699 Json::Value& jsonValue)
700{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600701 _LOG_DEBUG(">> QueryAdapter::json2Sql");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600702
Chengyu Fan71b712b2015-09-09 22:13:56 -0600703 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fancfb80c72015-10-19 16:50:04 -0600704
Chengyu Fan92440162015-07-09 14:43:31 -0600705 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan92440162015-07-09 14:43:31 -0600706 return false;
707 }
708
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600709 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600710 bool input = false;
711 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
712 {
713 Json::Value key = iter.key();
714 Json::Value value = (*iter);
715
Chengyu Fan92440162015-07-09 14:43:31 -0600716 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600717 _LOG_ERROR("Null key or value in JsonValue");
Chengyu Fan92440162015-07-09 14:43:31 -0600718 return false;
719 }
720
721 // cannot convert to string
722 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600723 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan92440162015-07-09 14:43:31 -0600724 return false;
725 }
726
727 if (key.asString().compare("?") == 0) {
728 continue;
729 }
730
Chengyu Fanb25835b2015-04-28 17:09:35 -0600731 if (input) {
732 sqlQuery << " AND";
733 } else {
734 sqlQuery << " WHERE";
735 }
736
Chengyu Fan92440162015-07-09 14:43:31 -0600737 sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
Chengyu Fanb25835b2015-04-28 17:09:35 -0600738 input = true;
739 }
740
741 if (!input) { // Force it to be the empty set
Chengyu Fan92440162015-07-09 14:43:31 -0600742 return false;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600743 }
744 sqlQuery << ";";
Chengyu Fan92440162015-07-09 14:43:31 -0600745 return true;
746}
747
748template <typename DatabaseHandler>
749bool
750QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600751 Json::Value& jsonValue,
752 bool& lastComponent)
Chengyu Fan92440162015-07-09 14:43:31 -0600753{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600754 _LOG_DEBUG(">> QueryAdapter::json2AutocompletionSql");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600755
Chengyu Fan71b712b2015-09-09 22:13:56 -0600756 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fancfb80c72015-10-19 16:50:04 -0600757
Chengyu Fan92440162015-07-09 14:43:31 -0600758 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan92440162015-07-09 14:43:31 -0600759 return false;
760 }
761
762 std::string typedString;
763 // get the string in the jsonValue
764 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
765 {
766 Json::Value key = iter.key();
767 Json::Value value = (*iter);
768
769 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600770 _LOG_ERROR("Null key or value in JsonValue");
Chengyu Fan92440162015-07-09 14:43:31 -0600771 return false;
772 }
773
774 // cannot convert to string
775 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600776 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan92440162015-07-09 14:43:31 -0600777 return false;
778 }
779
780 if (key.asString().compare("?") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600781 typedString = value.asString();
Chengyu Fan92440162015-07-09 14:43:31 -0600782 // since the front end triggers the autocompletion when users typed '/',
783 // there must be a '/' at the end, and the first char must be '/'
784 if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
785 return false;
786 break;
787 }
788 }
789
790 // 1. get the expected column number by parsing the typedString, so we can get the filed name
791 size_t pos = 0;
792 size_t start = 1; // start from the 1st char which is not '/'
793 size_t count = 0; // also the name to query for
794 std::string token;
795 std::string delimiter = "/";
796 std::map<std::string, std::string> typedComponents;
797 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
798 token = typedString.substr(start, pos - start);
799 if (count >= m_nameFields.size() - 1) {
800 return false;
801 }
802
803 // add column name and value (token) into map
804 typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600805 count++;
Chengyu Fan92440162015-07-09 14:43:31 -0600806 start = pos + 1;
807 }
808
809 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
810 // return true
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600811 if (count == m_nameFields.size() - 1)
812 lastComponent = true; // indicate this query is to query the last component
813
Chengyu Fan92440162015-07-09 14:43:31 -0600814 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600815 sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM " << m_databaseTable;
Chengyu Fan46398212015-08-11 11:23:13 -0600816 for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
817 it != typedComponents.end(); ++it) {
818 if (more)
819 sqlQuery << " AND";
820 else
821 sqlQuery << " WHERE";
822
823 sqlQuery << " " << it->first << "='" << it->second << "'";
824
825 more = true;
826 }
827 sqlQuery << ";";
828 return true;
829}
830
831template <typename DatabaseHandler>
832bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600833QueryAdapter<DatabaseHandler>::json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
834 Json::Value& jsonValue)
Chengyu Fan46398212015-08-11 11:23:13 -0600835{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600836 _LOG_DEBUG(">> QueryAdapter::json2CompleteSearchSql");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600837
Chengyu Fan71b712b2015-09-09 22:13:56 -0600838 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fancfb80c72015-10-19 16:50:04 -0600839
Chengyu Fan46398212015-08-11 11:23:13 -0600840 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan46398212015-08-11 11:23:13 -0600841 return false;
842 }
843
844 std::string typedString;
845 // get the string in the jsonValue
846 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
847 {
848 Json::Value key = iter.key();
849 Json::Value value = (*iter);
850
851 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600852 _LOG_ERROR("Null key or value in JsonValue");
Chengyu Fan46398212015-08-11 11:23:13 -0600853 return false;
854 }
855
856 // cannot convert to string
857 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600858 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan46398212015-08-11 11:23:13 -0600859 return false;
860 }
861
862 if (key.asString().compare("??") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600863 typedString = value.asString();
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600864 if (typedString.empty() || typedString.find("/") != 0)
Chengyu Fan46398212015-08-11 11:23:13 -0600865 return false;
866 break;
867 }
868 }
869
870 // 1. get the expected column number by parsing the typedString, so we can get the filed name
871 size_t pos = 0;
872 size_t start = 1; // start from the 1st char which is not '/'
873 size_t count = 0; // also the name to query for
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600874 size_t typedStringLen = typedString.length();
Chengyu Fan46398212015-08-11 11:23:13 -0600875 std::string token;
876 std::string delimiter = "/";
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600877 std::vector<std::pair<std::string, std::string>> typedComponents;
Chengyu Fan46398212015-08-11 11:23:13 -0600878 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
879 token = typedString.substr(start, pos - start);
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600880 if (count >= m_nameFields.size()) {
Chengyu Fan46398212015-08-11 11:23:13 -0600881 return false;
882 }
883
884 // add column name and value (token) into map
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600885 typedComponents.push_back(std::make_pair(m_nameFields[count], token));
886
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600887 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600888 start = pos + 1;
889 }
890
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600891 // we may have a component after the last "/"
892 if (start < typedStringLen) {
893 typedComponents.push_back(std::make_pair(m_nameFields[count],
894 typedString.substr(start, typedStringLen - start)));
895 }
896
Chengyu Fan46398212015-08-11 11:23:13 -0600897 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
898 // return true
899 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600900 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600901 for (std::vector<std::pair<std::string, std::string>>::iterator it = typedComponents.begin();
Chengyu Fan92440162015-07-09 14:43:31 -0600902 it != typedComponents.end(); ++it) {
903 if (more)
904 sqlQuery << " AND";
905 else
906 sqlQuery << " WHERE";
907
908 sqlQuery << " " << it->first << "='" << it->second << "'";
909
910 more = true;
911 }
912 sqlQuery << ";";
913 return true;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600914}
915
916template <typename DatabaseHandler>
917void
918QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600919{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600920 _LOG_DEBUG(">> QueryAdapter::runJsonQuery");
921
Alison Craig1aced7d2015-04-10 12:00:02 -0600922 // 1) Strip the prefix off the ndn::Interest's ndn::Name
923 // +1 to grab JSON component after "query" component
Alison Craig1aced7d2015-04-10 12:00:02 -0600924
Chengyu Fanb25835b2015-04-28 17:09:35 -0600925 ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
926 // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
927 const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
Alison Craig2a4d5282015-04-10 12:00:02 -0600928
Chengyu Fanb25835b2015-04-28 17:09:35 -0600929 if (jsonQuery.length() <= 0) {
Chengyu Fan92440162015-07-09 14:43:31 -0600930 // no JSON query, send Nack?
Chengyu Fanb25835b2015-04-28 17:09:35 -0600931 return;
932 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600933
934 // the version should be replaced with ChronoSync state digest
935 ndn::name::Component version;
936
937 if(m_socket != nullptr) {
938 const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
939 std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size());
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700940 _LOG_DEBUG("Original digest " << m_chronosyncDigest);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600941 _LOG_DEBUG("New digest : " << digestStr);
942 // if the m_chronosyncDigest and the rootdigest are not equal
943 if (digestStr != m_chronosyncDigest) {
944 // (1) update chronosyncDigest
945 // (2) clear all staled ACK data
946 m_mutex.lock();
947 m_chronosyncDigest = digestStr;
948 m_activeQueryToFirstResponse.erase(ndn::Name("/"));
949 m_mutex.unlock();
950 _LOG_DEBUG("Change digest to " << m_chronosyncDigest);
951 }
952 version = ndn::name::Component::fromEscapedString(digestStr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600953 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600954 else {
955 version = ndn::name::Component::fromEscapedString(m_chronosyncDigest);
956 }
957
Alison Craig2a4d5282015-04-10 12:00:02 -0600958 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
959 Json::Value parsedFromString;
960 Json::Reader reader;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600961 if (!reader.parse(jsonQuery, parsedFromString)) {
962 // @todo: send NACK?
Chengyu Fancfb80c72015-10-19 16:50:04 -0600963 _LOG_ERROR("Cannot parse the JsonQuery");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600964 return;
Alison Craig2a4d5282015-04-10 12:00:02 -0600965 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600966
Chengyu Fanb25835b2015-04-28 17:09:35 -0600967 // 3) Convert the JSON Query into a MySQL one
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600968 bool autocomplete = false, lastComponent = false;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600969 std::stringstream sqlQuery;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600970
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600971 ndn::Name segmentPrefix(getQueryResultsName(interest, version));
Chengyu Fan7b978f82015-12-09 17:03:23 -0700972 _LOG_DEBUG("segmentPrefix :" << segmentPrefix);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600973
Chengyu Fan92440162015-07-09 14:43:31 -0600974 Json::Value tmp;
975 // expect the autocomplete and the component-based query are separate
976 // if JSON::Value contains ? as key, is autocompletion
977 if (parsedFromString.get("?", tmp) != tmp) {
978 autocomplete = true;
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600979 if (!json2AutocompletionSql(sqlQuery, parsedFromString, lastComponent)) {
Chengyu Fan92440162015-07-09 14:43:31 -0600980 sendNack(segmentPrefix);
981 return;
982 }
983 }
Chengyu Fan46398212015-08-11 11:23:13 -0600984 else if (parsedFromString.get("??", tmp) != tmp) {
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600985 if (!json2PrefixBasedSearchSql(sqlQuery, parsedFromString)) {
Chengyu Fan46398212015-08-11 11:23:13 -0600986 sendNack(segmentPrefix);
987 return;
988 }
989 }
Chengyu Fan92440162015-07-09 14:43:31 -0600990 else {
991 if (!json2Sql(sqlQuery, parsedFromString)) {
992 sendNack(segmentPrefix);
993 return;
994 }
995 }
996
997 // 4) Run the Query
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600998 prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600999}
1000
1001template <typename DatabaseHandler>
1002void
1003QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
1004 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001005 bool autocomplete,
1006 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001007{
1008 // empty
1009}
1010
1011// prepareSegments specilization function
1012template<>
1013void
1014QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
1015 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001016 bool autocomplete,
1017 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001018{
Chengyu Fan71b712b2015-09-09 22:13:56 -06001019 _LOG_DEBUG(">> QueryAdapter::prepareSegments");
Chengyu Fancfb80c72015-10-19 16:50:04 -06001020
Chengyu Fan71b712b2015-09-09 22:13:56 -06001021 _LOG_DEBUG(sqlString);
Chengyu Fancfb80c72015-10-19 16:50:04 -06001022
Chengyu Fan46398212015-08-11 11:23:13 -06001023 std::string errMsg;
1024 bool success;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001025 // 4) Run the Query
1026 std::shared_ptr<MYSQL_RES> results
Chengyu Fan46398212015-08-11 11:23:13 -06001027 = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
1028 if (!success)
Chengyu Fancfb80c72015-10-19 16:50:04 -06001029 _LOG_ERROR(errMsg);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001030
1031 if (!results) {
Chengyu Fancfb80c72015-10-19 16:50:04 -06001032 _LOG_ERROR("NULL MYSQL_RES for" << sqlString);
Chengyu Fan92440162015-07-09 14:43:31 -06001033
Chengyu Fanb25835b2015-04-28 17:09:35 -06001034 // @todo: throw runtime error or log the error message?
1035 return;
1036 }
1037
Chengyu Fan92440162015-07-09 14:43:31 -06001038 uint64_t resultCount = mysql_num_rows(results.get());
1039
Chengyu Fan71b712b2015-09-09 22:13:56 -06001040 _LOG_DEBUG("Query resuls contain " << resultCount << "rows");
Chengyu Fanb25835b2015-04-28 17:09:35 -06001041
1042 MYSQL_ROW row;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001043 uint64_t segmentNo = 0;
Chengyu Fan46398212015-08-11 11:23:13 -06001044 Json::Value tmp;
1045 Json::Value resultJson;
1046 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001047
1048 uint64_t viewStart = 0, viewEnd = 0;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001049 while ((row = mysql_fetch_row(results.get())))
1050 {
Chengyu Fan46398212015-08-11 11:23:13 -06001051 tmp.append(row[0]);
1052 const std::string tmpString = fastWriter.write(tmp);
1053 if (tmpString.length() > PAYLOAD_LIMIT) {
Chengyu Fanb25835b2015-04-28 17:09:35 -06001054 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001055 = makeReplyData(segmentPrefix, resultJson, segmentNo, false,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001056 autocomplete, resultCount, viewStart, viewEnd, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001057 m_mutex.lock();
1058 m_cache.insert(*data);
1059 m_mutex.unlock();
Chengyu Fan46398212015-08-11 11:23:13 -06001060 tmp.clear();
1061 resultJson.clear();
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001062 segmentNo++;
1063 viewStart = viewEnd + 1;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001064 }
Chengyu Fan46398212015-08-11 11:23:13 -06001065 resultJson.append(row[0]);
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001066 viewEnd++;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001067 }
Chengyu Fan46398212015-08-11 11:23:13 -06001068
Chengyu Fanb25835b2015-04-28 17:09:35 -06001069 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001070 = makeReplyData(segmentPrefix, resultJson, segmentNo, true,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001071 autocomplete, resultCount, viewStart, viewEnd, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001072 m_mutex.lock();
1073 m_cache.insert(*data);
1074 m_mutex.unlock();
1075}
1076
1077template <typename DatabaseHandler>
1078std::shared_ptr<ndn::Data>
1079QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
1080 const Json::Value& value,
1081 uint64_t segmentNo,
1082 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -06001083 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001084 uint64_t resultCount,
1085 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001086 uint64_t viewEnd,
1087 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001088{
1089 Json::Value entry;
1090 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001091
1092 entry["resultCount"] = Json::UInt64(resultCount);;
1093 entry["viewStart"] = Json::UInt64(viewStart);
1094 entry["viewEnd"] = Json::UInt64(viewEnd);
1095
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001096 if (lastComponent)
1097 entry["lastComponent"] = Json::Value(true);
1098
Chengyu Fan71b712b2015-09-09 22:13:56 -06001099 _LOG_DEBUG("resultCount " << resultCount << ";"
1100 << "viewStart " << viewStart << ";"
1101 << "viewEnd " << viewEnd);
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001102
Chengyu Fanb25835b2015-04-28 17:09:35 -06001103 if (isAutocomplete) {
1104 entry["next"] = value;
1105 } else {
1106 entry["results"] = value;
1107 }
1108 const std::string jsonMessage = fastWriter.write(entry);
1109 const char* payload = jsonMessage.c_str();
1110 size_t payloadLength = jsonMessage.size() + 1;
1111 ndn::Name segmentName(segmentPrefix);
1112 segmentName.appendSegment(segmentNo);
1113
1114 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
1115 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
1116 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
1117
1118 if (isFinalBlock) {
1119 data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
1120 }
Chengyu Fancfb80c72015-10-19 16:50:04 -06001121
Chengyu Fan71b712b2015-09-09 22:13:56 -06001122 _LOG_DEBUG(segmentName);
Chengyu Fancfb80c72015-10-19 16:50:04 -06001123
Chengyu Fanb25835b2015-04-28 17:09:35 -06001124 signData(*data);
1125 return data;
Alison Craig2a4d5282015-04-10 12:00:02 -06001126}
1127
Chengyu Fancfb80c72015-10-19 16:50:04 -06001128
Alison Craig2a4d5282015-04-10 12:00:02 -06001129} // namespace query
1130} // namespace atmos
1131#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP