blob: c0f292ce38010a377ee39ce0c3e7f3dd5f758f6f [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanb25835b2015-04-28 17:09:35 -060024#include "util/config-file.hpp"
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <thread>
27
Alison Craig2a4d5282015-04-10 12:00:02 -060028#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
Alison Craig1aced7d2015-04-10 12:00:02 -060040#include <ndn-cxx/util/in-memory-storage-lru.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060041#include <ndn-cxx/util/string-helper.hpp>
Chengyu Fancfb80c72015-10-19 16:50:04 -060042#include <ChronoSync/socket.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060043
44#include "mysql/mysql.h"
45
Alison Craig2a4d5282015-04-10 12:00:02 -060046#include <map>
Chengyu Fanb25835b2015-04-28 17:09:35 -060047#include <unordered_map>
Alison Craig2a4d5282015-04-10 12:00:02 -060048#include <memory>
49#include <mutex>
50#include <sstream>
51#include <string>
Chengyu Fan92440162015-07-09 14:43:31 -060052#include <array>
Chengyu Fan4d5fbd22015-09-18 14:34:08 -060053#include <utility>
Alison Craig2a4d5282015-04-10 12:00:02 -060054
Chengyu Fan71b712b2015-09-09 22:13:56 -060055#include "util/logger.hpp"
56
57
Alison Craig2a4d5282015-04-10 12:00:02 -060058namespace atmos {
59namespace query {
Chengyu Fan71b712b2015-09-09 22:13:56 -060060#ifdef HAVE_LOG4CXX
61 INIT_LOGGER("QueryAdapter");
62#endif
63
// Maximum number of content bytes placed into a single Data segment.
// TODO: derive this limit from the size of a signed, empty Data packet
//       instead of hard-coding it.
static const size_t PAYLOAD_LIMIT = 7000;
Alison Craig2a4d5282015-04-10 12:00:02 -060066
67/**
68 * QueryAdapter handles the Query usecases for the catalog
69 */
70template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060071class QueryAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060072public:
73 /**
74 * Constructor
75 *
Chengyu Fancfb80c72015-10-19 16:50:04 -060076 * @param face: Face that will be used for NDN communications
77 * @param keyChain: KeyChain that will be used for data signing
78 * @param syncSocket: ChronoSync socket
Alison Craig2a4d5282015-04-10 12:00:02 -060079 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060080 QueryAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -060081 const std::shared_ptr<ndn::KeyChain>& keyChain,
82 const std::shared_ptr<chronosync::Socket>& syncSocket);
Alison Craig2a4d5282015-04-10 12:00:02 -060083
Alison Craig2a4d5282015-04-10 12:00:02 -060084 virtual
85 ~QueryAdapter();
86
87 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060088 * Helper function to specify section handler
89 */
90 void
91 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060092 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060093 const std::vector<std::string>& nameFields,
94 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060095
96protected:
97 /**
98 * Helper function for configuration parsing
99 */
100 void
101 onConfig(const util::ConfigSection& section,
102 bool isDryDun,
103 const std::string& fileName,
104 const ndn::Name& prefix);
105
106 /**
Alison Craig2a4d5282015-04-10 12:00:02 -0600107 * Handles incoming query requests by stripping the filter off the Interest to get the
108 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
109 *
110 * @param filter: InterestFilter that caused this Interest to be routed
111 * @param interest: Interest that needs to be handled
112 */
113 virtual void
114 onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
115
116 /**
117 * Handles requests for responses to an existing query
118 *
119 * @param filter: InterestFilter that caused this Interest to be routed
120 * @param interest: Interest that needs to be handled
121 */
122 virtual void
123 onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
124
Alison Craig2a4d5282015-04-10 12:00:02 -0600125 /**
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600126 * Handles requests for responses to an filter initialization request
127 *
128 * @param filter: InterestFilter that caused this Interest to be routed
129 * @param interest: Interest that needs to be handled
130 */
131 virtual void
132 onFiltersInitializationInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
133
134 /**
135 * Helper function that generates query results from a Json query carried in the Interest
136 *
137 * @param interest: Interest that needs to be handled
138 */
139 void
140 populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest);
141
142 void
143 getFiltersMenu(Json::Value& value);
144
145 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600146 * Helper function that makes query-results data
Alison Craig2a4d5282015-04-10 12:00:02 -0600147 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600148 * @param segmentPrefix: Name that identifies the Prefix for the Data
149 * @param value: Json::Value to be sent in the Data
150 * @param segmentNo: uint64_t the segment for this Data
151 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
152 * last entry
Alison Craig2a4d5282015-04-10 12:00:02 -0600153 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
Chengyu Fan92440162015-07-09 14:43:31 -0600154 * @param resultCount: the number of records in the query results
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600155 * @param viewStart: the start index of the record in the query results payload
156 * @param viewEnd: the end index of the record in the query results payload
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600157 * @param lastComponent: flag to indicate the content contains the last component for
158 autocompletion query
Alison Craig2a4d5282015-04-10 12:00:02 -0600159 */
Chengyu Fanb25835b2015-04-28 17:09:35 -0600160 std::shared_ptr<ndn::Data>
161 makeReplyData(const ndn::Name& segmentPrefix,
162 const Json::Value& value,
163 uint64_t segmentNo,
164 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -0600165 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600166 uint64_t resultCount,
167 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600168 uint64_t viewEnd,
169 bool lastComponent);
Alison Craig2a4d5282015-04-10 12:00:02 -0600170
171 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600172 * Helper function that generates query results from a Json query carried in the Interest
Alison Craig2a4d5282015-04-10 12:00:02 -0600173 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600174 * @param interest: Interest that needs to be handled
Alison Craig2a4d5282015-04-10 12:00:02 -0600175 */
176 void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600177 runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600178
Alison Craig1aced7d2015-04-10 12:00:02 -0600179 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600180 * Helper function that makes ACK data
Alison Craig1aced7d2015-04-10 12:00:02 -0600181 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600182 * @param interest: Intersts that needs to be handled
183 * @param version: Version that needs to be in the data name
184 */
185 std::shared_ptr<ndn::Data>
186 makeAckData(std::shared_ptr<const ndn::Interest> interest,
187 const ndn::Name::Component& version);
188
189 /**
Chengyu Fan92440162015-07-09 14:43:31 -0600190 * Helper function that sends NACK
191 *
192 * @param dataPrefix: prefix for the data packet
Alison Craig1aced7d2015-04-10 12:00:02 -0600193 */
194 void
Chengyu Fan92440162015-07-09 14:43:31 -0600195 sendNack(const ndn::Name& dataPrefix);
196
197 /**
198 * Helper function that generates the sqlQuery string for component-based query
199 * @param sqlQuery: stringstream to save the sqlQuery string
200 * @param jsonValue: Json value that contains the query information
201 */
202 bool
Chengyu Fanb25835b2015-04-28 17:09:35 -0600203 json2Sql(std::stringstream& sqlQuery,
Chengyu Fan92440162015-07-09 14:43:31 -0600204 Json::Value& jsonValue);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600205
206 /**
207 * Helper function that signs the data
208 */
209 void
210 signData(ndn::Data& data);
211
212 /**
213 * Helper function that publishes query-results data segments
214 */
215 virtual void
216 prepareSegments(const ndn::Name& segmentPrefix,
217 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600218 bool autocomplete,
219 bool lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600220
221 /**
222 * Helper function to set the DatabaseHandler
223 */
224 void
225 setDatabaseHandler(const util::ConnectionDetails& databaseId);
226
227 /**
228 * Helper function that set filters to make the adapter work
229 */
230 void
231 setFilters();
232
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600233 void
234 setCatalogId();
235
Chengyu Fan92440162015-07-09 14:43:31 -0600236 /**
237 * Helper function that generates the sqlQuery string for autocomplete query
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600238 * @param sqlQuery: stringstream to save the sqlQuery string
239 * @param jsonValue: Json value that contains the query information
240 * @param lastComponent: Flag to mark the last component query
Chengyu Fan92440162015-07-09 14:43:31 -0600241 */
242 bool
243 json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600244 Json::Value& jsonValue,
245 bool& lastComponent);
Chengyu Fan92440162015-07-09 14:43:31 -0600246
Chengyu Fan46398212015-08-11 11:23:13 -0600247 bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600248 json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
249 Json::Value& jsonValue);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600250
251 ndn::Name
252 getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
253 const ndn::Name::Component& version);
Chengyu Fan46398212015-08-11 11:23:13 -0600254
Chengyu Fancfb80c72015-10-19 16:50:04 -0600255 std::string
256 getChronoSyncDigest();
257
Chengyu Fanb25835b2015-04-28 17:09:35 -0600258protected:
259 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
260 // Handle to the Catalog's database
261 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600262 const std::shared_ptr<chronosync::Socket>& m_socket;
Alison Craig1aced7d2015-04-10 12:00:02 -0600263
Alison Craig2a4d5282015-04-10 12:00:02 -0600264 // mutex to control critical sections
265 std::mutex m_mutex;
266 // @{ needs m_mutex protection
267 // The Queries we are currently writing to
Chengyu Fancfb80c72015-10-19 16:50:04 -0600268 //std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
269 ndn::util::InMemoryStorageLru m_activeQueryToFirstResponse;
Alison Craig1aced7d2015-04-10 12:00:02 -0600270 ndn::util::InMemoryStorageLru m_cache;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600271 std::string m_chronosyncDigest;
Alison Craig2a4d5282015-04-10 12:00:02 -0600272 // @}
Chengyu Fanb25835b2015-04-28 17:09:35 -0600273 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan92440162015-07-09 14:43:31 -0600274 ndn::Name m_catalogId; // should be replaced with the PK digest
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600275 std::vector<std::string> m_filterCategoryNames;
Alison Craig2a4d5282015-04-10 12:00:02 -0600276};
277
Alison Craig2a4d5282015-04-10 12:00:02 -0600278template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600279QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -0600280 const std::shared_ptr<ndn::KeyChain>& keyChain,
281 const std::shared_ptr<chronosync::Socket>& syncSocket)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600282 : util::CatalogAdapter(face, keyChain)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600283 , m_socket(syncSocket)
284 , m_activeQueryToFirstResponse(100000)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600285 , m_cache(250000)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600286 , m_chronosyncDigest("0")
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600287 , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
Alison Craig2a4d5282015-04-10 12:00:02 -0600288{
Alison Craig2a4d5282015-04-10 12:00:02 -0600289}
290
291template <typename DatabaseHandler>
292void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600293QueryAdapter<DatabaseHandler>::setFilters()
Alison Craig2a4d5282015-04-10 12:00:02 -0600294{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600295 ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
296 m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
297 bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
298 this, _1, _2),
299 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
300 this, _1),
301 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
302 this, _1, _2));
303
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600304 ndn::Name queryResultsPrefix = ndn::Name(m_prefix).append("query-results");
305 m_registeredPrefixList[queryResultsPrefix] =
306 m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix)
307 .append("query-results").append(m_catalogId)),
Chengyu Fanb25835b2015-04-28 17:09:35 -0600308 bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
309 this, _1, _2),
310 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
311 this, _1),
312 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
313 this, _1, _2));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600314
315 ndn::Name filtersInitializationPrefix = ndn::Name(m_prefix).append("filters-initialization");
316 m_registeredPrefixList[filtersInitializationPrefix] =
317 m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("filters-initialization")),
318 bind(&query::QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
319 this, _1, _2),
320 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
321 this, _1),
322 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
323 this, _1, _2));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600324}
325
326template <typename DatabaseHandler>
327void
328QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600329 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600330 const std::vector<std::string>& nameFields,
331 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600332{
Chengyu Fan92440162015-07-09 14:43:31 -0600333 m_nameFields = nameFields;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600334 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600335 config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
336 _1, _2, _3, prefix));
337}
338
339template <typename DatabaseHandler>
340void
341QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
342 bool isDryRun,
343 const std::string& filename,
344 const ndn::Name& prefix)
345{
346 using namespace util;
347 if (isDryRun) {
348 return;
349 }
350 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
351 for (auto item = section.begin();
352 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600353 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600354 {
355 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600356 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600357 if (signingId.empty()) {
358 throw Error("Empty value for \"signingId\""
Chengyu Fancfb80c72015-10-19 16:50:04 -0600359 " in \"query\" section");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600360 }
361 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600362 if (item->first == "filterCategoryNames") {
363 std::istringstream ss(item->second.get_value<std::string>());
364 std::string token;
365 while(std::getline(ss, token, ',')) {
366 m_filterCategoryNames.push_back(token);
367 }
368 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600369 if (item->first == "database") {
370 const util::ConfigSection& dataSection = item->second;
371 for (auto subItem = dataSection.begin();
372 subItem != dataSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600373 ++subItem)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600374 {
375 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600376 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600377 }
378 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600379 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600380 }
381 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600382 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600383 }
384 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600385 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600386 }
387 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600388
389 if (dbServer.empty()){
390 throw Error("Invalid value for \"dbServer\""
391 " in \"query\" section");
392 }
393 if (dbName.empty()){
394 throw Error("Invalid value for \"dbName\""
395 " in \"query\" section");
396 }
397 if (dbUser.empty()){
398 throw Error("Invalid value for \"dbUser\""
399 " in \"query\" section");
400 }
401 if (dbPasswd.empty()){
402 throw Error("Invalid value for \"dbPasswd\""
403 " in \"query\" section");
404 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600405 }
406 }
407
Chengyu Fancfb80c72015-10-19 16:50:04 -0600408 if (m_filterCategoryNames.empty()) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600409 throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
410 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600411
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600412 m_prefix = prefix;
413
414 m_signingId = ndn::Name(signingId);
415 setCatalogId();
416
417 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600418 setDatabaseHandler(mysqlId);
419 setFilters();
420}
421
422template <typename DatabaseHandler>
423void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600424QueryAdapter<DatabaseHandler>::setCatalogId()
425{
426 //empty
427}
428
429template <>
430void
431QueryAdapter<MYSQL>::setCatalogId()
432{
433 // use public key digest as the catalog ID
434 ndn::Name keyId;
435 if (m_signingId.empty()) {
436 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
437 } else {
438 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
439 }
440
441 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
442 ndn::Block keyDigest = pKey->computeDigest();
443 m_catalogId.clear();
444 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
445}
446
447template <typename DatabaseHandler>
448void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600449QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
450{
451 //empty
452}
453
454template <>
455void
456QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
457{
458 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
459
460 m_databaseHandler = conn;
461}
462
463template <typename DatabaseHandler>
464QueryAdapter<DatabaseHandler>::~QueryAdapter()
465{
466 for (const auto& itr : m_registeredPrefixList) {
467 if (static_cast<bool>(itr.second))
468 m_face->unsetInterestFilter(itr.second);
469 }
470}
471
472template <typename DatabaseHandler>
473void
474QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
475 const ndn::Interest& interest)
476{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600477 _LOG_DEBUG(">> QueryAdapter::onQueryInterest");
478
Alison Craig2a4d5282015-04-10 12:00:02 -0600479 if (interest.getName().size() != filter.getPrefix().size() + 1) {
480 // @todo: return a nack
481 return;
482 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600483
Alison Craig2a4d5282015-04-10 12:00:02 -0600484 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
Chengyu Fan92440162015-07-09 14:43:31 -0600485
Chengyu Fan71b712b2015-09-09 22:13:56 -0600486 _LOG_DEBUG("Interest : " << interestPtr->getName());
Chengyu Fan92440162015-07-09 14:43:31 -0600487
Chengyu Fanb25835b2015-04-28 17:09:35 -0600488 // @todo: use thread pool
489 std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
490 this,
491 interestPtr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600492 queryThread.join();
493}
494
495template <typename DatabaseHandler>
496void
497QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
498 const ndn::Interest& interest)
499{
500 // FIXME Results are currently getting served out of the forwarder's
501 // CS so we just ignore any retrieval Interests that hit us for
502 // now. In the future, this should check some form of
503 // InMemoryStorage.
Chengyu Fan92440162015-07-09 14:43:31 -0600504
Chengyu Fan71b712b2015-09-09 22:13:56 -0600505 _LOG_DEBUG(">> QueryAdapter::onQueryResultsInterest");
Chengyu Fan92440162015-07-09 14:43:31 -0600506
Alison Craig1aced7d2015-04-10 12:00:02 -0600507 auto data = m_cache.find(interest.getName());
508 if (data) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600509 m_face->put(*data);
Alison Craig1aced7d2015-04-10 12:00:02 -0600510 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600511
512 _LOG_DEBUG("<< QueryAdapter::onQueryResultsInterest");
Alison Craig2a4d5282015-04-10 12:00:02 -0600513}
514
515template <typename DatabaseHandler>
516void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600517QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
Chengyu Fancfb80c72015-10-19 16:50:04 -0600518 const ndn::Interest& interest)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600519{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600520 _LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600521 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
522
Chengyu Fan71b712b2015-09-09 22:13:56 -0600523 _LOG_DEBUG("Interest : " << interestPtr->getName());
524
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600525 // TODO: save the content in memory, first check the memory, if not exists, start thread to generate it
526 // Note that if ChronoSync state changes, we need to clear the saved value, and regenerate it
527
528 auto data = m_cache.find(interest.getName());
529 if (data) {
530 m_face->put(*data);
531 }
532 else {
533 std::thread queryThread(&QueryAdapter<DatabaseHandler>::populateFiltersMenu,
534 this,
535 interestPtr);
536 queryThread.join();
537 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600538
539 _LOG_DEBUG("<< QueryAdapter::onFiltersInitializationInterest");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600540}
541
542template <typename DatabaseHandler>
543void
544QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest)
545{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600546 _LOG_DEBUG(">> QueryAdapter::populateFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600547 Json::Value filters;
548 Json::FastWriter fastWriter;
549 getFiltersMenu(filters);
550
551 const std::string filterValue = fastWriter.write(filters);
552
553 if (!filters.empty()) {
554 ndn::Name filterDataName(interest->getName());
555 filterDataName.append("stateVersion");// TODO: should replace with a state version
556
557 const char* payload = filterValue.c_str();
558 size_t payloadLength = filterValue.size();
559 size_t startIndex = 0, seqNo = 0;
560
561 if (filterValue.length() > PAYLOAD_LIMIT) {
562 payloadLength = PAYLOAD_LIMIT;
563 ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo);
564 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName);
565 filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
566 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
567
568 signData(*filterData);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600569
570 _LOG_DEBUG("Populate Filter Data :" << segmentName);
571
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600572 m_mutex.lock();
573 m_cache.insert(*filterData);
574 try {
575 m_face->put(*filterData);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600576 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600577 catch (std::exception& e) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600578 _LOG_ERROR(e.what());
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600579 }
580 m_mutex.unlock();
581
582 seqNo++;
583 startIndex = payloadLength * seqNo + 1;
584 }
585 payloadLength = filterValue.size() - PAYLOAD_LIMIT * seqNo;
586
587 ndn::Name lastSegment = ndn::Name(filterDataName).appendSegment(seqNo);
588 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(lastSegment);
589 filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
590 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
591 filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo));
592
593 signData(*filterData);
594 m_mutex.lock();
595 m_cache.insert(*filterData);
596 m_face->put(*filterData);
597 m_mutex.unlock();
598 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600599 _LOG_DEBUG("<< QueryAdapter::populateFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600600}
601
602template <typename DatabaseHandler>
603void
604QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value)
605{
606 // empty
607}
608
609// get distinct value of each column
610template <>
611void
612QueryAdapter<MYSQL>::getFiltersMenu(Json::Value& value)
613{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600614 _LOG_DEBUG(">> QueryAdapter::getFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600615 Json::Value tmp;
616
617 for (size_t i = 0; i < m_filterCategoryNames.size(); i++) {
618 std::string columnName = m_filterCategoryNames[i];
619 std::string getFilterSql("SELECT DISTINCT " + columnName +
620 " FROM " + m_databaseTable + ";");
621 std::string errMsg;
622 bool success;
623
624 std::shared_ptr<MYSQL_RES> results
625 = atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
626 util::QUERY, success, errMsg);
627 if (!success) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600628 _LOG_ERROR(errMsg);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600629 value.clear();
630 return;
631 }
632
633 while (MYSQL_ROW row = mysql_fetch_row(results.get()))
634 {
635 tmp[columnName].append(row[0]);
636 }
637 value.append(tmp);
638 tmp.clear();
639 }
640
Chengyu Fan71b712b2015-09-09 22:13:56 -0600641 _LOG_DEBUG("<< QueryAdapter::getFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600642}
643
644template <typename DatabaseHandler>
645void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600646QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600647{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600648 if (m_signingId.empty())
649 m_keyChain->sign(data);
650 else {
651 ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
652 ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
653 m_keyChain->sign(data, certName);
Alison Craig2a4d5282015-04-10 12:00:02 -0600654 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600655}
656
657template <typename DatabaseHandler>
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600658ndn::Name
659QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
660 const ndn::Name::Component& version)
661{
662 // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
663 // for now, should be /<prefix>/query-results/<catalog-id>/<query-parameters>/<version>
664
665 ndn::Name queryResultName(m_prefix);
666 queryResultName.append("query-results")
667 .append(m_catalogId)
668 .append(interest->getName().get(-1))
669 .append(version);
670 return queryResultName;
671}
672
673template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600674std::shared_ptr<ndn::Data>
675QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
676 const ndn::Name::Component& version)
Alison Craig2a4d5282015-04-10 12:00:02 -0600677{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600678 std::string queryResultNameStr(getQueryResultsName(interest, version).toUri());
Alison Craig2a4d5282015-04-10 12:00:02 -0600679
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600680 std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName());
681 ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()),
682 queryResultNameStr.length());
Chengyu Fan92440162015-07-09 14:43:31 -0600683 ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
684
Chengyu Fanb25835b2015-04-28 17:09:35 -0600685 signData(*ack);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600686
687 _LOG_DEBUG("Make ACK : " << queryResultNameStr);
688
Chengyu Fanb25835b2015-04-28 17:09:35 -0600689 return ack;
Alison Craig2a4d5282015-04-10 12:00:02 -0600690}
691
692template <typename DatabaseHandler>
693void
Chengyu Fan92440162015-07-09 14:43:31 -0600694QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600695{
Chengyu Fan92440162015-07-09 14:43:31 -0600696 uint64_t segmentNo = 0;
697
698 std::shared_ptr<ndn::Data> nack =
699 std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo));
700 nack->setFreshnessPeriod(ndn::time::milliseconds(10000));
701 nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
702
703 signData(*nack);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600704
705 _LOG_DEBUG("Send Nack: " << ndn::Name(dataPrefix).appendSegment(segmentNo));
706
Chengyu Fan92440162015-07-09 14:43:31 -0600707 m_mutex.lock();
708 m_cache.insert(*nack);
709 m_mutex.unlock();
710}
711
712
713template <typename DatabaseHandler>
714bool
715QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
716 Json::Value& jsonValue)
717{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600718 _LOG_DEBUG(">> QueryAdapter::json2Sql");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600719
Chengyu Fan71b712b2015-09-09 22:13:56 -0600720 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fancfb80c72015-10-19 16:50:04 -0600721
Chengyu Fan92440162015-07-09 14:43:31 -0600722 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan92440162015-07-09 14:43:31 -0600723 return false;
724 }
725
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600726 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600727 bool input = false;
728 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
729 {
730 Json::Value key = iter.key();
731 Json::Value value = (*iter);
732
Chengyu Fan92440162015-07-09 14:43:31 -0600733 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600734 _LOG_ERROR("Null key or value in JsonValue");
Chengyu Fan92440162015-07-09 14:43:31 -0600735 return false;
736 }
737
738 // cannot convert to string
739 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600740 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan92440162015-07-09 14:43:31 -0600741 return false;
742 }
743
744 if (key.asString().compare("?") == 0) {
745 continue;
746 }
747
Chengyu Fanb25835b2015-04-28 17:09:35 -0600748 if (input) {
749 sqlQuery << " AND";
750 } else {
751 sqlQuery << " WHERE";
752 }
753
Chengyu Fan92440162015-07-09 14:43:31 -0600754 sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
Chengyu Fanb25835b2015-04-28 17:09:35 -0600755 input = true;
756 }
757
758 if (!input) { // Force it to be the empty set
Chengyu Fan92440162015-07-09 14:43:31 -0600759 return false;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600760 }
761 sqlQuery << ";";
Chengyu Fan92440162015-07-09 14:43:31 -0600762 return true;
763}
764
765template <typename DatabaseHandler>
766bool
767QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600768 Json::Value& jsonValue,
769 bool& lastComponent)
Chengyu Fan92440162015-07-09 14:43:31 -0600770{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600771 _LOG_DEBUG(">> QueryAdapter::json2AutocompletionSql");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600772
Chengyu Fan71b712b2015-09-09 22:13:56 -0600773 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fancfb80c72015-10-19 16:50:04 -0600774
Chengyu Fan92440162015-07-09 14:43:31 -0600775 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan92440162015-07-09 14:43:31 -0600776 return false;
777 }
778
779 std::string typedString;
780 // get the string in the jsonValue
781 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
782 {
783 Json::Value key = iter.key();
784 Json::Value value = (*iter);
785
786 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600787 _LOG_ERROR("Null key or value in JsonValue");
Chengyu Fan92440162015-07-09 14:43:31 -0600788 return false;
789 }
790
791 // cannot convert to string
792 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600793 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan92440162015-07-09 14:43:31 -0600794 return false;
795 }
796
797 if (key.asString().compare("?") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600798 typedString = value.asString();
Chengyu Fan92440162015-07-09 14:43:31 -0600799 // since the front end triggers the autocompletion when users typed '/',
800 // there must be a '/' at the end, and the first char must be '/'
801 if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
802 return false;
803 break;
804 }
805 }
806
807 // 1. get the expected column number by parsing the typedString, so we can get the filed name
808 size_t pos = 0;
809 size_t start = 1; // start from the 1st char which is not '/'
810 size_t count = 0; // also the name to query for
811 std::string token;
812 std::string delimiter = "/";
813 std::map<std::string, std::string> typedComponents;
814 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
815 token = typedString.substr(start, pos - start);
816 if (count >= m_nameFields.size() - 1) {
817 return false;
818 }
819
820 // add column name and value (token) into map
821 typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600822 count++;
Chengyu Fan92440162015-07-09 14:43:31 -0600823 start = pos + 1;
824 }
825
826 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
827 // return true
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600828 if (count == m_nameFields.size() - 1)
829 lastComponent = true; // indicate this query is to query the last component
830
Chengyu Fan92440162015-07-09 14:43:31 -0600831 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600832 sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM " << m_databaseTable;
Chengyu Fan46398212015-08-11 11:23:13 -0600833 for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
834 it != typedComponents.end(); ++it) {
835 if (more)
836 sqlQuery << " AND";
837 else
838 sqlQuery << " WHERE";
839
840 sqlQuery << " " << it->first << "='" << it->second << "'";
841
842 more = true;
843 }
844 sqlQuery << ";";
845 return true;
846}
847
848template <typename DatabaseHandler>
849bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600850QueryAdapter<DatabaseHandler>::json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
851 Json::Value& jsonValue)
Chengyu Fan46398212015-08-11 11:23:13 -0600852{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600853 _LOG_DEBUG(">> QueryAdapter::json2CompleteSearchSql");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600854
Chengyu Fan71b712b2015-09-09 22:13:56 -0600855 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fancfb80c72015-10-19 16:50:04 -0600856
Chengyu Fan46398212015-08-11 11:23:13 -0600857 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan46398212015-08-11 11:23:13 -0600858 return false;
859 }
860
861 std::string typedString;
862 // get the string in the jsonValue
863 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
864 {
865 Json::Value key = iter.key();
866 Json::Value value = (*iter);
867
868 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600869 _LOG_ERROR("Null key or value in JsonValue");
Chengyu Fan46398212015-08-11 11:23:13 -0600870 return false;
871 }
872
873 // cannot convert to string
874 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600875 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan46398212015-08-11 11:23:13 -0600876 return false;
877 }
878
879 if (key.asString().compare("??") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600880 typedString = value.asString();
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600881 if (typedString.empty() || typedString.find("/") != 0)
Chengyu Fan46398212015-08-11 11:23:13 -0600882 return false;
883 break;
884 }
885 }
886
887 // 1. get the expected column number by parsing the typedString, so we can get the filed name
888 size_t pos = 0;
889 size_t start = 1; // start from the 1st char which is not '/'
890 size_t count = 0; // also the name to query for
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600891 size_t typedStringLen = typedString.length();
Chengyu Fan46398212015-08-11 11:23:13 -0600892 std::string token;
893 std::string delimiter = "/";
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600894 std::vector<std::pair<std::string, std::string>> typedComponents;
Chengyu Fan46398212015-08-11 11:23:13 -0600895 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
896 token = typedString.substr(start, pos - start);
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600897 if (count >= m_nameFields.size()) {
Chengyu Fan46398212015-08-11 11:23:13 -0600898 return false;
899 }
900
901 // add column name and value (token) into map
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600902 typedComponents.push_back(std::make_pair(m_nameFields[count], token));
903
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600904 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600905 start = pos + 1;
906 }
907
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600908 // we may have a component after the last "/"
909 if (start < typedStringLen) {
910 typedComponents.push_back(std::make_pair(m_nameFields[count],
911 typedString.substr(start, typedStringLen - start)));
912 }
913
Chengyu Fan46398212015-08-11 11:23:13 -0600914 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
915 // return true
916 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600917 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600918 for (std::vector<std::pair<std::string, std::string>>::iterator it = typedComponents.begin();
Chengyu Fan92440162015-07-09 14:43:31 -0600919 it != typedComponents.end(); ++it) {
920 if (more)
921 sqlQuery << " AND";
922 else
923 sqlQuery << " WHERE";
924
925 sqlQuery << " " << it->first << "='" << it->second << "'";
926
927 more = true;
928 }
929 sqlQuery << ";";
930 return true;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600931}
932
933template <typename DatabaseHandler>
934void
935QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600936{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600937 _LOG_DEBUG(">> QueryAdapter::runJsonQuery");
938
Alison Craig1aced7d2015-04-10 12:00:02 -0600939 // 1) Strip the prefix off the ndn::Interest's ndn::Name
940 // +1 to grab JSON component after "query" component
Alison Craig1aced7d2015-04-10 12:00:02 -0600941
Chengyu Fanb25835b2015-04-28 17:09:35 -0600942 ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
943 // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
944 const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
Alison Craig2a4d5282015-04-10 12:00:02 -0600945
Chengyu Fanb25835b2015-04-28 17:09:35 -0600946 if (jsonQuery.length() <= 0) {
Chengyu Fan92440162015-07-09 14:43:31 -0600947 // no JSON query, send Nack?
Chengyu Fanb25835b2015-04-28 17:09:35 -0600948 return;
949 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600950
951 // the version should be replaced with ChronoSync state digest
952 ndn::name::Component version;
953
954 if(m_socket != nullptr) {
955 const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
956 std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size());
957 _LOG_DEBUG("Original digest" << m_chronosyncDigest);
958 _LOG_DEBUG("New digest : " << digestStr);
959 // if the m_chronosyncDigest and the rootdigest are not equal
960 if (digestStr != m_chronosyncDigest) {
961 // (1) update chronosyncDigest
962 // (2) clear all staled ACK data
963 m_mutex.lock();
964 m_chronosyncDigest = digestStr;
965 m_activeQueryToFirstResponse.erase(ndn::Name("/"));
966 m_mutex.unlock();
967 _LOG_DEBUG("Change digest to " << m_chronosyncDigest);
968 }
969 version = ndn::name::Component::fromEscapedString(digestStr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600970 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600971 else {
972 version = ndn::name::Component::fromEscapedString(m_chronosyncDigest);
973 }
974
975 // try to respond with the inMemoryStorage
976 m_mutex.lock();
977 { // !!! BEGIN CRITICAL SECTION !!!
978 auto data = m_activeQueryToFirstResponse.find(interest->getName());
979 if (data) {
980 _LOG_DEBUG("Answer with Data in IMS : " << data->getName());
981 m_face->put(*data);
982 m_mutex.unlock();
983 return;
984 }
985 } // !!! END CRITICAL SECTION !!!
986 m_mutex.unlock();
Alison Craig2a4d5282015-04-10 12:00:02 -0600987
988 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
989 Json::Value parsedFromString;
990 Json::Reader reader;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600991 if (!reader.parse(jsonQuery, parsedFromString)) {
992 // @todo: send NACK?
Chengyu Fancfb80c72015-10-19 16:50:04 -0600993 _LOG_ERROR("Cannot parse the JsonQuery");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600994 return;
Alison Craig2a4d5282015-04-10 12:00:02 -0600995 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600996
Chengyu Fanb25835b2015-04-28 17:09:35 -0600997 std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
998
999 m_mutex.lock();
1000 { // !!! BEGIN CRITICAL SECTION !!!
1001 // An unusual race-condition case, which requires things like PIT aggregation to be off.
Chengyu Fancfb80c72015-10-19 16:50:04 -06001002 auto data = m_activeQueryToFirstResponse.find(interest->getName());
1003 if (data) {
1004 m_face->put(*data);
1005 m_mutex.unlock();
Chengyu Fanb25835b2015-04-28 17:09:35 -06001006 return;
1007 }
Chengyu Fancfb80c72015-10-19 16:50:04 -06001008
Chengyu Fanb25835b2015-04-28 17:09:35 -06001009 // This is where things are expensive so we save them for the lock
Chengyu Fan92440162015-07-09 14:43:31 -06001010 // note that we ack the query with the cached ACK messages, but we should remove the ACKs
1011 // that conatin the old version when ChronoSync is updated
Chengyu Fancfb80c72015-10-19 16:50:04 -06001012 m_activeQueryToFirstResponse.insert(*ack);
1013
Chengyu Fanb25835b2015-04-28 17:09:35 -06001014 m_face->put(*ack);
1015 } // !!! END CRITICAL SECTION !!!
1016 m_mutex.unlock();
1017
1018 // 3) Convert the JSON Query into a MySQL one
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001019 bool autocomplete = false, lastComponent = false;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001020 std::stringstream sqlQuery;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001021
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001022 ndn::Name segmentPrefix(getQueryResultsName(interest, version));
Chengyu Fanb25835b2015-04-28 17:09:35 -06001023
Chengyu Fan92440162015-07-09 14:43:31 -06001024 Json::Value tmp;
1025 // expect the autocomplete and the component-based query are separate
1026 // if JSON::Value contains ? as key, is autocompletion
1027 if (parsedFromString.get("?", tmp) != tmp) {
1028 autocomplete = true;
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001029 if (!json2AutocompletionSql(sqlQuery, parsedFromString, lastComponent)) {
Chengyu Fan92440162015-07-09 14:43:31 -06001030 sendNack(segmentPrefix);
1031 return;
1032 }
1033 }
Chengyu Fan46398212015-08-11 11:23:13 -06001034 else if (parsedFromString.get("??", tmp) != tmp) {
Chengyu Fan4d5fbd22015-09-18 14:34:08 -06001035 if (!json2PrefixBasedSearchSql(sqlQuery, parsedFromString)) {
Chengyu Fan46398212015-08-11 11:23:13 -06001036 sendNack(segmentPrefix);
1037 return;
1038 }
1039 }
Chengyu Fan92440162015-07-09 14:43:31 -06001040 else {
1041 if (!json2Sql(sqlQuery, parsedFromString)) {
1042 sendNack(segmentPrefix);
1043 return;
1044 }
1045 }
1046
1047 // 4) Run the Query
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001048 prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001049}
1050
1051template <typename DatabaseHandler>
1052void
1053QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
1054 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001055 bool autocomplete,
1056 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001057{
1058 // empty
1059}
1060
1061// prepareSegments specilization function
1062template<>
1063void
1064QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
1065 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001066 bool autocomplete,
1067 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001068{
Chengyu Fan71b712b2015-09-09 22:13:56 -06001069 _LOG_DEBUG(">> QueryAdapter::prepareSegments");
Chengyu Fancfb80c72015-10-19 16:50:04 -06001070
Chengyu Fan71b712b2015-09-09 22:13:56 -06001071 _LOG_DEBUG(sqlString);
Chengyu Fancfb80c72015-10-19 16:50:04 -06001072
Chengyu Fan46398212015-08-11 11:23:13 -06001073 std::string errMsg;
1074 bool success;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001075 // 4) Run the Query
1076 std::shared_ptr<MYSQL_RES> results
Chengyu Fan46398212015-08-11 11:23:13 -06001077 = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
1078 if (!success)
Chengyu Fancfb80c72015-10-19 16:50:04 -06001079 _LOG_ERROR(errMsg);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001080
1081 if (!results) {
Chengyu Fancfb80c72015-10-19 16:50:04 -06001082 _LOG_ERROR("NULL MYSQL_RES for" << sqlString);
Chengyu Fan92440162015-07-09 14:43:31 -06001083
Chengyu Fanb25835b2015-04-28 17:09:35 -06001084 // @todo: throw runtime error or log the error message?
1085 return;
1086 }
1087
Chengyu Fan92440162015-07-09 14:43:31 -06001088 uint64_t resultCount = mysql_num_rows(results.get());
1089
Chengyu Fan71b712b2015-09-09 22:13:56 -06001090 _LOG_DEBUG("Query resuls contain " << resultCount << "rows");
Chengyu Fanb25835b2015-04-28 17:09:35 -06001091
1092 MYSQL_ROW row;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001093 uint64_t segmentNo = 0;
Chengyu Fan46398212015-08-11 11:23:13 -06001094 Json::Value tmp;
1095 Json::Value resultJson;
1096 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001097
1098 uint64_t viewStart = 0, viewEnd = 0;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001099 while ((row = mysql_fetch_row(results.get())))
1100 {
Chengyu Fan46398212015-08-11 11:23:13 -06001101 tmp.append(row[0]);
1102 const std::string tmpString = fastWriter.write(tmp);
1103 if (tmpString.length() > PAYLOAD_LIMIT) {
Chengyu Fanb25835b2015-04-28 17:09:35 -06001104 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001105 = makeReplyData(segmentPrefix, resultJson, segmentNo, false,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001106 autocomplete, resultCount, viewStart, viewEnd, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001107 m_mutex.lock();
1108 m_cache.insert(*data);
1109 m_mutex.unlock();
Chengyu Fan46398212015-08-11 11:23:13 -06001110 tmp.clear();
1111 resultJson.clear();
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001112 segmentNo++;
1113 viewStart = viewEnd + 1;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001114 }
Chengyu Fan46398212015-08-11 11:23:13 -06001115 resultJson.append(row[0]);
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001116 viewEnd++;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001117 }
Chengyu Fan46398212015-08-11 11:23:13 -06001118
Chengyu Fanb25835b2015-04-28 17:09:35 -06001119 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001120 = makeReplyData(segmentPrefix, resultJson, segmentNo, true,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001121 autocomplete, resultCount, viewStart, viewEnd, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001122 m_mutex.lock();
1123 m_cache.insert(*data);
1124 m_mutex.unlock();
1125}
1126
1127template <typename DatabaseHandler>
1128std::shared_ptr<ndn::Data>
1129QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
1130 const Json::Value& value,
1131 uint64_t segmentNo,
1132 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -06001133 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001134 uint64_t resultCount,
1135 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001136 uint64_t viewEnd,
1137 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001138{
1139 Json::Value entry;
1140 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001141
1142 entry["resultCount"] = Json::UInt64(resultCount);;
1143 entry["viewStart"] = Json::UInt64(viewStart);
1144 entry["viewEnd"] = Json::UInt64(viewEnd);
1145
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001146 if (lastComponent)
1147 entry["lastComponent"] = Json::Value(true);
1148
Chengyu Fan71b712b2015-09-09 22:13:56 -06001149 _LOG_DEBUG("resultCount " << resultCount << ";"
1150 << "viewStart " << viewStart << ";"
1151 << "viewEnd " << viewEnd);
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001152
Chengyu Fanb25835b2015-04-28 17:09:35 -06001153 if (isAutocomplete) {
1154 entry["next"] = value;
1155 } else {
1156 entry["results"] = value;
1157 }
1158 const std::string jsonMessage = fastWriter.write(entry);
1159 const char* payload = jsonMessage.c_str();
1160 size_t payloadLength = jsonMessage.size() + 1;
1161 ndn::Name segmentName(segmentPrefix);
1162 segmentName.appendSegment(segmentNo);
1163
1164 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
1165 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
1166 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
1167
1168 if (isFinalBlock) {
1169 data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
1170 }
Chengyu Fancfb80c72015-10-19 16:50:04 -06001171
Chengyu Fan71b712b2015-09-09 22:13:56 -06001172 _LOG_DEBUG(segmentName);
Chengyu Fancfb80c72015-10-19 16:50:04 -06001173
Chengyu Fanb25835b2015-04-28 17:09:35 -06001174 signData(*data);
1175 return data;
Alison Craig2a4d5282015-04-10 12:00:02 -06001176}
1177
Chengyu Fancfb80c72015-10-19 16:50:04 -06001178
Alison Craig2a4d5282015-04-10 12:00:02 -06001179} // namespace query
1180} // namespace atmos
1181#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP