blob: ab0b126b73ce000b8f15de4ad5493dfff69d63f7 [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanb25835b2015-04-28 17:09:35 -060024#include "util/config-file.hpp"
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <thread>
27
Alison Craig2a4d5282015-04-10 12:00:02 -060028#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
Alison Craig1aced7d2015-04-10 12:00:02 -060040#include <ndn-cxx/util/in-memory-storage-lru.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060041#include <ndn-cxx/util/string-helper.hpp>
Chengyu Fancfb80c72015-10-19 16:50:04 -060042#include <ChronoSync/socket.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060043
44#include "mysql/mysql.h"
45
Alison Craig2a4d5282015-04-10 12:00:02 -060046#include <map>
Chengyu Fanb25835b2015-04-28 17:09:35 -060047#include <unordered_map>
Alison Craig2a4d5282015-04-10 12:00:02 -060048#include <memory>
49#include <mutex>
50#include <sstream>
51#include <string>
Chengyu Fan92440162015-07-09 14:43:31 -060052#include <array>
Chengyu Fan4d5fbd22015-09-18 14:34:08 -060053#include <utility>
Alison Craig2a4d5282015-04-10 12:00:02 -060054
Chengyu Fan71b712b2015-09-09 22:13:56 -060055#include "util/logger.hpp"
56
57
Alison Craig2a4d5282015-04-10 12:00:02 -060058namespace atmos {
59namespace query {
Chengyu Fan71b712b2015-09-09 22:13:56 -060060#ifdef HAVE_LOG4CXX
61 INIT_LOGGER("QueryAdapter");
62#endif
63
Chengyu Fan92440162015-07-09 14:43:31 -060064// todo: calculate payload limit by get the size of a signed empty Data packet
65static const size_t PAYLOAD_LIMIT = 7000;
Alison Craig2a4d5282015-04-10 12:00:02 -060066
67/**
68 * QueryAdapter handles the Query usecases for the catalog
69 */
70template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060071class QueryAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060072public:
73 /**
74 * Constructor
75 *
Chengyu Fancfb80c72015-10-19 16:50:04 -060076 * @param face: Face that will be used for NDN communications
77 * @param keyChain: KeyChain that will be used for data signing
78 * @param syncSocket: ChronoSync socket
Alison Craig2a4d5282015-04-10 12:00:02 -060079 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060080 QueryAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -060081 const std::shared_ptr<ndn::KeyChain>& keyChain,
82 const std::shared_ptr<chronosync::Socket>& syncSocket);
Alison Craig2a4d5282015-04-10 12:00:02 -060083
Alison Craig2a4d5282015-04-10 12:00:02 -060084 virtual
85 ~QueryAdapter();
86
87 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060088 * Helper function to specify section handler
89 */
90 void
91 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060092 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060093 const std::vector<std::string>& nameFields,
94 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060095
96protected:
97 /**
98 * Helper function for configuration parsing
99 */
100 void
101 onConfig(const util::ConfigSection& section,
102 bool isDryDun,
103 const std::string& fileName,
104 const ndn::Name& prefix);
105
106 /**
Alison Craig2a4d5282015-04-10 12:00:02 -0600107 * Handles incoming query requests by stripping the filter off the Interest to get the
108 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
109 *
110 * @param filter: InterestFilter that caused this Interest to be routed
111 * @param interest: Interest that needs to be handled
112 */
113 virtual void
114 onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
115
116 /**
117 * Handles requests for responses to an existing query
118 *
119 * @param filter: InterestFilter that caused this Interest to be routed
120 * @param interest: Interest that needs to be handled
121 */
122 virtual void
123 onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
124
Alison Craig2a4d5282015-04-10 12:00:02 -0600125 /**
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600126 * Handles requests for responses to an filter initialization request
127 *
128 * @param filter: InterestFilter that caused this Interest to be routed
129 * @param interest: Interest that needs to be handled
130 */
131 virtual void
132 onFiltersInitializationInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
133
134 /**
135 * Helper function that generates query results from a Json query carried in the Interest
136 *
137 * @param interest: Interest that needs to be handled
138 */
139 void
140 populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest);
141
142 void
143 getFiltersMenu(Json::Value& value);
144
145 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600146 * Helper function that makes query-results data
Alison Craig2a4d5282015-04-10 12:00:02 -0600147 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600148 * @param segmentPrefix: Name that identifies the Prefix for the Data
149 * @param value: Json::Value to be sent in the Data
150 * @param segmentNo: uint64_t the segment for this Data
151 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
152 * last entry
Alison Craig2a4d5282015-04-10 12:00:02 -0600153 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
Chengyu Fan92440162015-07-09 14:43:31 -0600154 * @param resultCount: the number of records in the query results
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600155 * @param viewStart: the start index of the record in the query results payload
156 * @param viewEnd: the end index of the record in the query results payload
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600157 * @param lastComponent: flag to indicate the content contains the last component for
158 autocompletion query
Alison Craig2a4d5282015-04-10 12:00:02 -0600159 */
Chengyu Fanb25835b2015-04-28 17:09:35 -0600160 std::shared_ptr<ndn::Data>
161 makeReplyData(const ndn::Name& segmentPrefix,
162 const Json::Value& value,
163 uint64_t segmentNo,
164 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -0600165 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600166 uint64_t resultCount,
167 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600168 uint64_t viewEnd,
169 bool lastComponent);
Alison Craig2a4d5282015-04-10 12:00:02 -0600170
171 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600172 * Helper function that generates query results from a Json query carried in the Interest
Alison Craig2a4d5282015-04-10 12:00:02 -0600173 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600174 * @param interest: Interest that needs to be handled
Alison Craig2a4d5282015-04-10 12:00:02 -0600175 */
176 void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600177 runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600178
Alison Craig1aced7d2015-04-10 12:00:02 -0600179 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600180 * Helper function that makes ACK data
Alison Craig1aced7d2015-04-10 12:00:02 -0600181 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600182 * @param interest: Intersts that needs to be handled
183 * @param version: Version that needs to be in the data name
184 */
185 std::shared_ptr<ndn::Data>
186 makeAckData(std::shared_ptr<const ndn::Interest> interest,
187 const ndn::Name::Component& version);
188
189 /**
Chengyu Fan92440162015-07-09 14:43:31 -0600190 * Helper function that sends NACK
191 *
192 * @param dataPrefix: prefix for the data packet
Alison Craig1aced7d2015-04-10 12:00:02 -0600193 */
194 void
Chengyu Fan92440162015-07-09 14:43:31 -0600195 sendNack(const ndn::Name& dataPrefix);
196
197 /**
198 * Helper function that generates the sqlQuery string for component-based query
199 * @param sqlQuery: stringstream to save the sqlQuery string
200 * @param jsonValue: Json value that contains the query information
201 */
202 bool
Chengyu Fanb25835b2015-04-28 17:09:35 -0600203 json2Sql(std::stringstream& sqlQuery,
Chengyu Fan92440162015-07-09 14:43:31 -0600204 Json::Value& jsonValue);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600205
206 /**
207 * Helper function that signs the data
208 */
209 void
210 signData(ndn::Data& data);
211
212 /**
213 * Helper function that publishes query-results data segments
214 */
215 virtual void
216 prepareSegments(const ndn::Name& segmentPrefix,
217 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600218 bool autocomplete,
219 bool lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600220
221 /**
222 * Helper function to set the DatabaseHandler
223 */
224 void
225 setDatabaseHandler(const util::ConnectionDetails& databaseId);
226
227 /**
228 * Helper function that set filters to make the adapter work
229 */
230 void
231 setFilters();
232
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600233 void
234 setCatalogId();
235
Chengyu Fan92440162015-07-09 14:43:31 -0600236 /**
237 * Helper function that generates the sqlQuery string for autocomplete query
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600238 * @param sqlQuery: stringstream to save the sqlQuery string
239 * @param jsonValue: Json value that contains the query information
240 * @param lastComponent: Flag to mark the last component query
Chengyu Fan92440162015-07-09 14:43:31 -0600241 */
242 bool
243 json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600244 Json::Value& jsonValue,
245 bool& lastComponent);
Chengyu Fan92440162015-07-09 14:43:31 -0600246
Chengyu Fan46398212015-08-11 11:23:13 -0600247 bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600248 json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
249 Json::Value& jsonValue);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600250
251 ndn::Name
252 getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
253 const ndn::Name::Component& version);
Chengyu Fan46398212015-08-11 11:23:13 -0600254
Chengyu Fancfb80c72015-10-19 16:50:04 -0600255 std::string
256 getChronoSyncDigest();
257
Chengyu Fanb25835b2015-04-28 17:09:35 -0600258protected:
259 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
260 // Handle to the Catalog's database
261 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600262 const std::shared_ptr<chronosync::Socket>& m_socket;
Alison Craig1aced7d2015-04-10 12:00:02 -0600263
Alison Craig2a4d5282015-04-10 12:00:02 -0600264 // mutex to control critical sections
265 std::mutex m_mutex;
266 // @{ needs m_mutex protection
267 // The Queries we are currently writing to
Chengyu Fancfb80c72015-10-19 16:50:04 -0600268 //std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
269 ndn::util::InMemoryStorageLru m_activeQueryToFirstResponse;
Alison Craig1aced7d2015-04-10 12:00:02 -0600270 ndn::util::InMemoryStorageLru m_cache;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600271 std::string m_chronosyncDigest;
Alison Craig2a4d5282015-04-10 12:00:02 -0600272 // @}
Chengyu Fanb25835b2015-04-28 17:09:35 -0600273 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan92440162015-07-09 14:43:31 -0600274 ndn::Name m_catalogId; // should be replaced with the PK digest
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600275 std::vector<std::string> m_filterCategoryNames;
Alison Craig2a4d5282015-04-10 12:00:02 -0600276};
277
Alison Craig2a4d5282015-04-10 12:00:02 -0600278template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600279QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -0600280 const std::shared_ptr<ndn::KeyChain>& keyChain,
281 const std::shared_ptr<chronosync::Socket>& syncSocket)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600282 : util::CatalogAdapter(face, keyChain)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600283 , m_socket(syncSocket)
284 , m_activeQueryToFirstResponse(100000)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600285 , m_cache(250000)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600286 , m_chronosyncDigest("0")
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600287 , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
Alison Craig2a4d5282015-04-10 12:00:02 -0600288{
Alison Craig2a4d5282015-04-10 12:00:02 -0600289}
290
291template <typename DatabaseHandler>
292void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600293QueryAdapter<DatabaseHandler>::setFilters()
Alison Craig2a4d5282015-04-10 12:00:02 -0600294{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600295 ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
296 m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
297 bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
298 this, _1, _2),
299 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
300 this, _1),
301 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
302 this, _1, _2));
303
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600304 ndn::Name queryResultsPrefix = ndn::Name(m_prefix).append("query-results");
305 m_registeredPrefixList[queryResultsPrefix] =
306 m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix)
307 .append("query-results").append(m_catalogId)),
Chengyu Fanb25835b2015-04-28 17:09:35 -0600308 bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
309 this, _1, _2),
310 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
311 this, _1),
312 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
313 this, _1, _2));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600314
315 ndn::Name filtersInitializationPrefix = ndn::Name(m_prefix).append("filters-initialization");
316 m_registeredPrefixList[filtersInitializationPrefix] =
317 m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("filters-initialization")),
318 bind(&query::QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
319 this, _1, _2),
320 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
321 this, _1),
322 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
323 this, _1, _2));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600324}
325
326template <typename DatabaseHandler>
327void
328QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600329 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600330 const std::vector<std::string>& nameFields,
331 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600332{
Chengyu Fan92440162015-07-09 14:43:31 -0600333 m_nameFields = nameFields;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600334 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600335 config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
336 _1, _2, _3, prefix));
337}
338
339template <typename DatabaseHandler>
340void
341QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
342 bool isDryRun,
343 const std::string& filename,
344 const ndn::Name& prefix)
345{
346 using namespace util;
347 if (isDryRun) {
348 return;
349 }
350 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
351 for (auto item = section.begin();
352 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600353 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600354 {
355 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600356 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600357 if (signingId.empty()) {
358 throw Error("Empty value for \"signingId\""
Chengyu Fancfb80c72015-10-19 16:50:04 -0600359 " in \"query\" section");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600360 }
361 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600362 if (item->first == "filterCategoryNames") {
363 std::istringstream ss(item->second.get_value<std::string>());
364 std::string token;
365 while(std::getline(ss, token, ',')) {
366 m_filterCategoryNames.push_back(token);
367 }
368 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600369 if (item->first == "database") {
370 const util::ConfigSection& dataSection = item->second;
371 for (auto subItem = dataSection.begin();
372 subItem != dataSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600373 ++subItem)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600374 {
375 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600376 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600377 }
378 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600379 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600380 }
381 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600382 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600383 }
384 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600385 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600386 }
387 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600388
389 if (dbServer.empty()){
390 throw Error("Invalid value for \"dbServer\""
391 " in \"query\" section");
392 }
393 if (dbName.empty()){
394 throw Error("Invalid value for \"dbName\""
395 " in \"query\" section");
396 }
397 if (dbUser.empty()){
398 throw Error("Invalid value for \"dbUser\""
399 " in \"query\" section");
400 }
401 if (dbPasswd.empty()){
402 throw Error("Invalid value for \"dbPasswd\""
403 " in \"query\" section");
404 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600405 }
406 }
407
Chengyu Fancfb80c72015-10-19 16:50:04 -0600408 if (m_filterCategoryNames.empty()) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600409 throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
410 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600411
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600412 m_prefix = prefix;
413
414 m_signingId = ndn::Name(signingId);
415 setCatalogId();
416
417 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600418 setDatabaseHandler(mysqlId);
419 setFilters();
420}
421
422template <typename DatabaseHandler>
423void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600424QueryAdapter<DatabaseHandler>::setCatalogId()
425{
426 //empty
427}
428
429template <>
430void
431QueryAdapter<MYSQL>::setCatalogId()
432{
433 // use public key digest as the catalog ID
434 ndn::Name keyId;
435 if (m_signingId.empty()) {
436 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
437 } else {
438 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
439 }
440
441 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
442 ndn::Block keyDigest = pKey->computeDigest();
443 m_catalogId.clear();
444 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
445}
446
447template <typename DatabaseHandler>
448void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600449QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
450{
451 //empty
452}
453
454template <>
455void
456QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
457{
458 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
459
460 m_databaseHandler = conn;
461}
462
463template <typename DatabaseHandler>
464QueryAdapter<DatabaseHandler>::~QueryAdapter()
465{
466 for (const auto& itr : m_registeredPrefixList) {
467 if (static_cast<bool>(itr.second))
468 m_face->unsetInterestFilter(itr.second);
469 }
470}
471
472template <typename DatabaseHandler>
473void
474QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
475 const ndn::Interest& interest)
476{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600477 _LOG_DEBUG(">> QueryAdapter::onQueryInterest");
478
Alison Craig2a4d5282015-04-10 12:00:02 -0600479 if (interest.getName().size() != filter.getPrefix().size() + 1) {
480 // @todo: return a nack
481 return;
482 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600483
Alison Craig2a4d5282015-04-10 12:00:02 -0600484 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
Chengyu Fan92440162015-07-09 14:43:31 -0600485
Chengyu Fan71b712b2015-09-09 22:13:56 -0600486 _LOG_DEBUG("Interest : " << interestPtr->getName());
Chengyu Fan92440162015-07-09 14:43:31 -0600487
Chengyu Fanb25835b2015-04-28 17:09:35 -0600488 // @todo: use thread pool
489 std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
490 this,
491 interestPtr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600492 queryThread.join();
493}
494
495template <typename DatabaseHandler>
496void
497QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
498 const ndn::Interest& interest)
499{
500 // FIXME Results are currently getting served out of the forwarder's
501 // CS so we just ignore any retrieval Interests that hit us for
502 // now. In the future, this should check some form of
503 // InMemoryStorage.
Chengyu Fan92440162015-07-09 14:43:31 -0600504
Chengyu Fan71b712b2015-09-09 22:13:56 -0600505 _LOG_DEBUG(">> QueryAdapter::onQueryResultsInterest");
Chengyu Fan92440162015-07-09 14:43:31 -0600506
Alison Craig1aced7d2015-04-10 12:00:02 -0600507 auto data = m_cache.find(interest.getName());
508 if (data) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600509 m_face->put(*data);
Alison Craig1aced7d2015-04-10 12:00:02 -0600510 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600511
512 _LOG_DEBUG("<< QueryAdapter::onQueryResultsInterest");
Alison Craig2a4d5282015-04-10 12:00:02 -0600513}
514
515template <typename DatabaseHandler>
516void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600517QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
Chengyu Fancfb80c72015-10-19 16:50:04 -0600518 const ndn::Interest& interest)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600519{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600520 _LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600521 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
522
Chengyu Fan71b712b2015-09-09 22:13:56 -0600523 _LOG_DEBUG("Interest : " << interestPtr->getName());
524
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700525 if(m_socket != nullptr) {
526 const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
527 std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size());
528 _LOG_DEBUG("Original digest :" << m_chronosyncDigest);
529 _LOG_DEBUG("New digest : " << digestStr);
530 // if the m_chronosyncDigest and the rootdigest are not equal
531 if (digestStr != m_chronosyncDigest) {
532 // (1) update chronosyncDigest
533 // (2) clear all staled ACK data
534 m_mutex.lock();
535 m_chronosyncDigest = digestStr;
536 m_activeQueryToFirstResponse.erase(ndn::Name("/"));
537 m_mutex.unlock();
538 _LOG_DEBUG("Change digest to " << m_chronosyncDigest);
539 }
540 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600541
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700542 auto data = m_activeQueryToFirstResponse.find(interest.getName());
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600543 if (data) {
544 m_face->put(*data);
545 }
546 else {
547 std::thread queryThread(&QueryAdapter<DatabaseHandler>::populateFiltersMenu,
548 this,
549 interestPtr);
550 queryThread.join();
551 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600552
553 _LOG_DEBUG("<< QueryAdapter::onFiltersInitializationInterest");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600554}
555
556template <typename DatabaseHandler>
557void
558QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest)
559{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600560 _LOG_DEBUG(">> QueryAdapter::populateFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600561 Json::Value filters;
562 Json::FastWriter fastWriter;
563 getFiltersMenu(filters);
564
565 const std::string filterValue = fastWriter.write(filters);
566
567 if (!filters.empty()) {
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700568 // use /<prefix>/filters-initialization/<seg> as data name
569 ndn::Name filterDataName(interest->getName().getPrefix(-1));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600570
571 const char* payload = filterValue.c_str();
572 size_t payloadLength = filterValue.size();
573 size_t startIndex = 0, seqNo = 0;
574
575 if (filterValue.length() > PAYLOAD_LIMIT) {
576 payloadLength = PAYLOAD_LIMIT;
577 ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo);
578 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName);
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700579 // freshnessPeriod 0 means permanent?
580 filterData->setFreshnessPeriod(ndn::time::milliseconds(10));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600581 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
582
583 signData(*filterData);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600584
585 _LOG_DEBUG("Populate Filter Data :" << segmentName);
586
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600587 m_mutex.lock();
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700588 // save the filter results in the activeQueryToFirstResponse structure
589 // when version changes, the activeQueryToFirstResponse should be cleaned
590 m_activeQueryToFirstResponse.insert(*filterData);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600591 try {
592 m_face->put(*filterData);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600593 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600594 catch (std::exception& e) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600595 _LOG_ERROR(e.what());
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600596 }
597 m_mutex.unlock();
598
599 seqNo++;
600 startIndex = payloadLength * seqNo + 1;
601 }
602 payloadLength = filterValue.size() - PAYLOAD_LIMIT * seqNo;
603
604 ndn::Name lastSegment = ndn::Name(filterDataName).appendSegment(seqNo);
605 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(lastSegment);
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700606 filterData->setFreshnessPeriod(ndn::time::milliseconds(10));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600607 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
608 filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo));
609
610 signData(*filterData);
611 m_mutex.lock();
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700612 m_activeQueryToFirstResponse.insert(*filterData);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600613 m_face->put(*filterData);
614 m_mutex.unlock();
615 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600616 _LOG_DEBUG("<< QueryAdapter::populateFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600617}
618
619template <typename DatabaseHandler>
620void
621QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value)
622{
623 // empty
624}
625
626// get distinct value of each column
627template <>
628void
629QueryAdapter<MYSQL>::getFiltersMenu(Json::Value& value)
630{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600631 _LOG_DEBUG(">> QueryAdapter::getFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600632 Json::Value tmp;
633
634 for (size_t i = 0; i < m_filterCategoryNames.size(); i++) {
635 std::string columnName = m_filterCategoryNames[i];
636 std::string getFilterSql("SELECT DISTINCT " + columnName +
637 " FROM " + m_databaseTable + ";");
638 std::string errMsg;
639 bool success;
640
641 std::shared_ptr<MYSQL_RES> results
642 = atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
643 util::QUERY, success, errMsg);
644 if (!success) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600645 _LOG_ERROR(errMsg);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600646 value.clear();
647 return;
648 }
649
650 while (MYSQL_ROW row = mysql_fetch_row(results.get()))
651 {
652 tmp[columnName].append(row[0]);
653 }
654 value.append(tmp);
655 tmp.clear();
656 }
657
Chengyu Fan71b712b2015-09-09 22:13:56 -0600658 _LOG_DEBUG("<< QueryAdapter::getFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600659}
660
661template <typename DatabaseHandler>
662void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600663QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600664{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600665 if (m_signingId.empty())
666 m_keyChain->sign(data);
667 else {
668 ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
669 ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
670 m_keyChain->sign(data, certName);
Alison Craig2a4d5282015-04-10 12:00:02 -0600671 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600672}
673
674template <typename DatabaseHandler>
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600675ndn::Name
676QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
677 const ndn::Name::Component& version)
678{
679 // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
680 // for now, should be /<prefix>/query-results/<catalog-id>/<query-parameters>/<version>
681
682 ndn::Name queryResultName(m_prefix);
683 queryResultName.append("query-results")
684 .append(m_catalogId)
685 .append(interest->getName().get(-1))
686 .append(version);
687 return queryResultName;
688}
689
690template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600691std::shared_ptr<ndn::Data>
692QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
693 const ndn::Name::Component& version)
Alison Craig2a4d5282015-04-10 12:00:02 -0600694{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600695 std::string queryResultNameStr(getQueryResultsName(interest, version).toUri());
Alison Craig2a4d5282015-04-10 12:00:02 -0600696
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600697 std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName());
698 ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()),
699 queryResultNameStr.length());
Chengyu Fan92440162015-07-09 14:43:31 -0600700 ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
701
Chengyu Fanb25835b2015-04-28 17:09:35 -0600702 signData(*ack);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600703
704 _LOG_DEBUG("Make ACK : " << queryResultNameStr);
705
Chengyu Fanb25835b2015-04-28 17:09:35 -0600706 return ack;
Alison Craig2a4d5282015-04-10 12:00:02 -0600707}
708
709template <typename DatabaseHandler>
710void
Chengyu Fan92440162015-07-09 14:43:31 -0600711QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600712{
Chengyu Fan92440162015-07-09 14:43:31 -0600713 uint64_t segmentNo = 0;
714
715 std::shared_ptr<ndn::Data> nack =
716 std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo));
717 nack->setFreshnessPeriod(ndn::time::milliseconds(10000));
718 nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
719
720 signData(*nack);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600721
722 _LOG_DEBUG("Send Nack: " << ndn::Name(dataPrefix).appendSegment(segmentNo));
723
Chengyu Fan92440162015-07-09 14:43:31 -0600724 m_mutex.lock();
725 m_cache.insert(*nack);
726 m_mutex.unlock();
727}
728
729
730template <typename DatabaseHandler>
731bool
732QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
733 Json::Value& jsonValue)
734{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600735 _LOG_DEBUG(">> QueryAdapter::json2Sql");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600736
Chengyu Fan71b712b2015-09-09 22:13:56 -0600737 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fancfb80c72015-10-19 16:50:04 -0600738
Chengyu Fan92440162015-07-09 14:43:31 -0600739 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan92440162015-07-09 14:43:31 -0600740 return false;
741 }
742
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600743 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600744 bool input = false;
745 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
746 {
747 Json::Value key = iter.key();
748 Json::Value value = (*iter);
749
Chengyu Fan92440162015-07-09 14:43:31 -0600750 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600751 _LOG_ERROR("Null key or value in JsonValue");
Chengyu Fan92440162015-07-09 14:43:31 -0600752 return false;
753 }
754
755 // cannot convert to string
756 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600757 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan92440162015-07-09 14:43:31 -0600758 return false;
759 }
760
761 if (key.asString().compare("?") == 0) {
762 continue;
763 }
764
Chengyu Fanb25835b2015-04-28 17:09:35 -0600765 if (input) {
766 sqlQuery << " AND";
767 } else {
768 sqlQuery << " WHERE";
769 }
770
Chengyu Fan92440162015-07-09 14:43:31 -0600771 sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
Chengyu Fanb25835b2015-04-28 17:09:35 -0600772 input = true;
773 }
774
775 if (!input) { // Force it to be the empty set
Chengyu Fan92440162015-07-09 14:43:31 -0600776 return false;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600777 }
778 sqlQuery << ";";
Chengyu Fan92440162015-07-09 14:43:31 -0600779 return true;
780}
781
782template <typename DatabaseHandler>
783bool
784QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600785 Json::Value& jsonValue,
786 bool& lastComponent)
Chengyu Fan92440162015-07-09 14:43:31 -0600787{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600788 _LOG_DEBUG(">> QueryAdapter::json2AutocompletionSql");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600789
Chengyu Fan71b712b2015-09-09 22:13:56 -0600790 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fancfb80c72015-10-19 16:50:04 -0600791
Chengyu Fan92440162015-07-09 14:43:31 -0600792 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan92440162015-07-09 14:43:31 -0600793 return false;
794 }
795
796 std::string typedString;
797 // get the string in the jsonValue
798 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
799 {
800 Json::Value key = iter.key();
801 Json::Value value = (*iter);
802
803 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600804 _LOG_ERROR("Null key or value in JsonValue");
Chengyu Fan92440162015-07-09 14:43:31 -0600805 return false;
806 }
807
808 // cannot convert to string
809 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600810 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan92440162015-07-09 14:43:31 -0600811 return false;
812 }
813
814 if (key.asString().compare("?") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600815 typedString = value.asString();
Chengyu Fan92440162015-07-09 14:43:31 -0600816 // since the front end triggers the autocompletion when users typed '/',
817 // there must be a '/' at the end, and the first char must be '/'
818 if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
819 return false;
820 break;
821 }
822 }
823
824 // 1. get the expected column number by parsing the typedString, so we can get the filed name
825 size_t pos = 0;
826 size_t start = 1; // start from the 1st char which is not '/'
827 size_t count = 0; // also the name to query for
828 std::string token;
829 std::string delimiter = "/";
830 std::map<std::string, std::string> typedComponents;
831 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
832 token = typedString.substr(start, pos - start);
833 if (count >= m_nameFields.size() - 1) {
834 return false;
835 }
836
837 // add column name and value (token) into map
838 typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600839 count++;
Chengyu Fan92440162015-07-09 14:43:31 -0600840 start = pos + 1;
841 }
842
843 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
844 // return true
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600845 if (count == m_nameFields.size() - 1)
846 lastComponent = true; // indicate this query is to query the last component
847
Chengyu Fan92440162015-07-09 14:43:31 -0600848 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600849 sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM " << m_databaseTable;
Chengyu Fan46398212015-08-11 11:23:13 -0600850 for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
851 it != typedComponents.end(); ++it) {
852 if (more)
853 sqlQuery << " AND";
854 else
855 sqlQuery << " WHERE";
856
857 sqlQuery << " " << it->first << "='" << it->second << "'";
858
859 more = true;
860 }
861 sqlQuery << ";";
862 return true;
863}
864
865template <typename DatabaseHandler>
866bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600867QueryAdapter<DatabaseHandler>::json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
868 Json::Value& jsonValue)
Chengyu Fan46398212015-08-11 11:23:13 -0600869{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600870 _LOG_DEBUG(">> QueryAdapter::json2CompleteSearchSql");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600871
Chengyu Fan71b712b2015-09-09 22:13:56 -0600872 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fancfb80c72015-10-19 16:50:04 -0600873
Chengyu Fan46398212015-08-11 11:23:13 -0600874 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan46398212015-08-11 11:23:13 -0600875 return false;
876 }
877
878 std::string typedString;
879 // get the string in the jsonValue
880 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
881 {
882 Json::Value key = iter.key();
883 Json::Value value = (*iter);
884
885 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600886 _LOG_ERROR("Null key or value in JsonValue");
Chengyu Fan46398212015-08-11 11:23:13 -0600887 return false;
888 }
889
890 // cannot convert to string
891 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600892 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan46398212015-08-11 11:23:13 -0600893 return false;
894 }
895
896 if (key.asString().compare("??") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600897 typedString = value.asString();
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600898 if (typedString.empty() || typedString.find("/") != 0)
Chengyu Fan46398212015-08-11 11:23:13 -0600899 return false;
900 break;
901 }
902 }
903
904 // 1. get the expected column number by parsing the typedString, so we can get the filed name
905 size_t pos = 0;
906 size_t start = 1; // start from the 1st char which is not '/'
907 size_t count = 0; // also the name to query for
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600908 size_t typedStringLen = typedString.length();
Chengyu Fan46398212015-08-11 11:23:13 -0600909 std::string token;
910 std::string delimiter = "/";
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600911 std::vector<std::pair<std::string, std::string>> typedComponents;
Chengyu Fan46398212015-08-11 11:23:13 -0600912 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
913 token = typedString.substr(start, pos - start);
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600914 if (count >= m_nameFields.size()) {
Chengyu Fan46398212015-08-11 11:23:13 -0600915 return false;
916 }
917
918 // add column name and value (token) into map
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600919 typedComponents.push_back(std::make_pair(m_nameFields[count], token));
920
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600921 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600922 start = pos + 1;
923 }
924
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600925 // we may have a component after the last "/"
926 if (start < typedStringLen) {
927 typedComponents.push_back(std::make_pair(m_nameFields[count],
928 typedString.substr(start, typedStringLen - start)));
929 }
930
Chengyu Fan46398212015-08-11 11:23:13 -0600931 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
932 // return true
933 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600934 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600935 for (std::vector<std::pair<std::string, std::string>>::iterator it = typedComponents.begin();
Chengyu Fan92440162015-07-09 14:43:31 -0600936 it != typedComponents.end(); ++it) {
937 if (more)
938 sqlQuery << " AND";
939 else
940 sqlQuery << " WHERE";
941
942 sqlQuery << " " << it->first << "='" << it->second << "'";
943
944 more = true;
945 }
946 sqlQuery << ";";
947 return true;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600948}
949
950template <typename DatabaseHandler>
951void
952QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600953{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600954 _LOG_DEBUG(">> QueryAdapter::runJsonQuery");
955
Alison Craig1aced7d2015-04-10 12:00:02 -0600956 // 1) Strip the prefix off the ndn::Interest's ndn::Name
957 // +1 to grab JSON component after "query" component
Alison Craig1aced7d2015-04-10 12:00:02 -0600958
Chengyu Fanb25835b2015-04-28 17:09:35 -0600959 ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
960 // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
961 const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
Alison Craig2a4d5282015-04-10 12:00:02 -0600962
Chengyu Fanb25835b2015-04-28 17:09:35 -0600963 if (jsonQuery.length() <= 0) {
Chengyu Fan92440162015-07-09 14:43:31 -0600964 // no JSON query, send Nack?
Chengyu Fanb25835b2015-04-28 17:09:35 -0600965 return;
966 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600967
968 // the version should be replaced with ChronoSync state digest
969 ndn::name::Component version;
970
971 if(m_socket != nullptr) {
972 const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
973 std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size());
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700974 _LOG_DEBUG("Original digest " << m_chronosyncDigest);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600975 _LOG_DEBUG("New digest : " << digestStr);
976 // if the m_chronosyncDigest and the rootdigest are not equal
977 if (digestStr != m_chronosyncDigest) {
978 // (1) update chronosyncDigest
979 // (2) clear all staled ACK data
980 m_mutex.lock();
981 m_chronosyncDigest = digestStr;
982 m_activeQueryToFirstResponse.erase(ndn::Name("/"));
983 m_mutex.unlock();
984 _LOG_DEBUG("Change digest to " << m_chronosyncDigest);
985 }
986 version = ndn::name::Component::fromEscapedString(digestStr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600987 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600988 else {
989 version = ndn::name::Component::fromEscapedString(m_chronosyncDigest);
990 }
991
992 // try to respond with the inMemoryStorage
993 m_mutex.lock();
994 { // !!! BEGIN CRITICAL SECTION !!!
995 auto data = m_activeQueryToFirstResponse.find(interest->getName());
996 if (data) {
997 _LOG_DEBUG("Answer with Data in IMS : " << data->getName());
998 m_face->put(*data);
999 m_mutex.unlock();
1000 return;
1001 }
1002 } // !!! END CRITICAL SECTION !!!
1003 m_mutex.unlock();
Alison Craig2a4d5282015-04-10 12:00:02 -06001004
1005 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
1006 Json::Value parsedFromString;
1007 Json::Reader reader;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001008 if (!reader.parse(jsonQuery, parsedFromString)) {
1009 // @todo: send NACK?
Chengyu Fancfb80c72015-10-19 16:50:04 -06001010 _LOG_ERROR("Cannot parse the JsonQuery");
Chengyu Fanb25835b2015-04-28 17:09:35 -06001011 return;
Alison Craig2a4d5282015-04-10 12:00:02 -06001012 }
Chengyu Fanb25835b2015-04-28 17:09:35 -06001013
Chengyu Fanb25835b2015-04-28 17:09:35 -06001014 std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
1015
1016 m_mutex.lock();
1017 { // !!! BEGIN CRITICAL SECTION !!!
1018 // An unusual race-condition case, which requires things like PIT aggregation to be off.
Chengyu Fancfb80c72015-10-19 16:50:04 -06001019 auto data = m_activeQueryToFirstResponse.find(interest->getName());
1020 if (data) {
1021 m_face->put(*data);
1022 m_mutex.unlock();
Chengyu Fanb25835b2015-04-28 17:09:35 -06001023 return;
1024 }
Chengyu Fancfb80c72015-10-19 16:50:04 -06001025
Chengyu Fanb25835b2015-04-28 17:09:35 -06001026 // This is where things are expensive so we save them for the lock
Chengyu Fan92440162015-07-09 14:43:31 -06001027 // note that we ack the query with the cached ACK messages, but we should remove the ACKs
1028 // that conatin the old version when ChronoSync is updated
Chengyu Fancfb80c72015-10-19 16:50:04 -06001029 m_activeQueryToFirstResponse.insert(*ack);
1030
Chengyu Fanb25835b2015-04-28 17:09:35 -06001031 m_face->put(*ack);
1032 } // !!! END CRITICAL SECTION !!!
1033 m_mutex.unlock();
1034
1035 // 3) Convert the JSON Query into a MySQL one
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001036 bool autocomplete = false, lastComponent = false;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001037 std::stringstream sqlQuery;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001038
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001039 ndn::Name segmentPrefix(getQueryResultsName(interest, version));
Chengyu Fanb25835b2015-04-28 17:09:35 -06001040
Chengyu Fan92440162015-07-09 14:43:31 -06001041 Json::Value tmp;
1042 // expect the autocomplete and the component-based query are separate
1043 // if JSON::Value contains ? as key, is autocompletion
1044 if (parsedFromString.get("?", tmp) != tmp) {
1045 autocomplete = true;
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001046 if (!json2AutocompletionSql(sqlQuery, parsedFromString, lastComponent)) {
Chengyu Fan92440162015-07-09 14:43:31 -06001047 sendNack(segmentPrefix);
1048 return;
1049 }
1050 }
Chengyu Fan46398212015-08-11 11:23:13 -06001051 else if (parsedFromString.get("??", tmp) != tmp) {
Chengyu Fan4d5fbd22015-09-18 14:34:08 -06001052 if (!json2PrefixBasedSearchSql(sqlQuery, parsedFromString)) {
Chengyu Fan46398212015-08-11 11:23:13 -06001053 sendNack(segmentPrefix);
1054 return;
1055 }
1056 }
Chengyu Fan92440162015-07-09 14:43:31 -06001057 else {
1058 if (!json2Sql(sqlQuery, parsedFromString)) {
1059 sendNack(segmentPrefix);
1060 return;
1061 }
1062 }
1063
1064 // 4) Run the Query
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001065 prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001066}
1067
1068template <typename DatabaseHandler>
1069void
1070QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
1071 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001072 bool autocomplete,
1073 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001074{
1075 // empty
1076}
1077
1078// prepareSegments specilization function
1079template<>
1080void
1081QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
1082 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001083 bool autocomplete,
1084 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001085{
Chengyu Fan71b712b2015-09-09 22:13:56 -06001086 _LOG_DEBUG(">> QueryAdapter::prepareSegments");
Chengyu Fancfb80c72015-10-19 16:50:04 -06001087
Chengyu Fan71b712b2015-09-09 22:13:56 -06001088 _LOG_DEBUG(sqlString);
Chengyu Fancfb80c72015-10-19 16:50:04 -06001089
Chengyu Fan46398212015-08-11 11:23:13 -06001090 std::string errMsg;
1091 bool success;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001092 // 4) Run the Query
1093 std::shared_ptr<MYSQL_RES> results
Chengyu Fan46398212015-08-11 11:23:13 -06001094 = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
1095 if (!success)
Chengyu Fancfb80c72015-10-19 16:50:04 -06001096 _LOG_ERROR(errMsg);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001097
1098 if (!results) {
Chengyu Fancfb80c72015-10-19 16:50:04 -06001099 _LOG_ERROR("NULL MYSQL_RES for" << sqlString);
Chengyu Fan92440162015-07-09 14:43:31 -06001100
Chengyu Fanb25835b2015-04-28 17:09:35 -06001101 // @todo: throw runtime error or log the error message?
1102 return;
1103 }
1104
Chengyu Fan92440162015-07-09 14:43:31 -06001105 uint64_t resultCount = mysql_num_rows(results.get());
1106
Chengyu Fan71b712b2015-09-09 22:13:56 -06001107 _LOG_DEBUG("Query resuls contain " << resultCount << "rows");
Chengyu Fanb25835b2015-04-28 17:09:35 -06001108
1109 MYSQL_ROW row;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001110 uint64_t segmentNo = 0;
Chengyu Fan46398212015-08-11 11:23:13 -06001111 Json::Value tmp;
1112 Json::Value resultJson;
1113 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001114
1115 uint64_t viewStart = 0, viewEnd = 0;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001116 while ((row = mysql_fetch_row(results.get())))
1117 {
Chengyu Fan46398212015-08-11 11:23:13 -06001118 tmp.append(row[0]);
1119 const std::string tmpString = fastWriter.write(tmp);
1120 if (tmpString.length() > PAYLOAD_LIMIT) {
Chengyu Fanb25835b2015-04-28 17:09:35 -06001121 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001122 = makeReplyData(segmentPrefix, resultJson, segmentNo, false,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001123 autocomplete, resultCount, viewStart, viewEnd, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001124 m_mutex.lock();
1125 m_cache.insert(*data);
1126 m_mutex.unlock();
Chengyu Fan46398212015-08-11 11:23:13 -06001127 tmp.clear();
1128 resultJson.clear();
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001129 segmentNo++;
1130 viewStart = viewEnd + 1;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001131 }
Chengyu Fan46398212015-08-11 11:23:13 -06001132 resultJson.append(row[0]);
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001133 viewEnd++;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001134 }
Chengyu Fan46398212015-08-11 11:23:13 -06001135
Chengyu Fanb25835b2015-04-28 17:09:35 -06001136 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001137 = makeReplyData(segmentPrefix, resultJson, segmentNo, true,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001138 autocomplete, resultCount, viewStart, viewEnd, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001139 m_mutex.lock();
1140 m_cache.insert(*data);
1141 m_mutex.unlock();
1142}
1143
1144template <typename DatabaseHandler>
1145std::shared_ptr<ndn::Data>
1146QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
1147 const Json::Value& value,
1148 uint64_t segmentNo,
1149 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -06001150 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001151 uint64_t resultCount,
1152 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001153 uint64_t viewEnd,
1154 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001155{
1156 Json::Value entry;
1157 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001158
1159 entry["resultCount"] = Json::UInt64(resultCount);;
1160 entry["viewStart"] = Json::UInt64(viewStart);
1161 entry["viewEnd"] = Json::UInt64(viewEnd);
1162
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001163 if (lastComponent)
1164 entry["lastComponent"] = Json::Value(true);
1165
Chengyu Fan71b712b2015-09-09 22:13:56 -06001166 _LOG_DEBUG("resultCount " << resultCount << ";"
1167 << "viewStart " << viewStart << ";"
1168 << "viewEnd " << viewEnd);
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001169
Chengyu Fanb25835b2015-04-28 17:09:35 -06001170 if (isAutocomplete) {
1171 entry["next"] = value;
1172 } else {
1173 entry["results"] = value;
1174 }
1175 const std::string jsonMessage = fastWriter.write(entry);
1176 const char* payload = jsonMessage.c_str();
1177 size_t payloadLength = jsonMessage.size() + 1;
1178 ndn::Name segmentName(segmentPrefix);
1179 segmentName.appendSegment(segmentNo);
1180
1181 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
1182 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
1183 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
1184
1185 if (isFinalBlock) {
1186 data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
1187 }
Chengyu Fancfb80c72015-10-19 16:50:04 -06001188
Chengyu Fan71b712b2015-09-09 22:13:56 -06001189 _LOG_DEBUG(segmentName);
Chengyu Fancfb80c72015-10-19 16:50:04 -06001190
Chengyu Fanb25835b2015-04-28 17:09:35 -06001191 signData(*data);
1192 return data;
Alison Craig2a4d5282015-04-10 12:00:02 -06001193}
1194
Chengyu Fancfb80c72015-10-19 16:50:04 -06001195
Alison Craig2a4d5282015-04-10 12:00:02 -06001196} // namespace query
1197} // namespace atmos
1198#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP