blob: 2b85da52a9ab520c2d0457cb8b12cfef41e81b6c [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanb25835b2015-04-28 17:09:35 -060024#include "util/config-file.hpp"
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <thread>
27
Alison Craig2a4d5282015-04-10 12:00:02 -060028#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
Alison Craig1aced7d2015-04-10 12:00:02 -060040#include <ndn-cxx/util/in-memory-storage-lru.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060041#include <ndn-cxx/util/string-helper.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060042
43#include "mysql/mysql.h"
44
Alison Craig2a4d5282015-04-10 12:00:02 -060045#include <map>
Chengyu Fanb25835b2015-04-28 17:09:35 -060046#include <unordered_map>
Alison Craig2a4d5282015-04-10 12:00:02 -060047#include <memory>
48#include <mutex>
49#include <sstream>
50#include <string>
Chengyu Fan92440162015-07-09 14:43:31 -060051#include <array>
Chengyu Fan4d5fbd22015-09-18 14:34:08 -060052#include <utility>
Alison Craig2a4d5282015-04-10 12:00:02 -060053
54namespace atmos {
55namespace query {
// Maximum number of content bytes placed into a single Data segment.
// TODO: derive this limit from the size of a signed, empty Data packet
//       instead of hard-coding it.
static constexpr size_t PAYLOAD_LIMIT = 7000;
Alison Craig2a4d5282015-04-10 12:00:02 -060058
59/**
60 * QueryAdapter handles the Query usecases for the catalog
61 */
62template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060063class QueryAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060064public:
65 /**
66 * Constructor
67 *
Chengyu Fanb25835b2015-04-28 17:09:35 -060068 * @param face: Face that will be used for NDN communications
69 * @param keyChain: KeyChain that will be used for data signing
Alison Craig2a4d5282015-04-10 12:00:02 -060070 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060071 QueryAdapter(const std::shared_ptr<ndn::Face>& face,
72 const std::shared_ptr<ndn::KeyChain>& keyChain);
Alison Craig2a4d5282015-04-10 12:00:02 -060073
Alison Craig2a4d5282015-04-10 12:00:02 -060074 virtual
75 ~QueryAdapter();
76
77 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060078 * Helper function to specify section handler
79 */
80 void
81 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060082 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060083 const std::vector<std::string>& nameFields,
84 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060085
86protected:
87 /**
88 * Helper function for configuration parsing
89 */
90 void
91 onConfig(const util::ConfigSection& section,
92 bool isDryDun,
93 const std::string& fileName,
94 const ndn::Name& prefix);
95
96 /**
Alison Craig2a4d5282015-04-10 12:00:02 -060097 * Handles incoming query requests by stripping the filter off the Interest to get the
98 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
99 *
100 * @param filter: InterestFilter that caused this Interest to be routed
101 * @param interest: Interest that needs to be handled
102 */
103 virtual void
104 onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
105
106 /**
107 * Handles requests for responses to an existing query
108 *
109 * @param filter: InterestFilter that caused this Interest to be routed
110 * @param interest: Interest that needs to be handled
111 */
112 virtual void
113 onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
114
Alison Craig2a4d5282015-04-10 12:00:02 -0600115 /**
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600116 * Handles requests for responses to an filter initialization request
117 *
118 * @param filter: InterestFilter that caused this Interest to be routed
119 * @param interest: Interest that needs to be handled
120 */
121 virtual void
122 onFiltersInitializationInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
123
124 /**
125 * Helper function that generates query results from a Json query carried in the Interest
126 *
127 * @param interest: Interest that needs to be handled
128 */
129 void
130 populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest);
131
132 void
133 getFiltersMenu(Json::Value& value);
134
135 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600136 * Helper function that makes query-results data
Alison Craig2a4d5282015-04-10 12:00:02 -0600137 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600138 * @param segmentPrefix: Name that identifies the Prefix for the Data
139 * @param value: Json::Value to be sent in the Data
140 * @param segmentNo: uint64_t the segment for this Data
141 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
142 * last entry
Alison Craig2a4d5282015-04-10 12:00:02 -0600143 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
Chengyu Fan92440162015-07-09 14:43:31 -0600144 * @param resultCount: the number of records in the query results
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600145 * @param viewStart: the start index of the record in the query results payload
146 * @param viewEnd: the end index of the record in the query results payload
Alison Craig2a4d5282015-04-10 12:00:02 -0600147 */
Chengyu Fanb25835b2015-04-28 17:09:35 -0600148 std::shared_ptr<ndn::Data>
149 makeReplyData(const ndn::Name& segmentPrefix,
150 const Json::Value& value,
151 uint64_t segmentNo,
152 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -0600153 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600154 uint64_t resultCount,
155 uint64_t viewStart,
156 uint64_t viewEnd);
Alison Craig2a4d5282015-04-10 12:00:02 -0600157
158 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600159 * Helper function that generates query results from a Json query carried in the Interest
Alison Craig2a4d5282015-04-10 12:00:02 -0600160 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600161 * @param interest: Interest that needs to be handled
Alison Craig2a4d5282015-04-10 12:00:02 -0600162 */
163 void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600164 runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600165
Alison Craig1aced7d2015-04-10 12:00:02 -0600166 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600167 * Helper function that makes ACK data
Alison Craig1aced7d2015-04-10 12:00:02 -0600168 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600169 * @param interest: Intersts that needs to be handled
170 * @param version: Version that needs to be in the data name
171 */
172 std::shared_ptr<ndn::Data>
173 makeAckData(std::shared_ptr<const ndn::Interest> interest,
174 const ndn::Name::Component& version);
175
176 /**
Chengyu Fan92440162015-07-09 14:43:31 -0600177 * Helper function that sends NACK
178 *
179 * @param dataPrefix: prefix for the data packet
Alison Craig1aced7d2015-04-10 12:00:02 -0600180 */
181 void
Chengyu Fan92440162015-07-09 14:43:31 -0600182 sendNack(const ndn::Name& dataPrefix);
183
184 /**
185 * Helper function that generates the sqlQuery string for component-based query
186 * @param sqlQuery: stringstream to save the sqlQuery string
187 * @param jsonValue: Json value that contains the query information
188 */
189 bool
Chengyu Fanb25835b2015-04-28 17:09:35 -0600190 json2Sql(std::stringstream& sqlQuery,
Chengyu Fan92440162015-07-09 14:43:31 -0600191 Json::Value& jsonValue);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600192
193 /**
194 * Helper function that signs the data
195 */
196 void
197 signData(ndn::Data& data);
198
199 /**
200 * Helper function that publishes query-results data segments
201 */
202 virtual void
203 prepareSegments(const ndn::Name& segmentPrefix,
204 const std::string& sqlString,
205 bool autocomplete);
206
207 /**
208 * Helper function to set the DatabaseHandler
209 */
210 void
211 setDatabaseHandler(const util::ConnectionDetails& databaseId);
212
213 /**
214 * Helper function that set filters to make the adapter work
215 */
216 void
217 setFilters();
218
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600219 void
220 setCatalogId();
221
Chengyu Fan92440162015-07-09 14:43:31 -0600222 /**
223 * Helper function that generates the sqlQuery string for autocomplete query
224 * @param sqlQuery: stringstream to save the sqlQuery string
225 * @param jsonValue: Json value that contains the query information
226 */
227 bool
228 json2AutocompletionSql(std::stringstream& sqlQuery,
229 Json::Value& jsonValue);
230
Chengyu Fan46398212015-08-11 11:23:13 -0600231 bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600232 json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
233 Json::Value& jsonValue);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600234
235 ndn::Name
236 getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
237 const ndn::Name::Component& version);
Chengyu Fan46398212015-08-11 11:23:13 -0600238
Chengyu Fanb25835b2015-04-28 17:09:35 -0600239protected:
240 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
241 // Handle to the Catalog's database
242 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Alison Craig1aced7d2015-04-10 12:00:02 -0600243
Alison Craig2a4d5282015-04-10 12:00:02 -0600244 // mutex to control critical sections
245 std::mutex m_mutex;
246 // @{ needs m_mutex protection
247 // The Queries we are currently writing to
248 std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
Alison Craig1aced7d2015-04-10 12:00:02 -0600249
250 ndn::util::InMemoryStorageLru m_cache;
Alison Craig2a4d5282015-04-10 12:00:02 -0600251 // @}
Chengyu Fanb25835b2015-04-28 17:09:35 -0600252 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan92440162015-07-09 14:43:31 -0600253 //std::vector<std::string> m_atmosColumns;
254 ndn::Name m_catalogId; // should be replaced with the PK digest
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600255 std::vector<std::string> m_filterCategoryNames;
Alison Craig2a4d5282015-04-10 12:00:02 -0600256};
257
Alison Craig2a4d5282015-04-10 12:00:02 -0600258template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600259QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
260 const std::shared_ptr<ndn::KeyChain>& keyChain)
261 : util::CatalogAdapter(face, keyChain)
262 , m_cache(250000)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600263 , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
Alison Craig2a4d5282015-04-10 12:00:02 -0600264{
Alison Craig2a4d5282015-04-10 12:00:02 -0600265}
266
267template <typename DatabaseHandler>
268void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600269QueryAdapter<DatabaseHandler>::setFilters()
Alison Craig2a4d5282015-04-10 12:00:02 -0600270{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600271 ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
272 m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
273 bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
274 this, _1, _2),
275 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
276 this, _1),
277 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
278 this, _1, _2));
279
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600280 ndn::Name queryResultsPrefix = ndn::Name(m_prefix).append("query-results");
281 m_registeredPrefixList[queryResultsPrefix] =
282 m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix)
283 .append("query-results").append(m_catalogId)),
Chengyu Fanb25835b2015-04-28 17:09:35 -0600284 bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
285 this, _1, _2),
286 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
287 this, _1),
288 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
289 this, _1, _2));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600290
291 ndn::Name filtersInitializationPrefix = ndn::Name(m_prefix).append("filters-initialization");
292 m_registeredPrefixList[filtersInitializationPrefix] =
293 m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("filters-initialization")),
294 bind(&query::QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
295 this, _1, _2),
296 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
297 this, _1),
298 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
299 this, _1, _2));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600300}
301
302template <typename DatabaseHandler>
303void
304QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600305 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600306 const std::vector<std::string>& nameFields,
307 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600308{
Chengyu Fan92440162015-07-09 14:43:31 -0600309 m_nameFields = nameFields;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600310 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600311 config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
312 _1, _2, _3, prefix));
313}
314
315template <typename DatabaseHandler>
316void
317QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
318 bool isDryRun,
319 const std::string& filename,
320 const ndn::Name& prefix)
321{
322 using namespace util;
323 if (isDryRun) {
324 return;
325 }
326 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
327 for (auto item = section.begin();
328 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600329 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600330 {
331 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600332 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600333 if (signingId.empty()) {
334 throw Error("Empty value for \"signingId\""
335 " in \"query\" section");
336 }
337 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600338 if (item->first == "filterCategoryNames") {
339 std::istringstream ss(item->second.get_value<std::string>());
340 std::string token;
341 while(std::getline(ss, token, ',')) {
342 m_filterCategoryNames.push_back(token);
343 }
344 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600345 if (item->first == "database") {
346 const util::ConfigSection& dataSection = item->second;
347 for (auto subItem = dataSection.begin();
348 subItem != dataSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600349 ++subItem)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600350 {
351 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600352 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600353 if (dbServer.empty()){
354 throw Error("Invalid value for \"dbServer\""
355 " in \"query\" section");
356 }
357 }
358 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600359 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600360 if (dbName.empty()){
361 throw Error("Invalid value for \"dbName\""
362 " in \"query\" section");
363 }
364 }
365 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600366 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600367 if (dbUser.empty()){
368 throw Error("Invalid value for \"dbUser\""
369 " in \"query\" section");
370 }
371 }
372 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600373 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600374 if (dbPasswd.empty()){
375 throw Error("Invalid value for \"dbPasswd\""
376 " in \"query\" section");
377 }
378 }
379 }
380 }
381 }
382
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600383 if (m_filterCategoryNames.size() == 0) {
384 throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
385 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600386
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600387 m_prefix = prefix;
388
389 m_signingId = ndn::Name(signingId);
390 setCatalogId();
391
392 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600393 setDatabaseHandler(mysqlId);
394 setFilters();
395}
396
397template <typename DatabaseHandler>
398void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600399QueryAdapter<DatabaseHandler>::setCatalogId()
400{
401 //empty
402}
403
404template <>
405void
406QueryAdapter<MYSQL>::setCatalogId()
407{
408 // use public key digest as the catalog ID
409 ndn::Name keyId;
410 if (m_signingId.empty()) {
411 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
412 } else {
413 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
414 }
415
416 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
417 ndn::Block keyDigest = pKey->computeDigest();
418 m_catalogId.clear();
419 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
420}
421
422template <typename DatabaseHandler>
423void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600424QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
425{
426 //empty
427}
428
429template <>
430void
431QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
432{
433 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
434
435 m_databaseHandler = conn;
436}
437
438template <typename DatabaseHandler>
439QueryAdapter<DatabaseHandler>::~QueryAdapter()
440{
441 for (const auto& itr : m_registeredPrefixList) {
442 if (static_cast<bool>(itr.second))
443 m_face->unsetInterestFilter(itr.second);
444 }
445}
446
447template <typename DatabaseHandler>
448void
449QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
450 const ndn::Interest& interest)
451{
452 // strictly enforce query initialization namespace.
453 // Name should be our local prefix + "query" + parameters
Alison Craig2a4d5282015-04-10 12:00:02 -0600454 if (interest.getName().size() != filter.getPrefix().size() + 1) {
455 // @todo: return a nack
456 return;
457 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600458 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
Chengyu Fan92440162015-07-09 14:43:31 -0600459
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600460#ifndef NDEBUG
Chengyu Fan92440162015-07-09 14:43:31 -0600461 std::cout << "incoming query interest : " << interestPtr->getName() << std::endl;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600462#endif
Chengyu Fan92440162015-07-09 14:43:31 -0600463
Chengyu Fanb25835b2015-04-28 17:09:35 -0600464 // @todo: use thread pool
465 std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
466 this,
467 interestPtr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600468 queryThread.join();
469}
470
471template <typename DatabaseHandler>
472void
473QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
474 const ndn::Interest& interest)
475{
476 // FIXME Results are currently getting served out of the forwarder's
477 // CS so we just ignore any retrieval Interests that hit us for
478 // now. In the future, this should check some form of
479 // InMemoryStorage.
Chengyu Fan92440162015-07-09 14:43:31 -0600480
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600481#ifndef NDEBUG
Chengyu Fan92440162015-07-09 14:43:31 -0600482 std::cout << "incoming query-results interest : " << interest.toUri() << std::endl;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600483#endif
Chengyu Fan92440162015-07-09 14:43:31 -0600484
Alison Craig1aced7d2015-04-10 12:00:02 -0600485 auto data = m_cache.find(interest.getName());
486 if (data) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600487 m_face->put(*data);
Alison Craig1aced7d2015-04-10 12:00:02 -0600488 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600489}
490
491template <typename DatabaseHandler>
492void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600493QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
494 const ndn::Interest& interest)
495{
496 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
497
498#ifndef NDEBUG
499 std::cout << "incoming initialization interest : " << interestPtr->getName() << std::endl;
500#endif
501 // TODO: save the content in memory, first check the memory, if not exists, start thread to generate it
502 // Note that if ChronoSync state changes, we need to clear the saved value, and regenerate it
503
504 auto data = m_cache.find(interest.getName());
505 if (data) {
506 m_face->put(*data);
507 }
508 else {
509 std::thread queryThread(&QueryAdapter<DatabaseHandler>::populateFiltersMenu,
510 this,
511 interestPtr);
512 queryThread.join();
513 }
514}
515
516template <typename DatabaseHandler>
517void
518QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest)
519{
520 Json::Value filters;
521 Json::FastWriter fastWriter;
522 getFiltersMenu(filters);
523
524 const std::string filterValue = fastWriter.write(filters);
525
526 if (!filters.empty()) {
527 ndn::Name filterDataName(interest->getName());
528 filterDataName.append("stateVersion");// TODO: should replace with a state version
529
530 const char* payload = filterValue.c_str();
531 size_t payloadLength = filterValue.size();
532 size_t startIndex = 0, seqNo = 0;
533
534 if (filterValue.length() > PAYLOAD_LIMIT) {
535 payloadLength = PAYLOAD_LIMIT;
536 ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo);
537 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName);
538 filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
539 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
540
541 signData(*filterData);
542#ifndef NDEBUG
543 std::cout << "populate filter Data : " << segmentName << std::endl;
544#endif
545 m_mutex.lock();
546 m_cache.insert(*filterData);
547 try {
548 m_face->put(*filterData);
549 }// catch exceptions and log
550 catch (std::exception& e) {
551 std::cout << e.what() << std::endl;
552 }
553 m_mutex.unlock();
554
555 seqNo++;
556 startIndex = payloadLength * seqNo + 1;
557 }
558 payloadLength = filterValue.size() - PAYLOAD_LIMIT * seqNo;
559
560 ndn::Name lastSegment = ndn::Name(filterDataName).appendSegment(seqNo);
561 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(lastSegment);
562 filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
563 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
564 filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo));
565
566 signData(*filterData);
567 m_mutex.lock();
568 m_cache.insert(*filterData);
569 m_face->put(*filterData);
570 m_mutex.unlock();
571 }
572}
573
574template <typename DatabaseHandler>
575void
576QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value)
577{
578 // empty
579}
580
581// get distinct value of each column
582template <>
583void
584QueryAdapter<MYSQL>::getFiltersMenu(Json::Value& value)
585{
586 Json::Value tmp;
587
588 for (size_t i = 0; i < m_filterCategoryNames.size(); i++) {
589 std::string columnName = m_filterCategoryNames[i];
590 std::string getFilterSql("SELECT DISTINCT " + columnName +
591 " FROM " + m_databaseTable + ";");
592 std::string errMsg;
593 bool success;
594
595 std::shared_ptr<MYSQL_RES> results
596 = atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
597 util::QUERY, success, errMsg);
598 if (!success) {
599 std::cout << errMsg << std::endl;
600 value.clear();
601 return;
602 }
603
604 while (MYSQL_ROW row = mysql_fetch_row(results.get()))
605 {
606 tmp[columnName].append(row[0]);
607 }
608 value.append(tmp);
609 tmp.clear();
610 }
611
612#ifndef NDEBUG
613 std::cout << value.toStyledString() << std::endl;
614#endif
615}
616
617template <typename DatabaseHandler>
618void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600619QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600620{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600621 if (m_signingId.empty())
622 m_keyChain->sign(data);
623 else {
624 ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
625 ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
626 m_keyChain->sign(data, certName);
Alison Craig2a4d5282015-04-10 12:00:02 -0600627 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600628}
629
630template <typename DatabaseHandler>
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600631ndn::Name
632QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
633 const ndn::Name::Component& version)
634{
635 // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
636 // for now, should be /<prefix>/query-results/<catalog-id>/<query-parameters>/<version>
637
638 ndn::Name queryResultName(m_prefix);
639 queryResultName.append("query-results")
640 .append(m_catalogId)
641 .append(interest->getName().get(-1))
642 .append(version);
643 return queryResultName;
644}
645
646template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600647std::shared_ptr<ndn::Data>
648QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
649 const ndn::Name::Component& version)
Alison Craig2a4d5282015-04-10 12:00:02 -0600650{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600651 std::string queryResultNameStr(getQueryResultsName(interest, version).toUri());
Alison Craig2a4d5282015-04-10 12:00:02 -0600652
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600653 std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName());
654 ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()),
655 queryResultNameStr.length());
Chengyu Fan92440162015-07-09 14:43:31 -0600656 ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
657
Chengyu Fanb25835b2015-04-28 17:09:35 -0600658 signData(*ack);
Chengyu Fan92440162015-07-09 14:43:31 -0600659#ifndef NDEBUG
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600660 std::cout << "qurey-results data name in ACK : " << queryResultNameStr << std::endl;
Chengyu Fan92440162015-07-09 14:43:31 -0600661#endif
Chengyu Fanb25835b2015-04-28 17:09:35 -0600662 return ack;
Alison Craig2a4d5282015-04-10 12:00:02 -0600663}
664
665template <typename DatabaseHandler>
666void
Chengyu Fan92440162015-07-09 14:43:31 -0600667QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600668{
Chengyu Fan92440162015-07-09 14:43:31 -0600669 uint64_t segmentNo = 0;
670
671 std::shared_ptr<ndn::Data> nack =
672 std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo));
673 nack->setFreshnessPeriod(ndn::time::milliseconds(10000));
674 nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
675
676 signData(*nack);
Chengyu Fan46398212015-08-11 11:23:13 -0600677#ifndef NDEBUG
Chengyu Fan92440162015-07-09 14:43:31 -0600678 std::cout << "make NACK : " << ndn::Name(dataPrefix).appendSegment(segmentNo) << std::endl;
Chengyu Fan46398212015-08-11 11:23:13 -0600679#endif
Chengyu Fan92440162015-07-09 14:43:31 -0600680 m_mutex.lock();
681 m_cache.insert(*nack);
682 m_mutex.unlock();
683}
684
685
686template <typename DatabaseHandler>
687bool
688QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
689 Json::Value& jsonValue)
690{
691#ifndef NDEBUG
692 std::cout << "jsonValue in json2Sql: " << jsonValue.toStyledString() << std::endl;
693#endif
694 if (jsonValue.type() != Json::objectValue) {
695 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
696 return false;
697 }
698
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600699 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600700 bool input = false;
701 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
702 {
703 Json::Value key = iter.key();
704 Json::Value value = (*iter);
705
Chengyu Fan92440162015-07-09 14:43:31 -0600706 if (key == Json::nullValue || value == Json::nullValue) {
707 std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
708 return false;
709 }
710
711 // cannot convert to string
712 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
713 std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
714 return false;
715 }
716
717 if (key.asString().compare("?") == 0) {
718 continue;
719 }
720
Chengyu Fanb25835b2015-04-28 17:09:35 -0600721 if (input) {
722 sqlQuery << " AND";
723 } else {
724 sqlQuery << " WHERE";
725 }
726
Chengyu Fan92440162015-07-09 14:43:31 -0600727 sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
Chengyu Fanb25835b2015-04-28 17:09:35 -0600728 input = true;
729 }
730
731 if (!input) { // Force it to be the empty set
Chengyu Fan92440162015-07-09 14:43:31 -0600732 return false;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600733 }
734 sqlQuery << ";";
Chengyu Fan92440162015-07-09 14:43:31 -0600735 return true;
736}
737
738template <typename DatabaseHandler>
739bool
740QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
741 Json::Value& jsonValue)
742{
743#ifndef NDEBUG
744 std::cout << "jsonValue in json2AutocompletionSql: " << jsonValue.toStyledString() << std::endl;
745#endif
746 if (jsonValue.type() != Json::objectValue) {
747 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
748 return false;
749 }
750
751 std::string typedString;
752 // get the string in the jsonValue
753 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
754 {
755 Json::Value key = iter.key();
756 Json::Value value = (*iter);
757
758 if (key == Json::nullValue || value == Json::nullValue) {
759 std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
760 return false;
761 }
762
763 // cannot convert to string
764 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
765 std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
766 return false;
767 }
768
769 if (key.asString().compare("?") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600770 typedString = value.asString();
Chengyu Fan92440162015-07-09 14:43:31 -0600771 // since the front end triggers the autocompletion when users typed '/',
772 // there must be a '/' at the end, and the first char must be '/'
773 if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
774 return false;
775 break;
776 }
777 }
778
779 // 1. get the expected column number by parsing the typedString, so we can get the filed name
780 size_t pos = 0;
781 size_t start = 1; // start from the 1st char which is not '/'
782 size_t count = 0; // also the name to query for
783 std::string token;
784 std::string delimiter = "/";
785 std::map<std::string, std::string> typedComponents;
786 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
787 token = typedString.substr(start, pos - start);
788 if (count >= m_nameFields.size() - 1) {
789 return false;
790 }
791
792 // add column name and value (token) into map
793 typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600794 count++;
Chengyu Fan92440162015-07-09 14:43:31 -0600795 start = pos + 1;
796 }
797
798 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
799 // return true
800 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600801 sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM " << m_databaseTable;
Chengyu Fan46398212015-08-11 11:23:13 -0600802 for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
803 it != typedComponents.end(); ++it) {
804 if (more)
805 sqlQuery << " AND";
806 else
807 sqlQuery << " WHERE";
808
809 sqlQuery << " " << it->first << "='" << it->second << "'";
810
811 more = true;
812 }
813 sqlQuery << ";";
814 return true;
815}
816
817template <typename DatabaseHandler>
818bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600819QueryAdapter<DatabaseHandler>::json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
820 Json::Value& jsonValue)
Chengyu Fan46398212015-08-11 11:23:13 -0600821{
822#ifndef NDEBUG
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600823 std::cout << "jsonValue in json2PrefixBasedSearchSql: " << jsonValue.toStyledString() << std::endl;
Chengyu Fan46398212015-08-11 11:23:13 -0600824#endif
825 if (jsonValue.type() != Json::objectValue) {
826 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
827 return false;
828 }
829
830 std::string typedString;
831 // get the string in the jsonValue
832 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
833 {
834 Json::Value key = iter.key();
835 Json::Value value = (*iter);
836
837 if (key == Json::nullValue || value == Json::nullValue) {
838 std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
839 return false;
840 }
841
842 // cannot convert to string
843 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
844 std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
845 return false;
846 }
847
848 if (key.asString().compare("??") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600849 typedString = value.asString();
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600850 if (typedString.empty() || typedString.find("/") != 0)
Chengyu Fan46398212015-08-11 11:23:13 -0600851 return false;
852 break;
853 }
854 }
855
856 // 1. get the expected column number by parsing the typedString, so we can get the filed name
857 size_t pos = 0;
858 size_t start = 1; // start from the 1st char which is not '/'
859 size_t count = 0; // also the name to query for
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600860 size_t typedStringLen = typedString.length();
Chengyu Fan46398212015-08-11 11:23:13 -0600861 std::string token;
862 std::string delimiter = "/";
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600863 std::vector<std::pair<std::string, std::string>> typedComponents;
Chengyu Fan46398212015-08-11 11:23:13 -0600864 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
865 token = typedString.substr(start, pos - start);
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600866 if (count >= m_nameFields.size()) {
Chengyu Fan46398212015-08-11 11:23:13 -0600867 return false;
868 }
869
870 // add column name and value (token) into map
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600871 typedComponents.push_back(std::make_pair(m_nameFields[count], token));
872
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600873 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600874 start = pos + 1;
875 }
876
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600877 // we may have a component after the last "/"
878 if (start < typedStringLen) {
879 typedComponents.push_back(std::make_pair(m_nameFields[count],
880 typedString.substr(start, typedStringLen - start)));
881 }
882
Chengyu Fan46398212015-08-11 11:23:13 -0600883 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
884 // return true
885 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600886 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600887 for (std::vector<std::pair<std::string, std::string>>::iterator it = typedComponents.begin();
Chengyu Fan92440162015-07-09 14:43:31 -0600888 it != typedComponents.end(); ++it) {
889 if (more)
890 sqlQuery << " AND";
891 else
892 sqlQuery << " WHERE";
893
894 sqlQuery << " " << it->first << "='" << it->second << "'";
895
896 more = true;
897 }
898 sqlQuery << ";";
899 return true;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600900}
901
902template <typename DatabaseHandler>
903void
904QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600905{
Alison Craig1aced7d2015-04-10 12:00:02 -0600906 // 1) Strip the prefix off the ndn::Interest's ndn::Name
907 // +1 to grab JSON component after "query" component
Alison Craig1aced7d2015-04-10 12:00:02 -0600908
Chengyu Fanb25835b2015-04-28 17:09:35 -0600909 ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
910 // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
911 const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
Alison Craig2a4d5282015-04-10 12:00:02 -0600912
Chengyu Fanb25835b2015-04-28 17:09:35 -0600913 if (jsonQuery.length() <= 0) {
Chengyu Fan92440162015-07-09 14:43:31 -0600914 // no JSON query, send Nack?
Chengyu Fanb25835b2015-04-28 17:09:35 -0600915 return;
916 }
Chengyu Fan92440162015-07-09 14:43:31 -0600917 // check if the ACK is cached, if yes, respond with ACK
918 // ?? what if the results for now it NULL, but latter exist?
Alison Craig2a4d5282015-04-10 12:00:02 -0600919 // For efficiency, do a double check. Once without the lock, then with it.
920 if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
921 m_mutex.lock();
922 { // !!! BEGIN CRITICAL SECTION !!!
923 // If this fails upon locking, we removed it during our search.
924 // An unusual race-condition case, which requires things like PIT aggregation to be off.
925 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
926 if (iter != m_activeQueryToFirstResponse.end()) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600927 m_face->put(*(iter->second));
Alison Craig2a4d5282015-04-10 12:00:02 -0600928 m_mutex.unlock(); //escape lock
929 return;
930 }
931 } // !!! END CRITICAL SECTION !!!
932 m_mutex.unlock();
933 }
934
935 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
936 Json::Value parsedFromString;
937 Json::Reader reader;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600938 if (!reader.parse(jsonQuery, parsedFromString)) {
939 // @todo: send NACK?
940 std::cout << "cannot parse the JsonQuery" << std::endl;
941 return;
Alison Craig2a4d5282015-04-10 12:00:02 -0600942 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600943
Chengyu Fan92440162015-07-09 14:43:31 -0600944 // the version should be replaced with ChronoSync state digest
Chengyu Fanb25835b2015-04-28 17:09:35 -0600945 const ndn::name::Component version
946 = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(
947 ndn::time::system_clock::now()).count());
948
949 std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
950
951 m_mutex.lock();
952 { // !!! BEGIN CRITICAL SECTION !!!
953 // An unusual race-condition case, which requires things like PIT aggregation to be off.
954 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
955 if (iter != m_activeQueryToFirstResponse.end()) {
956 m_face->put(*(iter->second));
957 m_mutex.unlock(); // escape lock
958 return;
959 }
960 // This is where things are expensive so we save them for the lock
Chengyu Fan92440162015-07-09 14:43:31 -0600961 // note that we ack the query with the cached ACK messages, but we should remove the ACKs
962 // that conatin the old version when ChronoSync is updated
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600963 //m_activeQueryToFirstResponse.insert(std::pair<std::string,
964 // std::shared_ptr<ndn::Data>>(jsonQuery, ack));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600965 m_face->put(*ack);
966 } // !!! END CRITICAL SECTION !!!
967 m_mutex.unlock();
968
969 // 3) Convert the JSON Query into a MySQL one
970 bool autocomplete = false;
971 std::stringstream sqlQuery;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600972
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600973 ndn::Name segmentPrefix(getQueryResultsName(interest, version));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600974
Chengyu Fan92440162015-07-09 14:43:31 -0600975 Json::Value tmp;
976 // expect the autocomplete and the component-based query are separate
977 // if JSON::Value contains ? as key, is autocompletion
978 if (parsedFromString.get("?", tmp) != tmp) {
979 autocomplete = true;
980 if (!json2AutocompletionSql(sqlQuery, parsedFromString)) {
981 sendNack(segmentPrefix);
982 return;
983 }
984 }
Chengyu Fan46398212015-08-11 11:23:13 -0600985 else if (parsedFromString.get("??", tmp) != tmp) {
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600986 if (!json2PrefixBasedSearchSql(sqlQuery, parsedFromString)) {
Chengyu Fan46398212015-08-11 11:23:13 -0600987 sendNack(segmentPrefix);
988 return;
989 }
990 }
Chengyu Fan92440162015-07-09 14:43:31 -0600991 else {
992 if (!json2Sql(sqlQuery, parsedFromString)) {
993 sendNack(segmentPrefix);
994 return;
995 }
996 }
997
998 // 4) Run the Query
Chengyu Fanb25835b2015-04-28 17:09:35 -0600999 prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete);
1000}
1001
1002template <typename DatabaseHandler>
1003void
1004QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
1005 const std::string& sqlString,
1006 bool autocomplete)
1007{
1008 // empty
1009}
1010
1011// prepareSegments specilization function
1012template<>
1013void
1014QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
1015 const std::string& sqlString,
1016 bool autocomplete)
1017{
Chengyu Fan46398212015-08-11 11:23:13 -06001018#ifndef NDEBUG
Chengyu Fan92440162015-07-09 14:43:31 -06001019 std::cout << "prepareSegments() executes sql : " << sqlString << std::endl;
Chengyu Fan46398212015-08-11 11:23:13 -06001020#endif
1021 std::string errMsg;
1022 bool success;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001023 // 4) Run the Query
1024 std::shared_ptr<MYSQL_RES> results
Chengyu Fan46398212015-08-11 11:23:13 -06001025 = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
1026 if (!success)
1027 std::cout << errMsg << std::endl;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001028
1029 if (!results) {
Chengyu Fanb25835b2015-04-28 17:09:35 -06001030 std::cout << "null MYSQL_RES for query : " << sqlString << std::endl;
Chengyu Fan92440162015-07-09 14:43:31 -06001031
Chengyu Fanb25835b2015-04-28 17:09:35 -06001032 // @todo: throw runtime error or log the error message?
1033 return;
1034 }
1035
Chengyu Fan92440162015-07-09 14:43:31 -06001036 uint64_t resultCount = mysql_num_rows(results.get());
1037
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001038#ifndef NDEBUG
Chengyu Fanb25835b2015-04-28 17:09:35 -06001039 std::cout << "Query results for \""
1040 << sqlString
1041 << "\" contain "
Chengyu Fan92440162015-07-09 14:43:31 -06001042 << resultCount
Chengyu Fanb25835b2015-04-28 17:09:35 -06001043 << " rows" << std::endl;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001044#endif
Chengyu Fanb25835b2015-04-28 17:09:35 -06001045
1046 MYSQL_ROW row;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001047 uint64_t segmentNo = 0;
Chengyu Fan46398212015-08-11 11:23:13 -06001048 Json::Value tmp;
1049 Json::Value resultJson;
1050 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001051
1052 uint64_t viewStart = 0, viewEnd = 0;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001053 while ((row = mysql_fetch_row(results.get())))
1054 {
Chengyu Fan46398212015-08-11 11:23:13 -06001055 tmp.append(row[0]);
1056 const std::string tmpString = fastWriter.write(tmp);
1057 if (tmpString.length() > PAYLOAD_LIMIT) {
Chengyu Fanb25835b2015-04-28 17:09:35 -06001058 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001059 = makeReplyData(segmentPrefix, resultJson, segmentNo, false,
1060 autocomplete, resultCount, viewStart, viewEnd);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001061 m_mutex.lock();
1062 m_cache.insert(*data);
1063 m_mutex.unlock();
Chengyu Fan46398212015-08-11 11:23:13 -06001064 tmp.clear();
1065 resultJson.clear();
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001066 segmentNo++;
1067 viewStart = viewEnd + 1;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001068 }
Chengyu Fan46398212015-08-11 11:23:13 -06001069 resultJson.append(row[0]);
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001070 viewEnd++;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001071 }
Chengyu Fan46398212015-08-11 11:23:13 -06001072
Chengyu Fanb25835b2015-04-28 17:09:35 -06001073 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001074 = makeReplyData(segmentPrefix, resultJson, segmentNo, true,
1075 autocomplete, resultCount, viewStart, viewEnd);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001076 m_mutex.lock();
1077 m_cache.insert(*data);
1078 m_mutex.unlock();
1079}
1080
1081template <typename DatabaseHandler>
1082std::shared_ptr<ndn::Data>
1083QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
1084 const Json::Value& value,
1085 uint64_t segmentNo,
1086 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -06001087 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001088 uint64_t resultCount,
1089 uint64_t viewStart,
1090 uint64_t viewEnd)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001091{
1092 Json::Value entry;
1093 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001094
1095 entry["resultCount"] = Json::UInt64(resultCount);;
1096 entry["viewStart"] = Json::UInt64(viewStart);
1097 entry["viewEnd"] = Json::UInt64(viewEnd);
1098
1099#ifndef NDEBUG
1100 std::cout << "resultCount " << resultCount
1101 << "; viewStart " << viewStart
1102 << "; viewEnd " << viewEnd << std::endl;
1103#endif
1104
Chengyu Fanb25835b2015-04-28 17:09:35 -06001105 if (isAutocomplete) {
1106 entry["next"] = value;
1107 } else {
1108 entry["results"] = value;
1109 }
1110 const std::string jsonMessage = fastWriter.write(entry);
1111 const char* payload = jsonMessage.c_str();
1112 size_t payloadLength = jsonMessage.size() + 1;
1113 ndn::Name segmentName(segmentPrefix);
1114 segmentName.appendSegment(segmentNo);
1115
1116 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
1117 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
1118 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
1119
1120 if (isFinalBlock) {
1121 data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
1122 }
1123#ifndef NDEBUG
1124 std::cout << "makeReplyData : " << segmentName << std::endl;
1125#endif
1126 signData(*data);
1127 return data;
Alison Craig2a4d5282015-04-10 12:00:02 -06001128}
1129
1130} // namespace query
1131} // namespace atmos
1132#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP