blob: 435ad603ea51165bc1e64c09670d45371fef5feb [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanb25835b2015-04-28 17:09:35 -060024#include "util/config-file.hpp"
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <thread>
27
Alison Craig2a4d5282015-04-10 12:00:02 -060028#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
Alison Craig1aced7d2015-04-10 12:00:02 -060040#include <ndn-cxx/util/in-memory-storage-lru.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060041#include <ndn-cxx/util/string-helper.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060042
43#include "mysql/mysql.h"
44
Alison Craig2a4d5282015-04-10 12:00:02 -060045#include <map>
Chengyu Fanb25835b2015-04-28 17:09:35 -060046#include <unordered_map>
Alison Craig2a4d5282015-04-10 12:00:02 -060047#include <memory>
48#include <mutex>
49#include <sstream>
50#include <string>
Chengyu Fan92440162015-07-09 14:43:31 -060051#include <array>
Chengyu Fan4d5fbd22015-09-18 14:34:08 -060052#include <utility>
Alison Craig2a4d5282015-04-10 12:00:02 -060053
Chengyu Fan71b712b2015-09-09 22:13:56 -060054#include "util/logger.hpp"
55
56
Alison Craig2a4d5282015-04-10 12:00:02 -060057namespace atmos {
58namespace query {
Chengyu Fan71b712b2015-09-09 22:13:56 -060059#ifdef HAVE_LOG4CXX
60 INIT_LOGGER("QueryAdapter");
61#endif
62
// TODO: calculate the payload limit from the size of a signed, empty Data packet
64static const size_t PAYLOAD_LIMIT = 7000;
Alison Craig2a4d5282015-04-10 12:00:02 -060065
66/**
67 * QueryAdapter handles the Query usecases for the catalog
68 */
69template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060070class QueryAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060071public:
72 /**
73 * Constructor
74 *
Chengyu Fanb25835b2015-04-28 17:09:35 -060075 * @param face: Face that will be used for NDN communications
76 * @param keyChain: KeyChain that will be used for data signing
Alison Craig2a4d5282015-04-10 12:00:02 -060077 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060078 QueryAdapter(const std::shared_ptr<ndn::Face>& face,
79 const std::shared_ptr<ndn::KeyChain>& keyChain);
Alison Craig2a4d5282015-04-10 12:00:02 -060080
Alison Craig2a4d5282015-04-10 12:00:02 -060081 virtual
82 ~QueryAdapter();
83
84 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060085 * Helper function to specify section handler
86 */
87 void
88 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060089 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060090 const std::vector<std::string>& nameFields,
91 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060092
93protected:
94 /**
95 * Helper function for configuration parsing
96 */
97 void
98 onConfig(const util::ConfigSection& section,
99 bool isDryDun,
100 const std::string& fileName,
101 const ndn::Name& prefix);
102
103 /**
Alison Craig2a4d5282015-04-10 12:00:02 -0600104 * Handles incoming query requests by stripping the filter off the Interest to get the
105 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
106 *
107 * @param filter: InterestFilter that caused this Interest to be routed
108 * @param interest: Interest that needs to be handled
109 */
110 virtual void
111 onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
112
113 /**
114 * Handles requests for responses to an existing query
115 *
116 * @param filter: InterestFilter that caused this Interest to be routed
117 * @param interest: Interest that needs to be handled
118 */
119 virtual void
120 onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
121
Alison Craig2a4d5282015-04-10 12:00:02 -0600122 /**
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600123 * Handles requests for responses to an filter initialization request
124 *
125 * @param filter: InterestFilter that caused this Interest to be routed
126 * @param interest: Interest that needs to be handled
127 */
128 virtual void
129 onFiltersInitializationInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
130
131 /**
132 * Helper function that generates query results from a Json query carried in the Interest
133 *
134 * @param interest: Interest that needs to be handled
135 */
136 void
137 populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest);
138
139 void
140 getFiltersMenu(Json::Value& value);
141
142 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600143 * Helper function that makes query-results data
Alison Craig2a4d5282015-04-10 12:00:02 -0600144 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600145 * @param segmentPrefix: Name that identifies the Prefix for the Data
146 * @param value: Json::Value to be sent in the Data
147 * @param segmentNo: uint64_t the segment for this Data
148 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
149 * last entry
Alison Craig2a4d5282015-04-10 12:00:02 -0600150 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
Chengyu Fan92440162015-07-09 14:43:31 -0600151 * @param resultCount: the number of records in the query results
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600152 * @param viewStart: the start index of the record in the query results payload
153 * @param viewEnd: the end index of the record in the query results payload
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600154 * @param lastComponent: flag to indicate the content contains the last component for
155 autocompletion query
Alison Craig2a4d5282015-04-10 12:00:02 -0600156 */
Chengyu Fanb25835b2015-04-28 17:09:35 -0600157 std::shared_ptr<ndn::Data>
158 makeReplyData(const ndn::Name& segmentPrefix,
159 const Json::Value& value,
160 uint64_t segmentNo,
161 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -0600162 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600163 uint64_t resultCount,
164 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600165 uint64_t viewEnd,
166 bool lastComponent);
Alison Craig2a4d5282015-04-10 12:00:02 -0600167
168 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600169 * Helper function that generates query results from a Json query carried in the Interest
Alison Craig2a4d5282015-04-10 12:00:02 -0600170 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600171 * @param interest: Interest that needs to be handled
Alison Craig2a4d5282015-04-10 12:00:02 -0600172 */
173 void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600174 runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600175
Alison Craig1aced7d2015-04-10 12:00:02 -0600176 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600177 * Helper function that makes ACK data
Alison Craig1aced7d2015-04-10 12:00:02 -0600178 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600179 * @param interest: Intersts that needs to be handled
180 * @param version: Version that needs to be in the data name
181 */
182 std::shared_ptr<ndn::Data>
183 makeAckData(std::shared_ptr<const ndn::Interest> interest,
184 const ndn::Name::Component& version);
185
186 /**
Chengyu Fan92440162015-07-09 14:43:31 -0600187 * Helper function that sends NACK
188 *
189 * @param dataPrefix: prefix for the data packet
Alison Craig1aced7d2015-04-10 12:00:02 -0600190 */
191 void
Chengyu Fan92440162015-07-09 14:43:31 -0600192 sendNack(const ndn::Name& dataPrefix);
193
194 /**
195 * Helper function that generates the sqlQuery string for component-based query
196 * @param sqlQuery: stringstream to save the sqlQuery string
197 * @param jsonValue: Json value that contains the query information
198 */
199 bool
Chengyu Fanb25835b2015-04-28 17:09:35 -0600200 json2Sql(std::stringstream& sqlQuery,
Chengyu Fan92440162015-07-09 14:43:31 -0600201 Json::Value& jsonValue);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600202
203 /**
204 * Helper function that signs the data
205 */
206 void
207 signData(ndn::Data& data);
208
209 /**
210 * Helper function that publishes query-results data segments
211 */
212 virtual void
213 prepareSegments(const ndn::Name& segmentPrefix,
214 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600215 bool autocomplete,
216 bool lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600217
218 /**
219 * Helper function to set the DatabaseHandler
220 */
221 void
222 setDatabaseHandler(const util::ConnectionDetails& databaseId);
223
224 /**
225 * Helper function that set filters to make the adapter work
226 */
227 void
228 setFilters();
229
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600230 void
231 setCatalogId();
232
Chengyu Fan92440162015-07-09 14:43:31 -0600233 /**
234 * Helper function that generates the sqlQuery string for autocomplete query
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600235 * @param sqlQuery: stringstream to save the sqlQuery string
236 * @param jsonValue: Json value that contains the query information
237 * @param lastComponent: Flag to mark the last component query
Chengyu Fan92440162015-07-09 14:43:31 -0600238 */
239 bool
240 json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600241 Json::Value& jsonValue,
242 bool& lastComponent);
Chengyu Fan92440162015-07-09 14:43:31 -0600243
Chengyu Fan46398212015-08-11 11:23:13 -0600244 bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600245 json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
246 Json::Value& jsonValue);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600247
248 ndn::Name
249 getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
250 const ndn::Name::Component& version);
Chengyu Fan46398212015-08-11 11:23:13 -0600251
Chengyu Fanb25835b2015-04-28 17:09:35 -0600252protected:
253 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
254 // Handle to the Catalog's database
255 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Alison Craig1aced7d2015-04-10 12:00:02 -0600256
Alison Craig2a4d5282015-04-10 12:00:02 -0600257 // mutex to control critical sections
258 std::mutex m_mutex;
259 // @{ needs m_mutex protection
260 // The Queries we are currently writing to
261 std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
Alison Craig1aced7d2015-04-10 12:00:02 -0600262
263 ndn::util::InMemoryStorageLru m_cache;
Alison Craig2a4d5282015-04-10 12:00:02 -0600264 // @}
Chengyu Fanb25835b2015-04-28 17:09:35 -0600265 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan92440162015-07-09 14:43:31 -0600266 //std::vector<std::string> m_atmosColumns;
267 ndn::Name m_catalogId; // should be replaced with the PK digest
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600268 std::vector<std::string> m_filterCategoryNames;
Alison Craig2a4d5282015-04-10 12:00:02 -0600269};
270
Alison Craig2a4d5282015-04-10 12:00:02 -0600271template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600272QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
273 const std::shared_ptr<ndn::KeyChain>& keyChain)
274 : util::CatalogAdapter(face, keyChain)
275 , m_cache(250000)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600276 , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
Alison Craig2a4d5282015-04-10 12:00:02 -0600277{
Alison Craig2a4d5282015-04-10 12:00:02 -0600278}
279
280template <typename DatabaseHandler>
281void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600282QueryAdapter<DatabaseHandler>::setFilters()
Alison Craig2a4d5282015-04-10 12:00:02 -0600283{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600284 ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
285 m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
286 bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
287 this, _1, _2),
288 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
289 this, _1),
290 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
291 this, _1, _2));
292
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600293 ndn::Name queryResultsPrefix = ndn::Name(m_prefix).append("query-results");
294 m_registeredPrefixList[queryResultsPrefix] =
295 m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix)
296 .append("query-results").append(m_catalogId)),
Chengyu Fanb25835b2015-04-28 17:09:35 -0600297 bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
298 this, _1, _2),
299 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
300 this, _1),
301 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
302 this, _1, _2));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600303
304 ndn::Name filtersInitializationPrefix = ndn::Name(m_prefix).append("filters-initialization");
305 m_registeredPrefixList[filtersInitializationPrefix] =
306 m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("filters-initialization")),
307 bind(&query::QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
308 this, _1, _2),
309 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
310 this, _1),
311 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
312 this, _1, _2));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600313}
314
315template <typename DatabaseHandler>
316void
317QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600318 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600319 const std::vector<std::string>& nameFields,
320 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600321{
Chengyu Fan92440162015-07-09 14:43:31 -0600322 m_nameFields = nameFields;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600323 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600324 config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
325 _1, _2, _3, prefix));
326}
327
328template <typename DatabaseHandler>
329void
330QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
331 bool isDryRun,
332 const std::string& filename,
333 const ndn::Name& prefix)
334{
335 using namespace util;
336 if (isDryRun) {
337 return;
338 }
339 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
340 for (auto item = section.begin();
341 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600342 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600343 {
344 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600345 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600346 if (signingId.empty()) {
347 throw Error("Empty value for \"signingId\""
348 " in \"query\" section");
349 }
350 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600351 if (item->first == "filterCategoryNames") {
352 std::istringstream ss(item->second.get_value<std::string>());
353 std::string token;
354 while(std::getline(ss, token, ',')) {
355 m_filterCategoryNames.push_back(token);
356 }
357 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600358 if (item->first == "database") {
359 const util::ConfigSection& dataSection = item->second;
360 for (auto subItem = dataSection.begin();
361 subItem != dataSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600362 ++subItem)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600363 {
364 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600365 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600366 if (dbServer.empty()){
367 throw Error("Invalid value for \"dbServer\""
368 " in \"query\" section");
369 }
370 }
371 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600372 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600373 if (dbName.empty()){
374 throw Error("Invalid value for \"dbName\""
375 " in \"query\" section");
376 }
377 }
378 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600379 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600380 if (dbUser.empty()){
381 throw Error("Invalid value for \"dbUser\""
382 " in \"query\" section");
383 }
384 }
385 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600386 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600387 if (dbPasswd.empty()){
388 throw Error("Invalid value for \"dbPasswd\""
389 " in \"query\" section");
390 }
391 }
392 }
393 }
394 }
395
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600396 if (m_filterCategoryNames.size() == 0) {
397 throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
398 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600399
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600400 m_prefix = prefix;
401
402 m_signingId = ndn::Name(signingId);
403 setCatalogId();
404
405 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600406 setDatabaseHandler(mysqlId);
407 setFilters();
408}
409
410template <typename DatabaseHandler>
411void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600412QueryAdapter<DatabaseHandler>::setCatalogId()
413{
414 //empty
415}
416
417template <>
418void
419QueryAdapter<MYSQL>::setCatalogId()
420{
421 // use public key digest as the catalog ID
422 ndn::Name keyId;
423 if (m_signingId.empty()) {
424 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
425 } else {
426 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
427 }
428
429 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
430 ndn::Block keyDigest = pKey->computeDigest();
431 m_catalogId.clear();
432 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
433}
434
435template <typename DatabaseHandler>
436void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600437QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
438{
439 //empty
440}
441
442template <>
443void
444QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
445{
446 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
447
448 m_databaseHandler = conn;
449}
450
451template <typename DatabaseHandler>
452QueryAdapter<DatabaseHandler>::~QueryAdapter()
453{
454 for (const auto& itr : m_registeredPrefixList) {
455 if (static_cast<bool>(itr.second))
456 m_face->unsetInterestFilter(itr.second);
457 }
458}
459
460template <typename DatabaseHandler>
461void
462QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
463 const ndn::Interest& interest)
464{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600465 _LOG_DEBUG(">> QueryAdapter::onQueryInterest");
466
Alison Craig2a4d5282015-04-10 12:00:02 -0600467 if (interest.getName().size() != filter.getPrefix().size() + 1) {
468 // @todo: return a nack
469 return;
470 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600471 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
Chengyu Fan92440162015-07-09 14:43:31 -0600472
Chengyu Fan71b712b2015-09-09 22:13:56 -0600473 _LOG_DEBUG("Interest : " << interestPtr->getName());
Chengyu Fan92440162015-07-09 14:43:31 -0600474
Chengyu Fanb25835b2015-04-28 17:09:35 -0600475 // @todo: use thread pool
476 std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
477 this,
478 interestPtr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600479 queryThread.join();
480}
481
482template <typename DatabaseHandler>
483void
484QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
485 const ndn::Interest& interest)
486{
487 // FIXME Results are currently getting served out of the forwarder's
488 // CS so we just ignore any retrieval Interests that hit us for
489 // now. In the future, this should check some form of
490 // InMemoryStorage.
Chengyu Fan92440162015-07-09 14:43:31 -0600491
Chengyu Fan71b712b2015-09-09 22:13:56 -0600492 _LOG_DEBUG(">> QueryAdapter::onQueryResultsInterest");
Chengyu Fan92440162015-07-09 14:43:31 -0600493
Alison Craig1aced7d2015-04-10 12:00:02 -0600494 auto data = m_cache.find(interest.getName());
495 if (data) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600496 m_face->put(*data);
Alison Craig1aced7d2015-04-10 12:00:02 -0600497 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600498
499 _LOG_DEBUG("<< QueryAdapter::onQueryResultsInterest");
Alison Craig2a4d5282015-04-10 12:00:02 -0600500}
501
502template <typename DatabaseHandler>
503void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600504QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
505 const ndn::Interest& interest)
506{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600507 _LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600508 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
509
Chengyu Fan71b712b2015-09-09 22:13:56 -0600510 _LOG_DEBUG("Interest : " << interestPtr->getName());
511
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600512 // TODO: save the content in memory, first check the memory, if not exists, start thread to generate it
513 // Note that if ChronoSync state changes, we need to clear the saved value, and regenerate it
514
515 auto data = m_cache.find(interest.getName());
516 if (data) {
517 m_face->put(*data);
518 }
519 else {
520 std::thread queryThread(&QueryAdapter<DatabaseHandler>::populateFiltersMenu,
521 this,
522 interestPtr);
523 queryThread.join();
524 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600525
526 _LOG_DEBUG("<< QueryAdapter::onFiltersInitializationInterest");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600527}
528
529template <typename DatabaseHandler>
530void
531QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest)
532{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600533 _LOG_DEBUG(">> QueryAdapter::populateFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600534 Json::Value filters;
535 Json::FastWriter fastWriter;
536 getFiltersMenu(filters);
537
538 const std::string filterValue = fastWriter.write(filters);
539
540 if (!filters.empty()) {
541 ndn::Name filterDataName(interest->getName());
542 filterDataName.append("stateVersion");// TODO: should replace with a state version
543
544 const char* payload = filterValue.c_str();
545 size_t payloadLength = filterValue.size();
546 size_t startIndex = 0, seqNo = 0;
547
548 if (filterValue.length() > PAYLOAD_LIMIT) {
549 payloadLength = PAYLOAD_LIMIT;
550 ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo);
551 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName);
552 filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
553 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
554
555 signData(*filterData);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600556
557 _LOG_DEBUG("Populate Filter Data :" << segmentName);
558
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600559 m_mutex.lock();
560 m_cache.insert(*filterData);
561 try {
562 m_face->put(*filterData);
563 }// catch exceptions and log
564 catch (std::exception& e) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600565 _LOG_DEBUG(e.what());
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600566 }
567 m_mutex.unlock();
568
569 seqNo++;
570 startIndex = payloadLength * seqNo + 1;
571 }
572 payloadLength = filterValue.size() - PAYLOAD_LIMIT * seqNo;
573
574 ndn::Name lastSegment = ndn::Name(filterDataName).appendSegment(seqNo);
575 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(lastSegment);
576 filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
577 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
578 filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo));
579
580 signData(*filterData);
581 m_mutex.lock();
582 m_cache.insert(*filterData);
583 m_face->put(*filterData);
584 m_mutex.unlock();
585 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600586 _LOG_DEBUG("<< QueryAdapter::populateFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600587}
588
589template <typename DatabaseHandler>
590void
591QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value)
592{
593 // empty
594}
595
596// get distinct value of each column
597template <>
598void
599QueryAdapter<MYSQL>::getFiltersMenu(Json::Value& value)
600{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600601 _LOG_DEBUG(">> QueryAdapter::getFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600602 Json::Value tmp;
603
604 for (size_t i = 0; i < m_filterCategoryNames.size(); i++) {
605 std::string columnName = m_filterCategoryNames[i];
606 std::string getFilterSql("SELECT DISTINCT " + columnName +
607 " FROM " + m_databaseTable + ";");
608 std::string errMsg;
609 bool success;
610
611 std::shared_ptr<MYSQL_RES> results
612 = atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
613 util::QUERY, success, errMsg);
614 if (!success) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600615 _LOG_DEBUG(errMsg);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600616 value.clear();
617 return;
618 }
619
620 while (MYSQL_ROW row = mysql_fetch_row(results.get()))
621 {
622 tmp[columnName].append(row[0]);
623 }
624 value.append(tmp);
625 tmp.clear();
626 }
627
Chengyu Fan71b712b2015-09-09 22:13:56 -0600628 _LOG_DEBUG("<< QueryAdapter::getFiltersMenu");
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600629}
630
631template <typename DatabaseHandler>
632void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600633QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600634{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600635 if (m_signingId.empty())
636 m_keyChain->sign(data);
637 else {
638 ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
639 ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
640 m_keyChain->sign(data, certName);
Alison Craig2a4d5282015-04-10 12:00:02 -0600641 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600642}
643
644template <typename DatabaseHandler>
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600645ndn::Name
646QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
647 const ndn::Name::Component& version)
648{
649 // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
650 // for now, should be /<prefix>/query-results/<catalog-id>/<query-parameters>/<version>
651
652 ndn::Name queryResultName(m_prefix);
653 queryResultName.append("query-results")
654 .append(m_catalogId)
655 .append(interest->getName().get(-1))
656 .append(version);
657 return queryResultName;
658}
659
660template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600661std::shared_ptr<ndn::Data>
662QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
663 const ndn::Name::Component& version)
Alison Craig2a4d5282015-04-10 12:00:02 -0600664{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600665 std::string queryResultNameStr(getQueryResultsName(interest, version).toUri());
Alison Craig2a4d5282015-04-10 12:00:02 -0600666
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600667 std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName());
668 ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()),
669 queryResultNameStr.length());
Chengyu Fan92440162015-07-09 14:43:31 -0600670 ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
671
Chengyu Fanb25835b2015-04-28 17:09:35 -0600672 signData(*ack);
Chengyu Fan92440162015-07-09 14:43:31 -0600673#ifndef NDEBUG
Chengyu Fan71b712b2015-09-09 22:13:56 -0600674 _LOG_DEBUG(queryResultNameStr);
Chengyu Fan92440162015-07-09 14:43:31 -0600675#endif
Chengyu Fanb25835b2015-04-28 17:09:35 -0600676 return ack;
Alison Craig2a4d5282015-04-10 12:00:02 -0600677}
678
679template <typename DatabaseHandler>
680void
Chengyu Fan92440162015-07-09 14:43:31 -0600681QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600682{
Chengyu Fan92440162015-07-09 14:43:31 -0600683 uint64_t segmentNo = 0;
684
685 std::shared_ptr<ndn::Data> nack =
686 std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo));
687 nack->setFreshnessPeriod(ndn::time::milliseconds(10000));
688 nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
689
690 signData(*nack);
Chengyu Fan46398212015-08-11 11:23:13 -0600691#ifndef NDEBUG
Chengyu Fan71b712b2015-09-09 22:13:56 -0600692 _LOG_DEBUG(ndn::Name(dataPrefix).appendSegment(segmentNo));
Chengyu Fan46398212015-08-11 11:23:13 -0600693#endif
Chengyu Fan92440162015-07-09 14:43:31 -0600694 m_mutex.lock();
695 m_cache.insert(*nack);
696 m_mutex.unlock();
697}
698
699
700template <typename DatabaseHandler>
701bool
702QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
703 Json::Value& jsonValue)
704{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600705 _LOG_DEBUG(">> QueryAdapter::json2Sql");
Chengyu Fan92440162015-07-09 14:43:31 -0600706#ifndef NDEBUG
Chengyu Fan71b712b2015-09-09 22:13:56 -0600707 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fan92440162015-07-09 14:43:31 -0600708#endif
709 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan92440162015-07-09 14:43:31 -0600710 return false;
711 }
712
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600713 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600714 bool input = false;
715 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
716 {
717 Json::Value key = iter.key();
718 Json::Value value = (*iter);
719
Chengyu Fan92440162015-07-09 14:43:31 -0600720 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600721 _LOG_DEBUG("null key or value in JsonValue");
Chengyu Fan92440162015-07-09 14:43:31 -0600722 return false;
723 }
724
725 // cannot convert to string
726 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600727 _LOG_DEBUG("malformed JsonQuery string");
Chengyu Fan92440162015-07-09 14:43:31 -0600728 return false;
729 }
730
731 if (key.asString().compare("?") == 0) {
732 continue;
733 }
734
Chengyu Fanb25835b2015-04-28 17:09:35 -0600735 if (input) {
736 sqlQuery << " AND";
737 } else {
738 sqlQuery << " WHERE";
739 }
740
Chengyu Fan92440162015-07-09 14:43:31 -0600741 sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
Chengyu Fanb25835b2015-04-28 17:09:35 -0600742 input = true;
743 }
744
745 if (!input) { // Force it to be the empty set
Chengyu Fan92440162015-07-09 14:43:31 -0600746 return false;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600747 }
748 sqlQuery << ";";
Chengyu Fan92440162015-07-09 14:43:31 -0600749 return true;
750}
751
752template <typename DatabaseHandler>
753bool
754QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600755 Json::Value& jsonValue,
756 bool& lastComponent)
Chengyu Fan92440162015-07-09 14:43:31 -0600757{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600758 _LOG_DEBUG(">> QueryAdapter::json2AutocompletionSql");
Chengyu Fan92440162015-07-09 14:43:31 -0600759#ifndef NDEBUG
Chengyu Fan71b712b2015-09-09 22:13:56 -0600760 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fan92440162015-07-09 14:43:31 -0600761#endif
762 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan92440162015-07-09 14:43:31 -0600763 return false;
764 }
765
766 std::string typedString;
767 // get the string in the jsonValue
768 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
769 {
770 Json::Value key = iter.key();
771 Json::Value value = (*iter);
772
773 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600774 _LOG_DEBUG("null key or value in JsonValue");
Chengyu Fan92440162015-07-09 14:43:31 -0600775 return false;
776 }
777
778 // cannot convert to string
779 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600780 _LOG_DEBUG("malformed JsonQuery string");
Chengyu Fan92440162015-07-09 14:43:31 -0600781 return false;
782 }
783
784 if (key.asString().compare("?") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600785 typedString = value.asString();
Chengyu Fan92440162015-07-09 14:43:31 -0600786 // since the front end triggers the autocompletion when users typed '/',
787 // there must be a '/' at the end, and the first char must be '/'
788 if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
789 return false;
790 break;
791 }
792 }
793
794 // 1. get the expected column number by parsing the typedString, so we can get the filed name
795 size_t pos = 0;
796 size_t start = 1; // start from the 1st char which is not '/'
797 size_t count = 0; // also the name to query for
798 std::string token;
799 std::string delimiter = "/";
800 std::map<std::string, std::string> typedComponents;
801 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
802 token = typedString.substr(start, pos - start);
803 if (count >= m_nameFields.size() - 1) {
804 return false;
805 }
806
807 // add column name and value (token) into map
808 typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600809 count++;
Chengyu Fan92440162015-07-09 14:43:31 -0600810 start = pos + 1;
811 }
812
813 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
814 // return true
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600815 if (count == m_nameFields.size() - 1)
816 lastComponent = true; // indicate this query is to query the last component
817
Chengyu Fan92440162015-07-09 14:43:31 -0600818 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600819 sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM " << m_databaseTable;
Chengyu Fan46398212015-08-11 11:23:13 -0600820 for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
821 it != typedComponents.end(); ++it) {
822 if (more)
823 sqlQuery << " AND";
824 else
825 sqlQuery << " WHERE";
826
827 sqlQuery << " " << it->first << "='" << it->second << "'";
828
829 more = true;
830 }
831 sqlQuery << ";";
832 return true;
833}
834
835template <typename DatabaseHandler>
836bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600837QueryAdapter<DatabaseHandler>::json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
838 Json::Value& jsonValue)
Chengyu Fan46398212015-08-11 11:23:13 -0600839{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600840 _LOG_DEBUG(">> QueryAdapter::json2CompleteSearchSql");
Chengyu Fan46398212015-08-11 11:23:13 -0600841#ifndef NDEBUG
Chengyu Fan71b712b2015-09-09 22:13:56 -0600842 _LOG_DEBUG(jsonValue.toStyledString());
Chengyu Fan46398212015-08-11 11:23:13 -0600843#endif
844 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan46398212015-08-11 11:23:13 -0600845 return false;
846 }
847
848 std::string typedString;
849 // get the string in the jsonValue
850 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
851 {
852 Json::Value key = iter.key();
853 Json::Value value = (*iter);
854
855 if (key == Json::nullValue || value == Json::nullValue) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600856 _LOG_DEBUG("null key or value in JsonValue");
Chengyu Fan46398212015-08-11 11:23:13 -0600857 return false;
858 }
859
860 // cannot convert to string
861 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600862 _LOG_DEBUG("malformed JsonQuery string");
Chengyu Fan46398212015-08-11 11:23:13 -0600863 return false;
864 }
865
866 if (key.asString().compare("??") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600867 typedString = value.asString();
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600868 if (typedString.empty() || typedString.find("/") != 0)
Chengyu Fan46398212015-08-11 11:23:13 -0600869 return false;
870 break;
871 }
872 }
873
874 // 1. get the expected column number by parsing the typedString, so we can get the filed name
875 size_t pos = 0;
876 size_t start = 1; // start from the 1st char which is not '/'
877 size_t count = 0; // also the name to query for
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600878 size_t typedStringLen = typedString.length();
Chengyu Fan46398212015-08-11 11:23:13 -0600879 std::string token;
880 std::string delimiter = "/";
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600881 std::vector<std::pair<std::string, std::string>> typedComponents;
Chengyu Fan46398212015-08-11 11:23:13 -0600882 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
883 token = typedString.substr(start, pos - start);
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600884 if (count >= m_nameFields.size()) {
Chengyu Fan46398212015-08-11 11:23:13 -0600885 return false;
886 }
887
888 // add column name and value (token) into map
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600889 typedComponents.push_back(std::make_pair(m_nameFields[count], token));
890
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600891 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600892 start = pos + 1;
893 }
894
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600895 // we may have a component after the last "/"
896 if (start < typedStringLen) {
897 typedComponents.push_back(std::make_pair(m_nameFields[count],
898 typedString.substr(start, typedStringLen - start)));
899 }
900
Chengyu Fan46398212015-08-11 11:23:13 -0600901 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
902 // return true
903 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600904 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600905 for (std::vector<std::pair<std::string, std::string>>::iterator it = typedComponents.begin();
Chengyu Fan92440162015-07-09 14:43:31 -0600906 it != typedComponents.end(); ++it) {
907 if (more)
908 sqlQuery << " AND";
909 else
910 sqlQuery << " WHERE";
911
912 sqlQuery << " " << it->first << "='" << it->second << "'";
913
914 more = true;
915 }
916 sqlQuery << ";";
917 return true;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600918}
919
920template <typename DatabaseHandler>
921void
922QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600923{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600924 _LOG_DEBUG(">> QueryAdapter::runJsonQuery");
925
Alison Craig1aced7d2015-04-10 12:00:02 -0600926 // 1) Strip the prefix off the ndn::Interest's ndn::Name
927 // +1 to grab JSON component after "query" component
Alison Craig1aced7d2015-04-10 12:00:02 -0600928
Chengyu Fanb25835b2015-04-28 17:09:35 -0600929 ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
930 // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
931 const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
Alison Craig2a4d5282015-04-10 12:00:02 -0600932
Chengyu Fanb25835b2015-04-28 17:09:35 -0600933 if (jsonQuery.length() <= 0) {
Chengyu Fan92440162015-07-09 14:43:31 -0600934 // no JSON query, send Nack?
Chengyu Fanb25835b2015-04-28 17:09:35 -0600935 return;
936 }
Chengyu Fan92440162015-07-09 14:43:31 -0600937 // check if the ACK is cached, if yes, respond with ACK
938 // ?? what if the results for now it NULL, but latter exist?
Alison Craig2a4d5282015-04-10 12:00:02 -0600939 // For efficiency, do a double check. Once without the lock, then with it.
940 if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
941 m_mutex.lock();
942 { // !!! BEGIN CRITICAL SECTION !!!
943 // If this fails upon locking, we removed it during our search.
944 // An unusual race-condition case, which requires things like PIT aggregation to be off.
945 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
946 if (iter != m_activeQueryToFirstResponse.end()) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600947 m_face->put(*(iter->second));
Alison Craig2a4d5282015-04-10 12:00:02 -0600948 m_mutex.unlock(); //escape lock
949 return;
950 }
951 } // !!! END CRITICAL SECTION !!!
952 m_mutex.unlock();
953 }
954
955 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
956 Json::Value parsedFromString;
957 Json::Reader reader;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600958 if (!reader.parse(jsonQuery, parsedFromString)) {
959 // @todo: send NACK?
Chengyu Fan71b712b2015-09-09 22:13:56 -0600960 _LOG_DEBUG("cannot parse the JsonQuery");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600961 return;
Alison Craig2a4d5282015-04-10 12:00:02 -0600962 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600963
Chengyu Fan92440162015-07-09 14:43:31 -0600964 // the version should be replaced with ChronoSync state digest
Chengyu Fanb25835b2015-04-28 17:09:35 -0600965 const ndn::name::Component version
966 = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(
967 ndn::time::system_clock::now()).count());
968
969 std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
970
971 m_mutex.lock();
972 { // !!! BEGIN CRITICAL SECTION !!!
973 // An unusual race-condition case, which requires things like PIT aggregation to be off.
974 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
975 if (iter != m_activeQueryToFirstResponse.end()) {
976 m_face->put(*(iter->second));
977 m_mutex.unlock(); // escape lock
978 return;
979 }
980 // This is where things are expensive so we save them for the lock
Chengyu Fan92440162015-07-09 14:43:31 -0600981 // note that we ack the query with the cached ACK messages, but we should remove the ACKs
982 // that conatin the old version when ChronoSync is updated
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600983 //m_activeQueryToFirstResponse.insert(std::pair<std::string,
984 // std::shared_ptr<ndn::Data>>(jsonQuery, ack));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600985 m_face->put(*ack);
986 } // !!! END CRITICAL SECTION !!!
987 m_mutex.unlock();
988
989 // 3) Convert the JSON Query into a MySQL one
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600990 bool autocomplete = false, lastComponent = false;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600991 std::stringstream sqlQuery;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600992
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600993 ndn::Name segmentPrefix(getQueryResultsName(interest, version));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600994
Chengyu Fan92440162015-07-09 14:43:31 -0600995 Json::Value tmp;
996 // expect the autocomplete and the component-based query are separate
997 // if JSON::Value contains ? as key, is autocompletion
998 if (parsedFromString.get("?", tmp) != tmp) {
999 autocomplete = true;
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001000 if (!json2AutocompletionSql(sqlQuery, parsedFromString, lastComponent)) {
Chengyu Fan92440162015-07-09 14:43:31 -06001001 sendNack(segmentPrefix);
1002 return;
1003 }
1004 }
Chengyu Fan46398212015-08-11 11:23:13 -06001005 else if (parsedFromString.get("??", tmp) != tmp) {
Chengyu Fan4d5fbd22015-09-18 14:34:08 -06001006 if (!json2PrefixBasedSearchSql(sqlQuery, parsedFromString)) {
Chengyu Fan46398212015-08-11 11:23:13 -06001007 sendNack(segmentPrefix);
1008 return;
1009 }
1010 }
Chengyu Fan92440162015-07-09 14:43:31 -06001011 else {
1012 if (!json2Sql(sqlQuery, parsedFromString)) {
1013 sendNack(segmentPrefix);
1014 return;
1015 }
1016 }
1017
1018 // 4) Run the Query
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001019 prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001020}
1021
1022template <typename DatabaseHandler>
1023void
1024QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
1025 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001026 bool autocomplete,
1027 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001028{
1029 // empty
1030}
1031
1032// prepareSegments specilization function
1033template<>
1034void
1035QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
1036 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001037 bool autocomplete,
1038 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001039{
Chengyu Fan71b712b2015-09-09 22:13:56 -06001040 _LOG_DEBUG(">> QueryAdapter::prepareSegments");
Chengyu Fan46398212015-08-11 11:23:13 -06001041#ifndef NDEBUG
Chengyu Fan71b712b2015-09-09 22:13:56 -06001042 _LOG_DEBUG(sqlString);
Chengyu Fan46398212015-08-11 11:23:13 -06001043#endif
1044 std::string errMsg;
1045 bool success;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001046 // 4) Run the Query
1047 std::shared_ptr<MYSQL_RES> results
Chengyu Fan46398212015-08-11 11:23:13 -06001048 = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
1049 if (!success)
Chengyu Fan71b712b2015-09-09 22:13:56 -06001050 _LOG_DEBUG(errMsg);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001051
1052 if (!results) {
Chengyu Fan71b712b2015-09-09 22:13:56 -06001053 _LOG_DEBUG("NULL MYSQL_RES for" << sqlString);
Chengyu Fan92440162015-07-09 14:43:31 -06001054
Chengyu Fanb25835b2015-04-28 17:09:35 -06001055 // @todo: throw runtime error or log the error message?
1056 return;
1057 }
1058
Chengyu Fan92440162015-07-09 14:43:31 -06001059 uint64_t resultCount = mysql_num_rows(results.get());
1060
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001061#ifndef NDEBUG
Chengyu Fan71b712b2015-09-09 22:13:56 -06001062 _LOG_DEBUG("Query resuls contain " << resultCount << "rows");
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001063#endif
Chengyu Fanb25835b2015-04-28 17:09:35 -06001064
1065 MYSQL_ROW row;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001066 uint64_t segmentNo = 0;
Chengyu Fan46398212015-08-11 11:23:13 -06001067 Json::Value tmp;
1068 Json::Value resultJson;
1069 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001070
1071 uint64_t viewStart = 0, viewEnd = 0;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001072 while ((row = mysql_fetch_row(results.get())))
1073 {
Chengyu Fan46398212015-08-11 11:23:13 -06001074 tmp.append(row[0]);
1075 const std::string tmpString = fastWriter.write(tmp);
1076 if (tmpString.length() > PAYLOAD_LIMIT) {
Chengyu Fanb25835b2015-04-28 17:09:35 -06001077 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001078 = makeReplyData(segmentPrefix, resultJson, segmentNo, false,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001079 autocomplete, resultCount, viewStart, viewEnd, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001080 m_mutex.lock();
1081 m_cache.insert(*data);
1082 m_mutex.unlock();
Chengyu Fan46398212015-08-11 11:23:13 -06001083 tmp.clear();
1084 resultJson.clear();
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001085 segmentNo++;
1086 viewStart = viewEnd + 1;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001087 }
Chengyu Fan46398212015-08-11 11:23:13 -06001088 resultJson.append(row[0]);
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001089 viewEnd++;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001090 }
Chengyu Fan46398212015-08-11 11:23:13 -06001091
Chengyu Fanb25835b2015-04-28 17:09:35 -06001092 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001093 = makeReplyData(segmentPrefix, resultJson, segmentNo, true,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001094 autocomplete, resultCount, viewStart, viewEnd, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001095 m_mutex.lock();
1096 m_cache.insert(*data);
1097 m_mutex.unlock();
1098}
1099
1100template <typename DatabaseHandler>
1101std::shared_ptr<ndn::Data>
1102QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
1103 const Json::Value& value,
1104 uint64_t segmentNo,
1105 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -06001106 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001107 uint64_t resultCount,
1108 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001109 uint64_t viewEnd,
1110 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001111{
1112 Json::Value entry;
1113 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001114
1115 entry["resultCount"] = Json::UInt64(resultCount);;
1116 entry["viewStart"] = Json::UInt64(viewStart);
1117 entry["viewEnd"] = Json::UInt64(viewEnd);
1118
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001119 if (lastComponent)
1120 entry["lastComponent"] = Json::Value(true);
1121
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001122#ifndef NDEBUG
Chengyu Fan71b712b2015-09-09 22:13:56 -06001123 _LOG_DEBUG("resultCount " << resultCount << ";"
1124 << "viewStart " << viewStart << ";"
1125 << "viewEnd " << viewEnd);
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001126#endif
1127
Chengyu Fanb25835b2015-04-28 17:09:35 -06001128 if (isAutocomplete) {
1129 entry["next"] = value;
1130 } else {
1131 entry["results"] = value;
1132 }
1133 const std::string jsonMessage = fastWriter.write(entry);
1134 const char* payload = jsonMessage.c_str();
1135 size_t payloadLength = jsonMessage.size() + 1;
1136 ndn::Name segmentName(segmentPrefix);
1137 segmentName.appendSegment(segmentNo);
1138
1139 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
1140 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
1141 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
1142
1143 if (isFinalBlock) {
1144 data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
1145 }
1146#ifndef NDEBUG
Chengyu Fan71b712b2015-09-09 22:13:56 -06001147 _LOG_DEBUG(segmentName);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001148#endif
1149 signData(*data);
1150 return data;
Alison Craig2a4d5282015-04-10 12:00:02 -06001151}
1152
1153} // namespace query
1154} // namespace atmos
1155#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP