blob: 5e61aff18033e18be6aabe8a4776bb3e2baec9b6 [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanb25835b2015-04-28 17:09:35 -060024#include "util/config-file.hpp"
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <thread>
27
Alison Craig2a4d5282015-04-10 12:00:02 -060028#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
Alison Craig1aced7d2015-04-10 12:00:02 -060040#include <ndn-cxx/util/in-memory-storage-lru.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060041#include <ndn-cxx/util/string-helper.hpp>
Chengyu Fancfb80c72015-10-19 16:50:04 -060042#include <ChronoSync/socket.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060043
44#include "mysql/mysql.h"
45
Alison Craig2a4d5282015-04-10 12:00:02 -060046#include <map>
Chengyu Fanb25835b2015-04-28 17:09:35 -060047#include <unordered_map>
Alison Craig2a4d5282015-04-10 12:00:02 -060048#include <memory>
49#include <mutex>
50#include <sstream>
51#include <string>
Chengyu Fan92440162015-07-09 14:43:31 -060052#include <array>
Chengyu Fan4d5fbd22015-09-18 14:34:08 -060053#include <utility>
Alison Craig2a4d5282015-04-10 12:00:02 -060054
Chengyu Fan71b712b2015-09-09 22:13:56 -060055#include "util/logger.hpp"
56
57
Alison Craig2a4d5282015-04-10 12:00:02 -060058namespace atmos {
59namespace query {
Chengyu Fan71b712b2015-09-09 22:13:56 -060060#ifdef HAVE_LOG4CXX
61 INIT_LOGGER("QueryAdapter");
62#endif
63
// Maximum number of content bytes placed in a single Data segment.
// TODO: derive this limit from the size of a signed, empty Data packet
//       instead of hard-coding it.
static const size_t PAYLOAD_LIMIT = 7000;
Alison Craig2a4d5282015-04-10 12:00:02 -060066
67/**
68 * QueryAdapter handles the Query usecases for the catalog
69 */
70template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060071class QueryAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060072public:
73 /**
74 * Constructor
75 *
Chengyu Fancfb80c72015-10-19 16:50:04 -060076 * @param face: Face that will be used for NDN communications
77 * @param keyChain: KeyChain that will be used for data signing
78 * @param syncSocket: ChronoSync socket
Alison Craig2a4d5282015-04-10 12:00:02 -060079 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060080 QueryAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -060081 const std::shared_ptr<ndn::KeyChain>& keyChain,
82 const std::shared_ptr<chronosync::Socket>& syncSocket);
Alison Craig2a4d5282015-04-10 12:00:02 -060083
Alison Craig2a4d5282015-04-10 12:00:02 -060084 virtual
85 ~QueryAdapter();
86
87 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060088 * Helper function to specify section handler
89 */
90 void
91 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060092 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060093 const std::vector<std::string>& nameFields,
94 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060095
96protected:
97 /**
98 * Helper function for configuration parsing
99 */
100 void
101 onConfig(const util::ConfigSection& section,
102 bool isDryDun,
103 const std::string& fileName,
104 const ndn::Name& prefix);
105
106 /**
Alison Craig2a4d5282015-04-10 12:00:02 -0600107 * Handles incoming query requests by stripping the filter off the Interest to get the
108 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
109 *
110 * @param filter: InterestFilter that caused this Interest to be routed
111 * @param interest: Interest that needs to be handled
112 */
113 virtual void
Chengyu Fan7b978f82015-12-09 17:03:23 -0700114 onIncomingQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600115
Alison Craig2a4d5282015-04-10 12:00:02 -0600116 /**
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600117 * Handles requests for responses to an filter initialization request
118 *
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600119 * @param interest: Interest that needs to be handled
120 */
121 virtual void
Chengyu Fan7b978f82015-12-09 17:03:23 -0700122 onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600123
124 /**
125 * Helper function that generates query results from a Json query carried in the Interest
126 *
127 * @param interest: Interest that needs to be handled
128 */
129 void
130 populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest);
131
132 void
133 getFiltersMenu(Json::Value& value);
134
135 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600136 * Helper function that makes query-results data
Alison Craig2a4d5282015-04-10 12:00:02 -0600137 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600138 * @param segmentPrefix: Name that identifies the Prefix for the Data
139 * @param value: Json::Value to be sent in the Data
140 * @param segmentNo: uint64_t the segment for this Data
141 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
142 * last entry
Alison Craig2a4d5282015-04-10 12:00:02 -0600143 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
Chengyu Fan92440162015-07-09 14:43:31 -0600144 * @param resultCount: the number of records in the query results
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600145 * @param viewStart: the start index of the record in the query results payload
146 * @param viewEnd: the end index of the record in the query results payload
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600147 * @param lastComponent: flag to indicate the content contains the last component for
148 autocompletion query
Alison Craig2a4d5282015-04-10 12:00:02 -0600149 */
Chengyu Fanb25835b2015-04-28 17:09:35 -0600150 std::shared_ptr<ndn::Data>
151 makeReplyData(const ndn::Name& segmentPrefix,
152 const Json::Value& value,
153 uint64_t segmentNo,
154 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -0600155 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600156 uint64_t resultCount,
157 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600158 uint64_t viewEnd,
159 bool lastComponent);
Alison Craig2a4d5282015-04-10 12:00:02 -0600160
161 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600162 * Helper function that generates query results from a Json query carried in the Interest
Alison Craig2a4d5282015-04-10 12:00:02 -0600163 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600164 * @param interest: Interest that needs to be handled
Alison Craig2a4d5282015-04-10 12:00:02 -0600165 */
166 void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600167 runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600168
Alison Craig1aced7d2015-04-10 12:00:02 -0600169 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600170 * Helper function that makes ACK data
Alison Craig1aced7d2015-04-10 12:00:02 -0600171 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600172 * @param interest: Intersts that needs to be handled
173 * @param version: Version that needs to be in the data name
174 */
175 std::shared_ptr<ndn::Data>
176 makeAckData(std::shared_ptr<const ndn::Interest> interest,
177 const ndn::Name::Component& version);
178
179 /**
Chengyu Fan92440162015-07-09 14:43:31 -0600180 * Helper function that sends NACK
181 *
182 * @param dataPrefix: prefix for the data packet
Alison Craig1aced7d2015-04-10 12:00:02 -0600183 */
184 void
Chengyu Fan92440162015-07-09 14:43:31 -0600185 sendNack(const ndn::Name& dataPrefix);
186
187 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600188 * Helper function that signs the data
189 */
190 void
191 signData(ndn::Data& data);
192
193 /**
194 * Helper function that publishes query-results data segments
195 */
196 virtual void
Chengyu Fan31737f12016-01-12 21:08:50 -0700197 prepareSegmentsBySqlString(const ndn::Name& segmentPrefix,
198 const std::string& sqlString,
199 bool lastComponent,
200 const std::string& nameField);
201
202 virtual void
203 prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams,
204 const ndn::Name& segmentPrefix);
205
206 void
207 generateSegments(ResultSet_T& res,
208 const ndn::Name& segmentPrefix,
209 int resultCount,
210 bool autocomplete,
211 bool lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600212
213 /**
214 * Helper function to set the DatabaseHandler
215 */
216 void
217 setDatabaseHandler(const util::ConnectionDetails& databaseId);
218
Chengyu Fan31737f12016-01-12 21:08:50 -0700219 void
220 closeDatabaseHandler();
221
Chengyu Fanb25835b2015-04-28 17:09:35 -0600222 /**
223 * Helper function that set filters to make the adapter work
224 */
225 void
226 setFilters();
227
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600228 void
229 setCatalogId();
230
Chengyu Fan92440162015-07-09 14:43:31 -0600231 /**
232 * Helper function that generates the sqlQuery string for autocomplete query
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600233 * @param sqlQuery: stringstream to save the sqlQuery string
234 * @param jsonValue: Json value that contains the query information
235 * @param lastComponent: Flag to mark the last component query
Chengyu Fan31737f12016-01-12 21:08:50 -0700236 * @param nameField: stringstream to save the nameField string
Chengyu Fan92440162015-07-09 14:43:31 -0600237 */
238 bool
239 json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600240 Json::Value& jsonValue,
Chengyu Fan31737f12016-01-12 21:08:50 -0700241 bool& lastComponent,
242 std::stringstream& nameField);
Chengyu Fan92440162015-07-09 14:43:31 -0600243
Chengyu Fan46398212015-08-11 11:23:13 -0600244 bool
Chengyu Fan31737f12016-01-12 21:08:50 -0700245 doPrefixBasedSearch(Json::Value& jsonValue,
246 std::vector<std::pair<std::string, std::string>>& typedComponents);
247
248 bool
249 doFilterBasedSearch(Json::Value& jsonValue,
250 std::vector<std::pair<std::string, std::string>>& typedComponents);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600251
252 ndn::Name
253 getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
254 const ndn::Name::Component& version);
Chengyu Fan46398212015-08-11 11:23:13 -0600255
Chengyu Fancfb80c72015-10-19 16:50:04 -0600256 std::string
257 getChronoSyncDigest();
258
Chengyu Fanb25835b2015-04-28 17:09:35 -0600259protected:
260 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
261 // Handle to the Catalog's database
Chengyu Fan31737f12016-01-12 21:08:50 -0700262 std::shared_ptr<DatabaseHandler> m_dbConnPool;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600263 const std::shared_ptr<chronosync::Socket>& m_socket;
Alison Craig1aced7d2015-04-10 12:00:02 -0600264
Alison Craig2a4d5282015-04-10 12:00:02 -0600265 // mutex to control critical sections
266 std::mutex m_mutex;
267 // @{ needs m_mutex protection
268 // The Queries we are currently writing to
Chengyu Fancfb80c72015-10-19 16:50:04 -0600269 ndn::util::InMemoryStorageLru m_activeQueryToFirstResponse;
Alison Craig1aced7d2015-04-10 12:00:02 -0600270 ndn::util::InMemoryStorageLru m_cache;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600271 std::string m_chronosyncDigest;
Alison Craig2a4d5282015-04-10 12:00:02 -0600272 // @}
Chengyu Fanb25835b2015-04-28 17:09:35 -0600273 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan92440162015-07-09 14:43:31 -0600274 ndn::Name m_catalogId; // should be replaced with the PK digest
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600275 std::vector<std::string> m_filterCategoryNames;
Alison Craig2a4d5282015-04-10 12:00:02 -0600276};
277
Alison Craig2a4d5282015-04-10 12:00:02 -0600278template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600279QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -0600280 const std::shared_ptr<ndn::KeyChain>& keyChain,
281 const std::shared_ptr<chronosync::Socket>& syncSocket)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600282 : util::CatalogAdapter(face, keyChain)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600283 , m_socket(syncSocket)
284 , m_activeQueryToFirstResponse(100000)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600285 , m_cache(250000)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600286 , m_chronosyncDigest("0")
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600287 , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
Alison Craig2a4d5282015-04-10 12:00:02 -0600288{
Alison Craig2a4d5282015-04-10 12:00:02 -0600289}
290
291template <typename DatabaseHandler>
292void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600293QueryAdapter<DatabaseHandler>::setFilters()
Alison Craig2a4d5282015-04-10 12:00:02 -0600294{
Chengyu Fan7b978f82015-12-09 17:03:23 -0700295 m_registeredPrefixList[m_prefix] = m_face->setInterestFilter(ndn::InterestFilter(m_prefix),
296 bind(&query::QueryAdapter<DatabaseHandler>::onIncomingQueryInterest,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600297 this, _1, _2),
298 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
299 this, _1),
300 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
301 this, _1, _2));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600302}
303
304template <typename DatabaseHandler>
305void
306QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600307 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600308 const std::vector<std::string>& nameFields,
309 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600310{
Chengyu Fan92440162015-07-09 14:43:31 -0600311 m_nameFields = nameFields;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600312 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600313 config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
314 _1, _2, _3, prefix));
315}
316
317template <typename DatabaseHandler>
318void
319QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
320 bool isDryRun,
321 const std::string& filename,
322 const ndn::Name& prefix)
323{
324 using namespace util;
325 if (isDryRun) {
326 return;
327 }
328 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
329 for (auto item = section.begin();
330 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600331 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600332 {
333 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600334 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600335 if (signingId.empty()) {
336 throw Error("Empty value for \"signingId\""
Chengyu Fancfb80c72015-10-19 16:50:04 -0600337 " in \"query\" section");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600338 }
339 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600340 if (item->first == "filterCategoryNames") {
341 std::istringstream ss(item->second.get_value<std::string>());
342 std::string token;
343 while(std::getline(ss, token, ',')) {
344 m_filterCategoryNames.push_back(token);
345 }
346 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600347 if (item->first == "database") {
348 const util::ConfigSection& dataSection = item->second;
349 for (auto subItem = dataSection.begin();
350 subItem != dataSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600351 ++subItem)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600352 {
353 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600354 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600355 }
356 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600357 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600358 }
359 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600360 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600361 }
362 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600363 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600364 }
365 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600366
367 if (dbServer.empty()){
368 throw Error("Invalid value for \"dbServer\""
369 " in \"query\" section");
370 }
371 if (dbName.empty()){
372 throw Error("Invalid value for \"dbName\""
373 " in \"query\" section");
374 }
375 if (dbUser.empty()){
376 throw Error("Invalid value for \"dbUser\""
377 " in \"query\" section");
378 }
379 if (dbPasswd.empty()){
380 throw Error("Invalid value for \"dbPasswd\""
381 " in \"query\" section");
382 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600383 }
384 }
385
Chengyu Fancfb80c72015-10-19 16:50:04 -0600386 if (m_filterCategoryNames.empty()) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600387 throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
388 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600389
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600390 m_prefix = prefix;
391
392 m_signingId = ndn::Name(signingId);
393 setCatalogId();
394
395 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600396 setDatabaseHandler(mysqlId);
397 setFilters();
398}
399
400template <typename DatabaseHandler>
401void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600402QueryAdapter<DatabaseHandler>::setCatalogId()
403{
404 //empty
405}
406
407template <>
408void
Chengyu Fan31737f12016-01-12 21:08:50 -0700409QueryAdapter<ConnectionPool_T>::setCatalogId()
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600410{
411 // use public key digest as the catalog ID
412 ndn::Name keyId;
413 if (m_signingId.empty()) {
414 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
415 } else {
416 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
417 }
418
419 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
420 ndn::Block keyDigest = pKey->computeDigest();
421 m_catalogId.clear();
422 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
423}
424
425template <typename DatabaseHandler>
426void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600427QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
428{
429 //empty
430}
431
432template <>
433void
Chengyu Fan31737f12016-01-12 21:08:50 -0700434QueryAdapter<ConnectionPool_T>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600435{
Chengyu Fan31737f12016-01-12 21:08:50 -0700436 m_dbConnPool = zdbConnectionSetup(databaseId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600437}
438
439template <typename DatabaseHandler>
Chengyu Fan31737f12016-01-12 21:08:50 -0700440void
441QueryAdapter<DatabaseHandler>::closeDatabaseHandler()
442{
443}
444
445template <>
446void
447QueryAdapter<ConnectionPool_T>::closeDatabaseHandler()
448{
449 ConnectionPool_stop(*m_dbConnPool);
450}
451
452
453template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600454QueryAdapter<DatabaseHandler>::~QueryAdapter()
455{
456 for (const auto& itr : m_registeredPrefixList) {
457 if (static_cast<bool>(itr.second))
458 m_face->unsetInterestFilter(itr.second);
459 }
Chengyu Fan31737f12016-01-12 21:08:50 -0700460
461 closeDatabaseHandler();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600462}
463
464template <typename DatabaseHandler>
465void
Chengyu Fan7b978f82015-12-09 17:03:23 -0700466QueryAdapter<DatabaseHandler>::onIncomingQueryInterest(const ndn::InterestFilter& filter,
467 const ndn::Interest& interest)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600468{
Chengyu Fan7b978f82015-12-09 17:03:23 -0700469 _LOG_DEBUG(">> QueryAdapter::onIncomingQueryInterest");
Chengyu Fan71b712b2015-09-09 22:13:56 -0600470
Chengyu Fan7b978f82015-12-09 17:03:23 -0700471 // Interest must carry component "initialization" or "query"
472 if (interest.getName().size() < filter.getPrefix().size()) {
Chengyu Fan31737f12016-01-12 21:08:50 -0700473 // must NACK incorrect interest
474 sendNack(interest.getName());
Alison Craig2a4d5282015-04-10 12:00:02 -0600475 return;
476 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600477
Chengyu Fan7b978f82015-12-09 17:03:23 -0700478 _LOG_DEBUG("Interest : " << interest.getName());
Alison Craig2a4d5282015-04-10 12:00:02 -0600479 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
Chengyu Fan92440162015-07-09 14:43:31 -0600480
Chengyu Fan7b978f82015-12-09 17:03:23 -0700481 if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("filters-initialization")) {
482 std::thread queryThread(&QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
483 this,
484 interestPtr);
Chengyu Fan31737f12016-01-12 21:08:50 -0700485 queryThread.detach();
Chengyu Fan7b978f82015-12-09 17:03:23 -0700486 }
487 else if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("query")) {
Chengyu Fan92440162015-07-09 14:43:31 -0600488
Chengyu Fan7b978f82015-12-09 17:03:23 -0700489 auto data = m_cache.find(interest);
490 if (data) {
491 m_face->put(*data);
492 return;
493 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600494
Chengyu Fan7b978f82015-12-09 17:03:23 -0700495 // catalog must strip sequence number in an Interest for further process
496 if (interest.getName().size() > (filter.getPrefix().size() + 2)) {
497 // Interest carries sequence number, only grip the main part
498 // e.g., /hep/query/<query-params>/<version>/#seq
499 ndn::Interest queryInterest(interest.getName().getPrefix(filter.getPrefix().size() + 2));
Chengyu Fan92440162015-07-09 14:43:31 -0600500
Chengyu Fan7b978f82015-12-09 17:03:23 -0700501 auto data = m_cache.find(queryInterest);
502 if (data) {
503 // catalog has generated some data, but still working on it
504 return;
505 }
Chengyu Fan69279bf2016-02-18 16:05:00 -0700506 interestPtr = std::make_shared<ndn::Interest>(queryInterest);
Chengyu Fan7b978f82015-12-09 17:03:23 -0700507 }
Chengyu Fan92440162015-07-09 14:43:31 -0600508
Chengyu Fan7b978f82015-12-09 17:03:23 -0700509 std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
510 this,
511 interestPtr);
Chengyu Fan31737f12016-01-12 21:08:50 -0700512 queryThread.detach();
Alison Craig1aced7d2015-04-10 12:00:02 -0600513 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600514
Chengyu Fan7b978f82015-12-09 17:03:23 -0700515 // ignore other Interests
Alison Craig2a4d5282015-04-10 12:00:02 -0600516}
517
518template <typename DatabaseHandler>
519void
Chengyu Fan7b978f82015-12-09 17:03:23 -0700520QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600521{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600522 _LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest");
Chengyu Fan71b712b2015-09-09 22:13:56 -0600523
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700524 if(m_socket != nullptr) {
525 const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
526 std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size());
527 _LOG_DEBUG("Original digest :" << m_chronosyncDigest);
528 _LOG_DEBUG("New digest : " << digestStr);
529 // if the m_chronosyncDigest and the rootdigest are not equal
530 if (digestStr != m_chronosyncDigest) {
531 // (1) update chronosyncDigest
532 // (2) clear all staled ACK data
533 m_mutex.lock();
534 m_chronosyncDigest = digestStr;
535 m_activeQueryToFirstResponse.erase(ndn::Name("/"));
536 m_mutex.unlock();
537 _LOG_DEBUG("Change digest to " << m_chronosyncDigest);
538 }
539 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600540
Chengyu Fan7b978f82015-12-09 17:03:23 -0700541 auto data = m_activeQueryToFirstResponse.find(*interest);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600542 if (data) {
543 m_face->put(*data);
544 }
545 else {
Chengyu Fan7b978f82015-12-09 17:03:23 -0700546 populateFiltersMenu(interest);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600547 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600548
549 _LOG_DEBUG("<< QueryAdapter::onFiltersInitializationInterest");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600550}
551
552template <typename DatabaseHandler>
553void
554QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest)
555{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600556 _LOG_DEBUG(">> QueryAdapter::populateFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600557 Json::Value filters;
558 Json::FastWriter fastWriter;
559 getFiltersMenu(filters);
560
561 const std::string filterValue = fastWriter.write(filters);
562
563 if (!filters.empty()) {
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700564 // use /<prefix>/filters-initialization/<seg> as data name
565 ndn::Name filterDataName(interest->getName().getPrefix(-1));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600566
567 const char* payload = filterValue.c_str();
568 size_t payloadLength = filterValue.size();
569 size_t startIndex = 0, seqNo = 0;
570
571 if (filterValue.length() > PAYLOAD_LIMIT) {
572 payloadLength = PAYLOAD_LIMIT;
573 ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo);
574 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName);
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700575 // freshnessPeriod 0 means permanent?
576 filterData->setFreshnessPeriod(ndn::time::milliseconds(10));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600577 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
578
579 signData(*filterData);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600580
581 _LOG_DEBUG("Populate Filter Data :" << segmentName);
582
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600583 m_mutex.lock();
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700584 // save the filter results in the activeQueryToFirstResponse structure
585 // when version changes, the activeQueryToFirstResponse should be cleaned
586 m_activeQueryToFirstResponse.insert(*filterData);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600587 try {
588 m_face->put(*filterData);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600589 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600590 catch (std::exception& e) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600591 _LOG_ERROR(e.what());
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600592 }
593 m_mutex.unlock();
594
595 seqNo++;
596 startIndex = payloadLength * seqNo + 1;
597 }
598 payloadLength = filterValue.size() - PAYLOAD_LIMIT * seqNo;
599
600 ndn::Name lastSegment = ndn::Name(filterDataName).appendSegment(seqNo);
601 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(lastSegment);
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700602 filterData->setFreshnessPeriod(ndn::time::milliseconds(10));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600603 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
604 filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo));
605
606 signData(*filterData);
607 m_mutex.lock();
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700608 m_activeQueryToFirstResponse.insert(*filterData);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600609 m_face->put(*filterData);
610 m_mutex.unlock();
611 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600612 _LOG_DEBUG("<< QueryAdapter::populateFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600613}
614
615template <typename DatabaseHandler>
616void
617QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value)
618{
619 // empty
620}
621
622// get distinct value of each column
623template <>
624void
Chengyu Fan31737f12016-01-12 21:08:50 -0700625QueryAdapter<ConnectionPool_T>::getFiltersMenu(Json::Value& value)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600626{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600627 _LOG_DEBUG(">> QueryAdapter::getFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600628 Json::Value tmp;
629
Chengyu Fan31737f12016-01-12 21:08:50 -0700630 Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool);
631 if (!conn) {
632 _LOG_DEBUG("No available database connections");
633 return;
634 }
635
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600636 for (size_t i = 0; i < m_filterCategoryNames.size(); i++) {
637 std::string columnName = m_filterCategoryNames[i];
638 std::string getFilterSql("SELECT DISTINCT " + columnName +
639 " FROM " + m_databaseTable + ";");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600640
Chengyu Fan31737f12016-01-12 21:08:50 -0700641 ResultSet_T res4ColumnName;
642 TRY {
643 res4ColumnName = Connection_executeQuery(conn, reinterpret_cast<const char*>(getFilterSql.c_str()), getFilterSql.size());
644 }
645 CATCH(SQLException) {
646 _LOG_ERROR(Connection_getLastError(conn));
647 }
648 END_TRY;
649
650 while (ResultSet_next(res4ColumnName)) {
651 tmp[columnName].append(ResultSet_getString(res4ColumnName, 1));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600652 }
653
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600654 value.append(tmp);
655 tmp.clear();
656 }
657
Chengyu Fan71b712b2015-09-09 22:13:56 -0600658 _LOG_DEBUG("<< QueryAdapter::getFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600659}
660
661template <typename DatabaseHandler>
662void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600663QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600664{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600665 if (m_signingId.empty())
666 m_keyChain->sign(data);
667 else {
668 ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
669 ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
670 m_keyChain->sign(data, certName);
Alison Craig2a4d5282015-04-10 12:00:02 -0600671 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600672}
673
674template <typename DatabaseHandler>
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600675ndn::Name
676QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
677 const ndn::Name::Component& version)
678{
Chengyu Fan7b978f82015-12-09 17:03:23 -0700679 // use generic name, instead of specific one
680 ndn::Name queryResultName = interest->getName();
681 queryResultName.append(version);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600682 return queryResultName;
683}
684
685template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600686std::shared_ptr<ndn::Data>
687QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
688 const ndn::Name::Component& version)
Alison Craig2a4d5282015-04-10 12:00:02 -0600689{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600690 std::string queryResultNameStr(getQueryResultsName(interest, version).toUri());
Alison Craig2a4d5282015-04-10 12:00:02 -0600691
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600692 std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName());
693 ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()),
694 queryResultNameStr.length());
Chengyu Fan92440162015-07-09 14:43:31 -0600695 ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
696
Chengyu Fanb25835b2015-04-28 17:09:35 -0600697 signData(*ack);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600698
699 _LOG_DEBUG("Make ACK : " << queryResultNameStr);
700
Chengyu Fanb25835b2015-04-28 17:09:35 -0600701 return ack;
Alison Craig2a4d5282015-04-10 12:00:02 -0600702}
703
704template <typename DatabaseHandler>
705void
Chengyu Fan92440162015-07-09 14:43:31 -0600706QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600707{
Chengyu Fan92440162015-07-09 14:43:31 -0600708 uint64_t segmentNo = 0;
709
710 std::shared_ptr<ndn::Data> nack =
711 std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo));
712 nack->setFreshnessPeriod(ndn::time::milliseconds(10000));
713 nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
714
715 signData(*nack);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600716
717 _LOG_DEBUG("Send Nack: " << ndn::Name(dataPrefix).appendSegment(segmentNo));
718
Chengyu Fan92440162015-07-09 14:43:31 -0600719 m_mutex.lock();
720 m_cache.insert(*nack);
Chengyu Fan31737f12016-01-12 21:08:50 -0700721 m_face->put(*nack);
Chengyu Fan92440162015-07-09 14:43:31 -0600722 m_mutex.unlock();
723}
724
Chengyu Fan92440162015-07-09 14:43:31 -0600725template <typename DatabaseHandler>
726bool
727QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600728 Json::Value& jsonValue,
Chengyu Fan31737f12016-01-12 21:08:50 -0700729 bool& lastComponent,
730 std::stringstream& fieldName)
Chengyu Fan92440162015-07-09 14:43:31 -0600731{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600732 _LOG_DEBUG(">> QueryAdapter::json2AutocompletionSql");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600733
Chengyu Fan71b712b2015-09-09 22:13:56 -0600734 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fancfb80c72015-10-19 16:50:04 -0600735
Chengyu Fan92440162015-07-09 14:43:31 -0600736 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan92440162015-07-09 14:43:31 -0600737 return false;
738 }
739
740 std::string typedString;
741 // get the string in the jsonValue
742 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
743 {
744 Json::Value key = iter.key();
745 Json::Value value = (*iter);
746
747 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600748 _LOG_ERROR("Null key or value in JsonValue");
Chengyu Fan92440162015-07-09 14:43:31 -0600749 return false;
750 }
751
752 // cannot convert to string
753 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600754 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan92440162015-07-09 14:43:31 -0600755 return false;
756 }
757
758 if (key.asString().compare("?") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600759 typedString = value.asString();
Chengyu Fan92440162015-07-09 14:43:31 -0600760 // since the front end triggers the autocompletion when users typed '/',
761 // there must be a '/' at the end, and the first char must be '/'
762 if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
763 return false;
764 break;
765 }
766 }
767
768 // 1. get the expected column number by parsing the typedString, so we can get the filed name
769 size_t pos = 0;
770 size_t start = 1; // start from the 1st char which is not '/'
771 size_t count = 0; // also the name to query for
772 std::string token;
773 std::string delimiter = "/";
774 std::map<std::string, std::string> typedComponents;
775 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
776 token = typedString.substr(start, pos - start);
777 if (count >= m_nameFields.size() - 1) {
778 return false;
779 }
780
781 // add column name and value (token) into map
782 typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600783 count++;
Chengyu Fan92440162015-07-09 14:43:31 -0600784 start = pos + 1;
785 }
786
787 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
788 // return true
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600789 if (count == m_nameFields.size() - 1)
790 lastComponent = true; // indicate this query is to query the last component
791
Chengyu Fan92440162015-07-09 14:43:31 -0600792 bool more = false;
Chengyu Fan31737f12016-01-12 21:08:50 -0700793
794 fieldName << m_nameFields[count];
Chengyu Fan46398212015-08-11 11:23:13 -0600795 for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
796 it != typedComponents.end(); ++it) {
797 if (more)
798 sqlQuery << " AND";
799 else
800 sqlQuery << " WHERE";
801
802 sqlQuery << " " << it->first << "='" << it->second << "'";
803
804 more = true;
805 }
806 sqlQuery << ";";
807 return true;
808}
809
Chengyu Fan31737f12016-01-12 21:08:50 -0700810template <typename databasehandler>
Chengyu Fan46398212015-08-11 11:23:13 -0600811bool
Chengyu Fan31737f12016-01-12 21:08:50 -0700812QueryAdapter<databasehandler>::doPrefixBasedSearch(Json::Value& jsonValue,
813 std::vector<std::pair<std::string, std::string>>& typedComponents)
Chengyu Fan46398212015-08-11 11:23:13 -0600814{
Chengyu Fan31737f12016-01-12 21:08:50 -0700815 _LOG_DEBUG(">> QueryAdapter::doPrefixBasedSearch");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600816
Chengyu Fan46398212015-08-11 11:23:13 -0600817 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan46398212015-08-11 11:23:13 -0600818 return false;
819 }
820
821 std::string typedString;
822 // get the string in the jsonValue
823 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
824 {
825 Json::Value key = iter.key();
826 Json::Value value = (*iter);
827
828 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fan31737f12016-01-12 21:08:50 -0700829 _LOG_ERROR("null key or value in jsonValue");
Chengyu Fan46398212015-08-11 11:23:13 -0600830 return false;
831 }
832
833 // cannot convert to string
834 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fan31737f12016-01-12 21:08:50 -0700835 _LOG_ERROR("malformed jsonquery string");
Chengyu Fan46398212015-08-11 11:23:13 -0600836 return false;
837 }
838
839 if (key.asString().compare("??") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600840 typedString = value.asString();
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600841 if (typedString.empty() || typedString.find("/") != 0)
Chengyu Fan46398212015-08-11 11:23:13 -0600842 return false;
843 break;
844 }
845 }
846
847 // 1. get the expected column number by parsing the typedString, so we can get the filed name
848 size_t pos = 0;
849 size_t start = 1; // start from the 1st char which is not '/'
850 size_t count = 0; // also the name to query for
Chengyu Fan31737f12016-01-12 21:08:50 -0700851 size_t typedStringlen = typedString.length();
Chengyu Fan46398212015-08-11 11:23:13 -0600852 std::string token;
853 std::string delimiter = "/";
Chengyu Fan31737f12016-01-12 21:08:50 -0700854
Chengyu Fan46398212015-08-11 11:23:13 -0600855 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
856 token = typedString.substr(start, pos - start);
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600857 if (count >= m_nameFields.size()) {
Chengyu Fan46398212015-08-11 11:23:13 -0600858 return false;
859 }
860
861 // add column name and value (token) into map
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600862 typedComponents.push_back(std::make_pair(m_nameFields[count], token));
863
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600864 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600865 start = pos + 1;
866 }
867
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600868 // we may have a component after the last "/"
Chengyu Fan31737f12016-01-12 21:08:50 -0700869 if (start < typedStringlen) {
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600870 typedComponents.push_back(std::make_pair(m_nameFields[count],
Chengyu Fan31737f12016-01-12 21:08:50 -0700871 typedString.substr(start, typedStringlen - start)));
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600872 }
873
Chengyu Fan92440162015-07-09 14:43:31 -0600874 return true;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600875}
876
Chengyu Fan31737f12016-01-12 21:08:50 -0700877
878template <typename databasehandler>
879bool
880QueryAdapter<databasehandler>::doFilterBasedSearch(Json::Value& jsonValue,
881 std::vector<std::pair<std::string, std::string>>& typedComponents)
882{
883 _LOG_DEBUG(">> QueryAdapter::doFilterBasedSearch");
884
885 if (jsonValue.type() != Json::objectValue) {
886 return false;
887 }
888
889 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
890 {
891 Json::Value key = iter.key();
892 Json::Value value = (*iter);
893
894 if (key == Json::nullValue || value == Json::nullValue) {
895 _LOG_ERROR("null key or value in jsonValue");
896 return false;
897 }
898
899 // cannot convert to string
900 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
901 _LOG_ERROR("malformed jsonQuery string");
902 return false;
903 }
904
905 if (key.asString().compare("?") == 0 || key.asString().compare("??") == 0) {
906 continue;
907 }
908
909 _LOG_DEBUG(key.asString() << " " << value.asString());
910 typedComponents.push_back(std::make_pair(key.asString(), value.asString()));
911 }
912
913 return true;
914}
915
916
Chengyu Fanb25835b2015-04-28 17:09:35 -0600917template <typename DatabaseHandler>
918void
919QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600920{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600921 _LOG_DEBUG(">> QueryAdapter::runJsonQuery");
922
Alison Craig1aced7d2015-04-10 12:00:02 -0600923 // 1) Strip the prefix off the ndn::Interest's ndn::Name
924 // +1 to grab JSON component after "query" component
Alison Craig1aced7d2015-04-10 12:00:02 -0600925
Chengyu Fanb25835b2015-04-28 17:09:35 -0600926 ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
927 // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
928 const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
Alison Craig2a4d5282015-04-10 12:00:02 -0600929
Chengyu Fanb25835b2015-04-28 17:09:35 -0600930 if (jsonQuery.length() <= 0) {
Chengyu Fan31737f12016-01-12 21:08:50 -0700931 // no JSON query, send Nack
932 sendNack(interest->getName());
Chengyu Fanb25835b2015-04-28 17:09:35 -0600933 return;
934 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600935
936 // the version should be replaced with ChronoSync state digest
937 ndn::name::Component version;
938
939 if(m_socket != nullptr) {
940 const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
941 std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size());
Chengyu Fan36dae3a2015-11-02 22:39:24 -0700942 _LOG_DEBUG("Original digest " << m_chronosyncDigest);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600943 _LOG_DEBUG("New digest : " << digestStr);
944 // if the m_chronosyncDigest and the rootdigest are not equal
945 if (digestStr != m_chronosyncDigest) {
946 // (1) update chronosyncDigest
947 // (2) clear all staled ACK data
948 m_mutex.lock();
949 m_chronosyncDigest = digestStr;
950 m_activeQueryToFirstResponse.erase(ndn::Name("/"));
951 m_mutex.unlock();
952 _LOG_DEBUG("Change digest to " << m_chronosyncDigest);
953 }
954 version = ndn::name::Component::fromEscapedString(digestStr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600955 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600956 else {
957 version = ndn::name::Component::fromEscapedString(m_chronosyncDigest);
958 }
959
Alison Craig2a4d5282015-04-10 12:00:02 -0600960 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
961 Json::Value parsedFromString;
962 Json::Reader reader;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600963 if (!reader.parse(jsonQuery, parsedFromString)) {
Chengyu Fan31737f12016-01-12 21:08:50 -0700964 // json object is broken
965 sendNack(interest->getName());
Chengyu Fancfb80c72015-10-19 16:50:04 -0600966 _LOG_ERROR("Cannot parse the JsonQuery");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600967 return;
Alison Craig2a4d5282015-04-10 12:00:02 -0600968 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600969
Chengyu Fanb25835b2015-04-28 17:09:35 -0600970 // 3) Convert the JSON Query into a MySQL one
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600971 ndn::Name segmentPrefix(getQueryResultsName(interest, version));
Chengyu Fan7b978f82015-12-09 17:03:23 -0700972 _LOG_DEBUG("segmentPrefix :" << segmentPrefix);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600973
Chengyu Fan92440162015-07-09 14:43:31 -0600974 Json::Value tmp;
Chengyu Fan31737f12016-01-12 21:08:50 -0700975 std::vector<std::pair<std::string, std::string>> typedComponents;
976
Chengyu Fan92440162015-07-09 14:43:31 -0600977 // expect the autocomplete and the component-based query are separate
Chengyu Fan31737f12016-01-12 21:08:50 -0700978 // if Json::Value contains ? as key, is autocompletion
Chengyu Fan92440162015-07-09 14:43:31 -0600979 if (parsedFromString.get("?", tmp) != tmp) {
Chengyu Fan31737f12016-01-12 21:08:50 -0700980 bool lastComponent = false;
981 std::stringstream sqlQuery, fieldName;
982
983 // must generate the sql string for autocomple, the selected column is changing
984 if (!json2AutocompletionSql(sqlQuery, parsedFromString, lastComponent, fieldName)) {
Chengyu Fan92440162015-07-09 14:43:31 -0600985 sendNack(segmentPrefix);
986 return;
987 }
Chengyu Fan31737f12016-01-12 21:08:50 -0700988 prepareSegmentsBySqlString(segmentPrefix, sqlQuery.str(), lastComponent, fieldName.str());
Chengyu Fan92440162015-07-09 14:43:31 -0600989 }
Chengyu Fan46398212015-08-11 11:23:13 -0600990 else if (parsedFromString.get("??", tmp) != tmp) {
Chengyu Fan31737f12016-01-12 21:08:50 -0700991 if (!doPrefixBasedSearch(parsedFromString, typedComponents)) {
Chengyu Fan46398212015-08-11 11:23:13 -0600992 sendNack(segmentPrefix);
993 return;
994 }
Chengyu Fan31737f12016-01-12 21:08:50 -0700995 prepareSegmentsByParams(typedComponents, segmentPrefix);
Chengyu Fan46398212015-08-11 11:23:13 -0600996 }
Chengyu Fan92440162015-07-09 14:43:31 -0600997 else {
Chengyu Fan31737f12016-01-12 21:08:50 -0700998 if (!doFilterBasedSearch(parsedFromString, typedComponents)) {
Chengyu Fan92440162015-07-09 14:43:31 -0600999 sendNack(segmentPrefix);
1000 return;
1001 }
Chengyu Fan31737f12016-01-12 21:08:50 -07001002 prepareSegmentsByParams(typedComponents, segmentPrefix);
1003 }
1004
1005}
1006
1007template <typename databasehandler>
1008void
1009QueryAdapter<databasehandler>::
1010prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams,
1011 const ndn::Name& segmentprefix)
1012{
1013}
1014
1015template <>
1016void
1017QueryAdapter<ConnectionPool_T>::
1018prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams,
1019 const ndn::Name& segmentPrefix)
1020{
1021 _LOG_DEBUG(">> QueryAdapter::prepareSegmentsByParams");
1022
1023 // the prepared_statement cannot improve the performance, but can simplify the code
1024 Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool);
1025 if (!conn) {
1026 // do not answer for this request due to lack of connections, request will come back later
1027 _LOG_DEBUG("No available database connections");
1028 return;
1029 }
1030 std::string getRecordNumSqlStr("SELECT count(name) FROM ");
1031 getRecordNumSqlStr += m_databaseTable;
1032 getRecordNumSqlStr += " WHERE ";
1033 for (size_t i = 0; i < m_nameFields.size(); i++) {
1034 getRecordNumSqlStr += m_nameFields[i];
1035 getRecordNumSqlStr += " LIKE ?";
1036 if (i != m_nameFields.size() - 1) {
1037 getRecordNumSqlStr += " AND ";
1038 }
Chengyu Fan92440162015-07-09 14:43:31 -06001039 }
1040
Chengyu Fan31737f12016-01-12 21:08:50 -07001041 PreparedStatement_T ps4RecordNum =
1042 Connection_prepareStatement(conn, reinterpret_cast<const char*>(getRecordNumSqlStr.c_str()), getRecordNumSqlStr.size());
1043
1044 // before query, initialize all params for statement
1045 for (size_t i = 0; i < m_nameFields.size(); i++) {
1046 PreparedStatement_setString(ps4RecordNum, i + 1, "%");
1047 }
1048
1049 // reset params based on the query
1050 for (std::vector<std::pair<std::string, std::string>>::iterator it = queryParams.begin();
1051 it != queryParams.end(); ++it) {
1052 // dictionary is faster
1053 for (size_t i = 0; i < m_nameFields.size(); i++) {
1054 if (it->first == m_nameFields[i]) {
1055 PreparedStatement_setString(ps4RecordNum, i + 1, it->second.c_str());
1056 }
1057 }
1058 }
1059
1060 ResultSet_T res4RecordNum;
1061 TRY {
1062 res4RecordNum = PreparedStatement_executeQuery(ps4RecordNum);
1063 }
1064 CATCH(SQLException) {
1065 _LOG_ERROR(Connection_getLastError(conn));
1066 }
1067 END_TRY;
1068
1069 uint64_t resultCount = 0; // use count sql to get
1070
1071 // result for record number
1072 while (ResultSet_next(res4RecordNum)) {
1073 resultCount = ResultSet_getInt(res4RecordNum, 1);
1074 }
1075
1076 // get name list statement
Chengyu Fan3ffd70e2016-06-27 16:49:27 -06001077 std::string getNameListSqlStr("SELECT name, has_metadata FROM ");
Chengyu Fan31737f12016-01-12 21:08:50 -07001078 getNameListSqlStr += m_databaseTable;
1079 getNameListSqlStr += " WHERE ";
1080 for (size_t i = 0; i < m_nameFields.size(); i++) {
1081 getNameListSqlStr += m_nameFields[i];
1082 getNameListSqlStr += " LIKE ?";
1083 if (i != m_nameFields.size() - 1) {
1084 getNameListSqlStr += " AND ";
1085 }
1086 }
1087
1088 PreparedStatement_T ps4Name =
1089 Connection_prepareStatement(conn, reinterpret_cast<const char*>(getNameListSqlStr.c_str()), getNameListSqlStr.size());
1090
1091 // before query, initialize all params for statement
1092 for (size_t i = 0; i < m_nameFields.size(); i++) {
1093 PreparedStatement_setString(ps4Name, i + 1, "%");
1094 }
1095
1096 // reset params based on the query
1097 for (std::vector<std::pair<std::string, std::string>>::iterator it = queryParams.begin();
1098 it != queryParams.end(); ++it) {
1099 // dictionary is faster
1100 for (size_t i = 0; i < m_nameFields.size(); i++) {
1101 if (it->first == m_nameFields[i]) {
1102 PreparedStatement_setString(ps4Name, i + 1, it->second.c_str());
1103 }
1104 }
1105 }
1106
1107 ResultSet_T res4Name;
1108 TRY {
1109 res4Name = PreparedStatement_executeQuery(ps4Name);
1110 }
1111 CATCH(SQLException) {
1112 _LOG_ERROR(Connection_getLastError(conn));
1113 }
1114 END_TRY;
1115
1116 generateSegments(res4Name, segmentPrefix, resultCount, false, false);
1117
1118 Connection_close(conn);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001119}
1120
1121template <typename DatabaseHandler>
1122void
Chengyu Fan31737f12016-01-12 21:08:50 -07001123QueryAdapter<DatabaseHandler>::generateSegments(ResultSet_T& res,
1124 const ndn::Name& segmentPrefix,
1125 int resultCount,
1126 bool autocomplete,
1127 bool lastComponent)
1128{
1129 uint64_t segmentno = 0;
Chengyu Fan3ffd70e2016-06-27 16:49:27 -06001130 Json::Value tmp, buf, resultjson;
Chengyu Fan31737f12016-01-12 21:08:50 -07001131 Json::FastWriter fastWriter;
1132
Chengyu Fan3ffd70e2016-06-27 16:49:27 -06001133 bool twoColumns = false;
1134 if (ResultSet_getColumnCount(res) > 1) {
1135 twoColumns = true;
1136 }
1137
Chengyu Fan31737f12016-01-12 21:08:50 -07001138 uint64_t viewstart = 0, viewend = 0;
1139 while (ResultSet_next(res)) {
Chengyu Fan3ffd70e2016-06-27 16:49:27 -06001140 tmp["name"] = ResultSet_getString(res, 1);
1141 if (twoColumns) {
1142 tmp["has_metadata"] = ResultSet_getInt(res, 2);
1143 } else {
1144 tmp["has_metadata"] = 0;
1145 }
1146 buf.append(tmp);
1147 const std::string tmpString = fastWriter.write(buf);
Chengyu Fan31737f12016-01-12 21:08:50 -07001148 if (tmpString.length() > PAYLOAD_LIMIT) {
1149 std::shared_ptr<ndn::Data> data
1150 = makeReplyData(segmentPrefix, resultjson, segmentno, false,
1151 autocomplete, resultCount, viewstart, viewend, lastComponent);
1152 m_mutex.lock();
1153 m_cache.insert(*data);
1154 m_face->put(*data);
1155 m_mutex.unlock();
Chengyu Fan3ffd70e2016-06-27 16:49:27 -06001156
1157 buf.clear();
Chengyu Fan31737f12016-01-12 21:08:50 -07001158 resultjson.clear();
1159 segmentno++;
1160 viewstart = viewend + 1;
1161 }
Chengyu Fan3ffd70e2016-06-27 16:49:27 -06001162 resultjson.append(tmp);
1163 buf = resultjson;
1164 tmp.clear();
Chengyu Fan31737f12016-01-12 21:08:50 -07001165 viewend++;
1166 }
1167 std::shared_ptr<ndn::Data> data
1168 = makeReplyData(segmentPrefix, resultjson, segmentno, true,
1169 autocomplete, resultCount, viewstart, viewend, lastComponent);
1170 m_mutex.lock();
1171 m_cache.insert(*data);
1172 m_face->put(*data);
1173 m_mutex.unlock();
1174}
1175
1176template <typename DatabaseHandler>
1177void
1178QueryAdapter<DatabaseHandler>::prepareSegmentsBySqlString(const ndn::Name& segmentPrefix,
1179 const std::string& sqlString,
1180 bool lastComponent,
1181 const std::string& nameField)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001182{
1183 // empty
1184}
1185
Chengyu Fan31737f12016-01-12 21:08:50 -07001186
1187template <>
Chengyu Fanb25835b2015-04-28 17:09:35 -06001188void
Chengyu Fan31737f12016-01-12 21:08:50 -07001189QueryAdapter<ConnectionPool_T>::prepareSegmentsBySqlString(const ndn::Name& segmentPrefix,
1190 const std::string& sqlString,
1191 bool lastComponent,
1192 const std::string& nameField)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001193{
Chengyu Fan31737f12016-01-12 21:08:50 -07001194 _LOG_DEBUG(">> QueryAdapter::prepareSegmentsBySqlString");
Chengyu Fancfb80c72015-10-19 16:50:04 -06001195
Chengyu Fan71b712b2015-09-09 22:13:56 -06001196 _LOG_DEBUG(sqlString);
Chengyu Fancfb80c72015-10-19 16:50:04 -06001197
Chengyu Fan31737f12016-01-12 21:08:50 -07001198 Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool);
1199 if (!conn) {
1200 _LOG_DEBUG("No available database connections");
Chengyu Fanb25835b2015-04-28 17:09:35 -06001201 return;
1202 }
1203
Chengyu Fan31737f12016-01-12 21:08:50 -07001204 //// just for get the rwo count ...
1205 std::string getRecordNumSqlStr("SELECT COUNT( DISTINCT ");
1206 getRecordNumSqlStr += nameField;
1207 getRecordNumSqlStr += ") FROM ";
1208 getRecordNumSqlStr += m_databaseTable;
1209 getRecordNumSqlStr += sqlString;
Chengyu Fan92440162015-07-09 14:43:31 -06001210
Chengyu Fan31737f12016-01-12 21:08:50 -07001211 ResultSet_T res4RecordNum;
1212 TRY {
1213 res4RecordNum = Connection_executeQuery(conn, reinterpret_cast<const char*>(getRecordNumSqlStr.c_str()), getRecordNumSqlStr.size());
Chengyu Fanb25835b2015-04-28 17:09:35 -06001214 }
Chengyu Fan31737f12016-01-12 21:08:50 -07001215 CATCH(SQLException) {
1216 _LOG_ERROR(Connection_getLastError(conn));
1217 }
1218 END_TRY;
Chengyu Fan46398212015-08-11 11:23:13 -06001219
Chengyu Fan31737f12016-01-12 21:08:50 -07001220 uint64_t resultCount = 0;
1221 while (ResultSet_next(res4RecordNum)) {
1222 resultCount = ResultSet_getInt(res4RecordNum, 1);
1223 }
1224 ////
1225
1226 std::string getNextFieldsSqlStr("SELECT DISTINCT ");
1227 getNextFieldsSqlStr += nameField;
1228 getNextFieldsSqlStr += " FROM ";
1229 getNextFieldsSqlStr += m_databaseTable;
1230 getNextFieldsSqlStr += sqlString;
1231
1232 ResultSet_T res4NextFields;
1233 TRY {
1234 res4NextFields = Connection_executeQuery(conn, reinterpret_cast<const char*>(getNextFieldsSqlStr.c_str()), getNextFieldsSqlStr.size());
1235 }
1236 CATCH(SQLException) {
1237 _LOG_ERROR(Connection_getLastError(conn));
1238 }
1239 END_TRY;
1240
1241 generateSegments(res4NextFields, segmentPrefix, resultCount, true, lastComponent);
1242
1243 Connection_close(conn);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001244}
1245
1246template <typename DatabaseHandler>
1247std::shared_ptr<ndn::Data>
1248QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
1249 const Json::Value& value,
1250 uint64_t segmentNo,
1251 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -06001252 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001253 uint64_t resultCount,
1254 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001255 uint64_t viewEnd,
1256 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001257{
1258 Json::Value entry;
1259 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001260
1261 entry["resultCount"] = Json::UInt64(resultCount);;
1262 entry["viewStart"] = Json::UInt64(viewStart);
1263 entry["viewEnd"] = Json::UInt64(viewEnd);
1264
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001265 if (lastComponent)
1266 entry["lastComponent"] = Json::Value(true);
1267
Chengyu Fan31737f12016-01-12 21:08:50 -07001268 _LOG_DEBUG("resultCount " << resultCount << "; "
1269 << "viewStart " << viewStart << "; "
Chengyu Fan71b712b2015-09-09 22:13:56 -06001270 << "viewEnd " << viewEnd);
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001271
Chengyu Fanb25835b2015-04-28 17:09:35 -06001272 if (isAutocomplete) {
1273 entry["next"] = value;
1274 } else {
1275 entry["results"] = value;
1276 }
1277 const std::string jsonMessage = fastWriter.write(entry);
1278 const char* payload = jsonMessage.c_str();
1279 size_t payloadLength = jsonMessage.size() + 1;
1280 ndn::Name segmentName(segmentPrefix);
1281 segmentName.appendSegment(segmentNo);
1282
1283 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
1284 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
1285 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
1286
1287 if (isFinalBlock) {
1288 data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
1289 }
Chengyu Fancfb80c72015-10-19 16:50:04 -06001290
Chengyu Fan71b712b2015-09-09 22:13:56 -06001291 _LOG_DEBUG(segmentName);
Chengyu Fancfb80c72015-10-19 16:50:04 -06001292
Chengyu Fanb25835b2015-04-28 17:09:35 -06001293 signData(*data);
1294 return data;
Alison Craig2a4d5282015-04-10 12:00:02 -06001295}
1296
Chengyu Fancfb80c72015-10-19 16:50:04 -06001297
Alison Craig2a4d5282015-04-10 12:00:02 -06001298} // namespace query
1299} // namespace atmos
1300#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP