blob: 0a5ae0bc431bafaaea25c28080b5ff05480fefd0 [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanb25835b2015-04-28 17:09:35 -060024#include "util/config-file.hpp"
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <thread>
27
Alison Craig2a4d5282015-04-10 12:00:02 -060028#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
Alison Craig1aced7d2015-04-10 12:00:02 -060040#include <ndn-cxx/util/in-memory-storage-lru.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060041#include <ndn-cxx/util/string-helper.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060042
43#include "mysql/mysql.h"
44
Alison Craig2a4d5282015-04-10 12:00:02 -060045#include <map>
Chengyu Fanb25835b2015-04-28 17:09:35 -060046#include <unordered_map>
Alison Craig2a4d5282015-04-10 12:00:02 -060047#include <memory>
48#include <mutex>
49#include <sstream>
50#include <string>
Chengyu Fan92440162015-07-09 14:43:31 -060051#include <array>
Chengyu Fan4d5fbd22015-09-18 14:34:08 -060052#include <utility>
Alison Craig2a4d5282015-04-10 12:00:02 -060053
54namespace atmos {
55namespace query {
Chengyu Fan92440162015-07-09 14:43:31 -060056// todo: calculate payload limit by get the size of a signed empty Data packet
57static const size_t PAYLOAD_LIMIT = 7000;
Alison Craig2a4d5282015-04-10 12:00:02 -060058
59/**
60 * QueryAdapter handles the Query usecases for the catalog
61 */
62template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060063class QueryAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060064public:
65 /**
66 * Constructor
67 *
Chengyu Fanb25835b2015-04-28 17:09:35 -060068 * @param face: Face that will be used for NDN communications
69 * @param keyChain: KeyChain that will be used for data signing
Alison Craig2a4d5282015-04-10 12:00:02 -060070 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060071 QueryAdapter(const std::shared_ptr<ndn::Face>& face,
72 const std::shared_ptr<ndn::KeyChain>& keyChain);
Alison Craig2a4d5282015-04-10 12:00:02 -060073
Alison Craig2a4d5282015-04-10 12:00:02 -060074 virtual
75 ~QueryAdapter();
76
77 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060078 * Helper function to specify section handler
79 */
80 void
81 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060082 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060083 const std::vector<std::string>& nameFields,
84 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060085
86protected:
87 /**
88 * Helper function for configuration parsing
89 */
90 void
91 onConfig(const util::ConfigSection& section,
92 bool isDryDun,
93 const std::string& fileName,
94 const ndn::Name& prefix);
95
96 /**
Alison Craig2a4d5282015-04-10 12:00:02 -060097 * Handles incoming query requests by stripping the filter off the Interest to get the
98 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
99 *
100 * @param filter: InterestFilter that caused this Interest to be routed
101 * @param interest: Interest that needs to be handled
102 */
103 virtual void
104 onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
105
106 /**
107 * Handles requests for responses to an existing query
108 *
109 * @param filter: InterestFilter that caused this Interest to be routed
110 * @param interest: Interest that needs to be handled
111 */
112 virtual void
113 onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
114
Alison Craig2a4d5282015-04-10 12:00:02 -0600115 /**
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600116 * Handles requests for responses to an filter initialization request
117 *
118 * @param filter: InterestFilter that caused this Interest to be routed
119 * @param interest: Interest that needs to be handled
120 */
121 virtual void
122 onFiltersInitializationInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
123
124 /**
125 * Helper function that generates query results from a Json query carried in the Interest
126 *
127 * @param interest: Interest that needs to be handled
128 */
129 void
130 populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest);
131
132 void
133 getFiltersMenu(Json::Value& value);
134
135 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600136 * Helper function that makes query-results data
Alison Craig2a4d5282015-04-10 12:00:02 -0600137 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600138 * @param segmentPrefix: Name that identifies the Prefix for the Data
139 * @param value: Json::Value to be sent in the Data
140 * @param segmentNo: uint64_t the segment for this Data
141 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
142 * last entry
Alison Craig2a4d5282015-04-10 12:00:02 -0600143 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
Chengyu Fan92440162015-07-09 14:43:31 -0600144 * @param resultCount: the number of records in the query results
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600145 * @param viewStart: the start index of the record in the query results payload
146 * @param viewEnd: the end index of the record in the query results payload
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600147 * @param lastComponent: flag to indicate the content contains the last component for
148 autocompletion query
Alison Craig2a4d5282015-04-10 12:00:02 -0600149 */
Chengyu Fanb25835b2015-04-28 17:09:35 -0600150 std::shared_ptr<ndn::Data>
151 makeReplyData(const ndn::Name& segmentPrefix,
152 const Json::Value& value,
153 uint64_t segmentNo,
154 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -0600155 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600156 uint64_t resultCount,
157 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600158 uint64_t viewEnd,
159 bool lastComponent);
Alison Craig2a4d5282015-04-10 12:00:02 -0600160
161 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600162 * Helper function that generates query results from a Json query carried in the Interest
Alison Craig2a4d5282015-04-10 12:00:02 -0600163 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600164 * @param interest: Interest that needs to be handled
Alison Craig2a4d5282015-04-10 12:00:02 -0600165 */
166 void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600167 runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600168
Alison Craig1aced7d2015-04-10 12:00:02 -0600169 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600170 * Helper function that makes ACK data
Alison Craig1aced7d2015-04-10 12:00:02 -0600171 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600172 * @param interest: Intersts that needs to be handled
173 * @param version: Version that needs to be in the data name
174 */
175 std::shared_ptr<ndn::Data>
176 makeAckData(std::shared_ptr<const ndn::Interest> interest,
177 const ndn::Name::Component& version);
178
179 /**
Chengyu Fan92440162015-07-09 14:43:31 -0600180 * Helper function that sends NACK
181 *
182 * @param dataPrefix: prefix for the data packet
Alison Craig1aced7d2015-04-10 12:00:02 -0600183 */
184 void
Chengyu Fan92440162015-07-09 14:43:31 -0600185 sendNack(const ndn::Name& dataPrefix);
186
187 /**
188 * Helper function that generates the sqlQuery string for component-based query
189 * @param sqlQuery: stringstream to save the sqlQuery string
190 * @param jsonValue: Json value that contains the query information
191 */
192 bool
Chengyu Fanb25835b2015-04-28 17:09:35 -0600193 json2Sql(std::stringstream& sqlQuery,
Chengyu Fan92440162015-07-09 14:43:31 -0600194 Json::Value& jsonValue);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600195
196 /**
197 * Helper function that signs the data
198 */
199 void
200 signData(ndn::Data& data);
201
202 /**
203 * Helper function that publishes query-results data segments
204 */
205 virtual void
206 prepareSegments(const ndn::Name& segmentPrefix,
207 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600208 bool autocomplete,
209 bool lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600210
211 /**
212 * Helper function to set the DatabaseHandler
213 */
214 void
215 setDatabaseHandler(const util::ConnectionDetails& databaseId);
216
217 /**
218 * Helper function that set filters to make the adapter work
219 */
220 void
221 setFilters();
222
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600223 void
224 setCatalogId();
225
Chengyu Fan92440162015-07-09 14:43:31 -0600226 /**
227 * Helper function that generates the sqlQuery string for autocomplete query
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600228 * @param sqlQuery: stringstream to save the sqlQuery string
229 * @param jsonValue: Json value that contains the query information
230 * @param lastComponent: Flag to mark the last component query
Chengyu Fan92440162015-07-09 14:43:31 -0600231 */
232 bool
233 json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600234 Json::Value& jsonValue,
235 bool& lastComponent);
Chengyu Fan92440162015-07-09 14:43:31 -0600236
Chengyu Fan46398212015-08-11 11:23:13 -0600237 bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600238 json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
239 Json::Value& jsonValue);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600240
241 ndn::Name
242 getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
243 const ndn::Name::Component& version);
Chengyu Fan46398212015-08-11 11:23:13 -0600244
Chengyu Fanb25835b2015-04-28 17:09:35 -0600245protected:
246 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
247 // Handle to the Catalog's database
248 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Alison Craig1aced7d2015-04-10 12:00:02 -0600249
Alison Craig2a4d5282015-04-10 12:00:02 -0600250 // mutex to control critical sections
251 std::mutex m_mutex;
252 // @{ needs m_mutex protection
253 // The Queries we are currently writing to
254 std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
Alison Craig1aced7d2015-04-10 12:00:02 -0600255
256 ndn::util::InMemoryStorageLru m_cache;
Alison Craig2a4d5282015-04-10 12:00:02 -0600257 // @}
Chengyu Fanb25835b2015-04-28 17:09:35 -0600258 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan92440162015-07-09 14:43:31 -0600259 //std::vector<std::string> m_atmosColumns;
260 ndn::Name m_catalogId; // should be replaced with the PK digest
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600261 std::vector<std::string> m_filterCategoryNames;
Alison Craig2a4d5282015-04-10 12:00:02 -0600262};
263
Alison Craig2a4d5282015-04-10 12:00:02 -0600264template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600265QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
266 const std::shared_ptr<ndn::KeyChain>& keyChain)
267 : util::CatalogAdapter(face, keyChain)
268 , m_cache(250000)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600269 , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
Alison Craig2a4d5282015-04-10 12:00:02 -0600270{
Alison Craig2a4d5282015-04-10 12:00:02 -0600271}
272
273template <typename DatabaseHandler>
274void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600275QueryAdapter<DatabaseHandler>::setFilters()
Alison Craig2a4d5282015-04-10 12:00:02 -0600276{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600277 ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
278 m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
279 bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
280 this, _1, _2),
281 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
282 this, _1),
283 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
284 this, _1, _2));
285
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600286 ndn::Name queryResultsPrefix = ndn::Name(m_prefix).append("query-results");
287 m_registeredPrefixList[queryResultsPrefix] =
288 m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix)
289 .append("query-results").append(m_catalogId)),
Chengyu Fanb25835b2015-04-28 17:09:35 -0600290 bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
291 this, _1, _2),
292 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
293 this, _1),
294 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
295 this, _1, _2));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600296
297 ndn::Name filtersInitializationPrefix = ndn::Name(m_prefix).append("filters-initialization");
298 m_registeredPrefixList[filtersInitializationPrefix] =
299 m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("filters-initialization")),
300 bind(&query::QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
301 this, _1, _2),
302 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
303 this, _1),
304 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
305 this, _1, _2));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600306}
307
308template <typename DatabaseHandler>
309void
310QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600311 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600312 const std::vector<std::string>& nameFields,
313 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600314{
Chengyu Fan92440162015-07-09 14:43:31 -0600315 m_nameFields = nameFields;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600316 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600317 config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
318 _1, _2, _3, prefix));
319}
320
321template <typename DatabaseHandler>
322void
323QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
324 bool isDryRun,
325 const std::string& filename,
326 const ndn::Name& prefix)
327{
328 using namespace util;
329 if (isDryRun) {
330 return;
331 }
332 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
333 for (auto item = section.begin();
334 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600335 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600336 {
337 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600338 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600339 if (signingId.empty()) {
340 throw Error("Empty value for \"signingId\""
341 " in \"query\" section");
342 }
343 }
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600344 if (item->first == "filterCategoryNames") {
345 std::istringstream ss(item->second.get_value<std::string>());
346 std::string token;
347 while(std::getline(ss, token, ',')) {
348 m_filterCategoryNames.push_back(token);
349 }
350 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600351 if (item->first == "database") {
352 const util::ConfigSection& dataSection = item->second;
353 for (auto subItem = dataSection.begin();
354 subItem != dataSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600355 ++subItem)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600356 {
357 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600358 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600359 if (dbServer.empty()){
360 throw Error("Invalid value for \"dbServer\""
361 " in \"query\" section");
362 }
363 }
364 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600365 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600366 if (dbName.empty()){
367 throw Error("Invalid value for \"dbName\""
368 " in \"query\" section");
369 }
370 }
371 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600372 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600373 if (dbUser.empty()){
374 throw Error("Invalid value for \"dbUser\""
375 " in \"query\" section");
376 }
377 }
378 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600379 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600380 if (dbPasswd.empty()){
381 throw Error("Invalid value for \"dbPasswd\""
382 " in \"query\" section");
383 }
384 }
385 }
386 }
387 }
388
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600389 if (m_filterCategoryNames.size() == 0) {
390 throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
391 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600392
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600393 m_prefix = prefix;
394
395 m_signingId = ndn::Name(signingId);
396 setCatalogId();
397
398 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600399 setDatabaseHandler(mysqlId);
400 setFilters();
401}
402
403template <typename DatabaseHandler>
404void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600405QueryAdapter<DatabaseHandler>::setCatalogId()
406{
407 //empty
408}
409
410template <>
411void
412QueryAdapter<MYSQL>::setCatalogId()
413{
414 // use public key digest as the catalog ID
415 ndn::Name keyId;
416 if (m_signingId.empty()) {
417 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
418 } else {
419 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
420 }
421
422 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
423 ndn::Block keyDigest = pKey->computeDigest();
424 m_catalogId.clear();
425 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
426}
427
428template <typename DatabaseHandler>
429void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600430QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
431{
432 //empty
433}
434
435template <>
436void
437QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
438{
439 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
440
441 m_databaseHandler = conn;
442}
443
444template <typename DatabaseHandler>
445QueryAdapter<DatabaseHandler>::~QueryAdapter()
446{
447 for (const auto& itr : m_registeredPrefixList) {
448 if (static_cast<bool>(itr.second))
449 m_face->unsetInterestFilter(itr.second);
450 }
451}
452
453template <typename DatabaseHandler>
454void
455QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
456 const ndn::Interest& interest)
457{
458 // strictly enforce query initialization namespace.
459 // Name should be our local prefix + "query" + parameters
Alison Craig2a4d5282015-04-10 12:00:02 -0600460 if (interest.getName().size() != filter.getPrefix().size() + 1) {
461 // @todo: return a nack
462 return;
463 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600464 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
Chengyu Fan92440162015-07-09 14:43:31 -0600465
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600466#ifndef NDEBUG
Chengyu Fan92440162015-07-09 14:43:31 -0600467 std::cout << "incoming query interest : " << interestPtr->getName() << std::endl;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600468#endif
Chengyu Fan92440162015-07-09 14:43:31 -0600469
Chengyu Fanb25835b2015-04-28 17:09:35 -0600470 // @todo: use thread pool
471 std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
472 this,
473 interestPtr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600474 queryThread.join();
475}
476
477template <typename DatabaseHandler>
478void
479QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
480 const ndn::Interest& interest)
481{
482 // FIXME Results are currently getting served out of the forwarder's
483 // CS so we just ignore any retrieval Interests that hit us for
484 // now. In the future, this should check some form of
485 // InMemoryStorage.
Chengyu Fan92440162015-07-09 14:43:31 -0600486
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600487#ifndef NDEBUG
Chengyu Fan92440162015-07-09 14:43:31 -0600488 std::cout << "incoming query-results interest : " << interest.toUri() << std::endl;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600489#endif
Chengyu Fan92440162015-07-09 14:43:31 -0600490
Alison Craig1aced7d2015-04-10 12:00:02 -0600491 auto data = m_cache.find(interest.getName());
492 if (data) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600493 m_face->put(*data);
Alison Craig1aced7d2015-04-10 12:00:02 -0600494 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600495}
496
497template <typename DatabaseHandler>
498void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600499QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
500 const ndn::Interest& interest)
501{
502 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
503
504#ifndef NDEBUG
505 std::cout << "incoming initialization interest : " << interestPtr->getName() << std::endl;
506#endif
507 // TODO: save the content in memory, first check the memory, if not exists, start thread to generate it
508 // Note that if ChronoSync state changes, we need to clear the saved value, and regenerate it
509
510 auto data = m_cache.find(interest.getName());
511 if (data) {
512 m_face->put(*data);
513 }
514 else {
515 std::thread queryThread(&QueryAdapter<DatabaseHandler>::populateFiltersMenu,
516 this,
517 interestPtr);
518 queryThread.join();
519 }
520}
521
522template <typename DatabaseHandler>
523void
524QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest)
525{
526 Json::Value filters;
527 Json::FastWriter fastWriter;
528 getFiltersMenu(filters);
529
530 const std::string filterValue = fastWriter.write(filters);
531
532 if (!filters.empty()) {
533 ndn::Name filterDataName(interest->getName());
534 filterDataName.append("stateVersion");// TODO: should replace with a state version
535
536 const char* payload = filterValue.c_str();
537 size_t payloadLength = filterValue.size();
538 size_t startIndex = 0, seqNo = 0;
539
540 if (filterValue.length() > PAYLOAD_LIMIT) {
541 payloadLength = PAYLOAD_LIMIT;
542 ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo);
543 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName);
544 filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
545 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
546
547 signData(*filterData);
548#ifndef NDEBUG
549 std::cout << "populate filter Data : " << segmentName << std::endl;
550#endif
551 m_mutex.lock();
552 m_cache.insert(*filterData);
553 try {
554 m_face->put(*filterData);
555 }// catch exceptions and log
556 catch (std::exception& e) {
557 std::cout << e.what() << std::endl;
558 }
559 m_mutex.unlock();
560
561 seqNo++;
562 startIndex = payloadLength * seqNo + 1;
563 }
564 payloadLength = filterValue.size() - PAYLOAD_LIMIT * seqNo;
565
566 ndn::Name lastSegment = ndn::Name(filterDataName).appendSegment(seqNo);
567 std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(lastSegment);
568 filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
569 filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
570 filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo));
571
572 signData(*filterData);
573 m_mutex.lock();
574 m_cache.insert(*filterData);
575 m_face->put(*filterData);
576 m_mutex.unlock();
577 }
578}
579
580template <typename DatabaseHandler>
581void
582QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value)
583{
584 // empty
585}
586
587// get distinct value of each column
588template <>
589void
590QueryAdapter<MYSQL>::getFiltersMenu(Json::Value& value)
591{
592 Json::Value tmp;
593
594 for (size_t i = 0; i < m_filterCategoryNames.size(); i++) {
595 std::string columnName = m_filterCategoryNames[i];
596 std::string getFilterSql("SELECT DISTINCT " + columnName +
597 " FROM " + m_databaseTable + ";");
598 std::string errMsg;
599 bool success;
600
601 std::shared_ptr<MYSQL_RES> results
602 = atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
603 util::QUERY, success, errMsg);
604 if (!success) {
605 std::cout << errMsg << std::endl;
606 value.clear();
607 return;
608 }
609
610 while (MYSQL_ROW row = mysql_fetch_row(results.get()))
611 {
612 tmp[columnName].append(row[0]);
613 }
614 value.append(tmp);
615 tmp.clear();
616 }
617
618#ifndef NDEBUG
619 std::cout << value.toStyledString() << std::endl;
620#endif
621}
622
623template <typename DatabaseHandler>
624void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600625QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600626{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600627 if (m_signingId.empty())
628 m_keyChain->sign(data);
629 else {
630 ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
631 ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
632 m_keyChain->sign(data, certName);
Alison Craig2a4d5282015-04-10 12:00:02 -0600633 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600634}
635
636template <typename DatabaseHandler>
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600637ndn::Name
638QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
639 const ndn::Name::Component& version)
640{
641 // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
642 // for now, should be /<prefix>/query-results/<catalog-id>/<query-parameters>/<version>
643
644 ndn::Name queryResultName(m_prefix);
645 queryResultName.append("query-results")
646 .append(m_catalogId)
647 .append(interest->getName().get(-1))
648 .append(version);
649 return queryResultName;
650}
651
652template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600653std::shared_ptr<ndn::Data>
654QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
655 const ndn::Name::Component& version)
Alison Craig2a4d5282015-04-10 12:00:02 -0600656{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600657 std::string queryResultNameStr(getQueryResultsName(interest, version).toUri());
Alison Craig2a4d5282015-04-10 12:00:02 -0600658
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600659 std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName());
660 ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()),
661 queryResultNameStr.length());
Chengyu Fan92440162015-07-09 14:43:31 -0600662 ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
663
Chengyu Fanb25835b2015-04-28 17:09:35 -0600664 signData(*ack);
Chengyu Fan92440162015-07-09 14:43:31 -0600665#ifndef NDEBUG
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600666 std::cout << "qurey-results data name in ACK : " << queryResultNameStr << std::endl;
Chengyu Fan92440162015-07-09 14:43:31 -0600667#endif
Chengyu Fanb25835b2015-04-28 17:09:35 -0600668 return ack;
Alison Craig2a4d5282015-04-10 12:00:02 -0600669}
670
671template <typename DatabaseHandler>
672void
Chengyu Fan92440162015-07-09 14:43:31 -0600673QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600674{
Chengyu Fan92440162015-07-09 14:43:31 -0600675 uint64_t segmentNo = 0;
676
677 std::shared_ptr<ndn::Data> nack =
678 std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo));
679 nack->setFreshnessPeriod(ndn::time::milliseconds(10000));
680 nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
681
682 signData(*nack);
Chengyu Fan46398212015-08-11 11:23:13 -0600683#ifndef NDEBUG
Chengyu Fan92440162015-07-09 14:43:31 -0600684 std::cout << "make NACK : " << ndn::Name(dataPrefix).appendSegment(segmentNo) << std::endl;
Chengyu Fan46398212015-08-11 11:23:13 -0600685#endif
Chengyu Fan92440162015-07-09 14:43:31 -0600686 m_mutex.lock();
687 m_cache.insert(*nack);
688 m_mutex.unlock();
689}
690
691
692template <typename DatabaseHandler>
693bool
694QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
695 Json::Value& jsonValue)
696{
697#ifndef NDEBUG
698 std::cout << "jsonValue in json2Sql: " << jsonValue.toStyledString() << std::endl;
699#endif
700 if (jsonValue.type() != Json::objectValue) {
701 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
702 return false;
703 }
704
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600705 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600706 bool input = false;
707 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
708 {
709 Json::Value key = iter.key();
710 Json::Value value = (*iter);
711
Chengyu Fan92440162015-07-09 14:43:31 -0600712 if (key == Json::nullValue || value == Json::nullValue) {
713 std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
714 return false;
715 }
716
717 // cannot convert to string
718 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
719 std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
720 return false;
721 }
722
723 if (key.asString().compare("?") == 0) {
724 continue;
725 }
726
Chengyu Fanb25835b2015-04-28 17:09:35 -0600727 if (input) {
728 sqlQuery << " AND";
729 } else {
730 sqlQuery << " WHERE";
731 }
732
Chengyu Fan92440162015-07-09 14:43:31 -0600733 sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
Chengyu Fanb25835b2015-04-28 17:09:35 -0600734 input = true;
735 }
736
737 if (!input) { // Force it to be the empty set
Chengyu Fan92440162015-07-09 14:43:31 -0600738 return false;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600739 }
740 sqlQuery << ";";
Chengyu Fan92440162015-07-09 14:43:31 -0600741 return true;
742}
743
744template <typename DatabaseHandler>
745bool
746QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600747 Json::Value& jsonValue,
748 bool& lastComponent)
Chengyu Fan92440162015-07-09 14:43:31 -0600749{
750#ifndef NDEBUG
751 std::cout << "jsonValue in json2AutocompletionSql: " << jsonValue.toStyledString() << std::endl;
752#endif
753 if (jsonValue.type() != Json::objectValue) {
754 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
755 return false;
756 }
757
758 std::string typedString;
759 // get the string in the jsonValue
760 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
761 {
762 Json::Value key = iter.key();
763 Json::Value value = (*iter);
764
765 if (key == Json::nullValue || value == Json::nullValue) {
766 std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
767 return false;
768 }
769
770 // cannot convert to string
771 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
772 std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
773 return false;
774 }
775
776 if (key.asString().compare("?") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600777 typedString = value.asString();
Chengyu Fan92440162015-07-09 14:43:31 -0600778 // since the front end triggers the autocompletion when users typed '/',
779 // there must be a '/' at the end, and the first char must be '/'
780 if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
781 return false;
782 break;
783 }
784 }
785
786 // 1. get the expected column number by parsing the typedString, so we can get the filed name
787 size_t pos = 0;
788 size_t start = 1; // start from the 1st char which is not '/'
789 size_t count = 0; // also the name to query for
790 std::string token;
791 std::string delimiter = "/";
792 std::map<std::string, std::string> typedComponents;
793 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
794 token = typedString.substr(start, pos - start);
795 if (count >= m_nameFields.size() - 1) {
796 return false;
797 }
798
799 // add column name and value (token) into map
800 typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600801 count++;
Chengyu Fan92440162015-07-09 14:43:31 -0600802 start = pos + 1;
803 }
804
805 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
806 // return true
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600807 if (count == m_nameFields.size() - 1)
808 lastComponent = true; // indicate this query is to query the last component
809
Chengyu Fan92440162015-07-09 14:43:31 -0600810 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600811 sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM " << m_databaseTable;
Chengyu Fan46398212015-08-11 11:23:13 -0600812 for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
813 it != typedComponents.end(); ++it) {
814 if (more)
815 sqlQuery << " AND";
816 else
817 sqlQuery << " WHERE";
818
819 sqlQuery << " " << it->first << "='" << it->second << "'";
820
821 more = true;
822 }
823 sqlQuery << ";";
824 return true;
825}
826
827template <typename DatabaseHandler>
828bool
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600829QueryAdapter<DatabaseHandler>::json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
830 Json::Value& jsonValue)
Chengyu Fan46398212015-08-11 11:23:13 -0600831{
832#ifndef NDEBUG
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600833 std::cout << "jsonValue in json2PrefixBasedSearchSql: " << jsonValue.toStyledString() << std::endl;
Chengyu Fan46398212015-08-11 11:23:13 -0600834#endif
835 if (jsonValue.type() != Json::objectValue) {
836 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
837 return false;
838 }
839
840 std::string typedString;
841 // get the string in the jsonValue
842 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
843 {
844 Json::Value key = iter.key();
845 Json::Value value = (*iter);
846
847 if (key == Json::nullValue || value == Json::nullValue) {
848 std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
849 return false;
850 }
851
852 // cannot convert to string
853 if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
854 std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
855 return false;
856 }
857
858 if (key.asString().compare("??") == 0) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600859 typedString = value.asString();
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600860 if (typedString.empty() || typedString.find("/") != 0)
Chengyu Fan46398212015-08-11 11:23:13 -0600861 return false;
862 break;
863 }
864 }
865
866 // 1. get the expected column number by parsing the typedString, so we can get the filed name
867 size_t pos = 0;
868 size_t start = 1; // start from the 1st char which is not '/'
869 size_t count = 0; // also the name to query for
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600870 size_t typedStringLen = typedString.length();
Chengyu Fan46398212015-08-11 11:23:13 -0600871 std::string token;
872 std::string delimiter = "/";
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600873 std::vector<std::pair<std::string, std::string>> typedComponents;
Chengyu Fan46398212015-08-11 11:23:13 -0600874 while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
875 token = typedString.substr(start, pos - start);
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600876 if (count >= m_nameFields.size()) {
Chengyu Fan46398212015-08-11 11:23:13 -0600877 return false;
878 }
879
880 // add column name and value (token) into map
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600881 typedComponents.push_back(std::make_pair(m_nameFields[count], token));
882
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600883 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600884 start = pos + 1;
885 }
886
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600887 // we may have a component after the last "/"
888 if (start < typedStringLen) {
889 typedComponents.push_back(std::make_pair(m_nameFields[count],
890 typedString.substr(start, typedStringLen - start)));
891 }
892
Chengyu Fan46398212015-08-11 11:23:13 -0600893 // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
894 // return true
895 bool more = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600896 sqlQuery << "SELECT name FROM " << m_databaseTable;
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600897 for (std::vector<std::pair<std::string, std::string>>::iterator it = typedComponents.begin();
Chengyu Fan92440162015-07-09 14:43:31 -0600898 it != typedComponents.end(); ++it) {
899 if (more)
900 sqlQuery << " AND";
901 else
902 sqlQuery << " WHERE";
903
904 sqlQuery << " " << it->first << "='" << it->second << "'";
905
906 more = true;
907 }
908 sqlQuery << ";";
909 return true;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600910}
911
912template <typename DatabaseHandler>
913void
914QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600915{
Alison Craig1aced7d2015-04-10 12:00:02 -0600916 // 1) Strip the prefix off the ndn::Interest's ndn::Name
917 // +1 to grab JSON component after "query" component
Alison Craig1aced7d2015-04-10 12:00:02 -0600918
Chengyu Fanb25835b2015-04-28 17:09:35 -0600919 ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
920 // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
921 const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
Alison Craig2a4d5282015-04-10 12:00:02 -0600922
Chengyu Fanb25835b2015-04-28 17:09:35 -0600923 if (jsonQuery.length() <= 0) {
Chengyu Fan92440162015-07-09 14:43:31 -0600924 // no JSON query, send Nack?
Chengyu Fanb25835b2015-04-28 17:09:35 -0600925 return;
926 }
Chengyu Fan92440162015-07-09 14:43:31 -0600927 // check if the ACK is cached, if yes, respond with ACK
928 // ?? what if the results for now it NULL, but latter exist?
Alison Craig2a4d5282015-04-10 12:00:02 -0600929 // For efficiency, do a double check. Once without the lock, then with it.
930 if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
931 m_mutex.lock();
932 { // !!! BEGIN CRITICAL SECTION !!!
933 // If this fails upon locking, we removed it during our search.
934 // An unusual race-condition case, which requires things like PIT aggregation to be off.
935 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
936 if (iter != m_activeQueryToFirstResponse.end()) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600937 m_face->put(*(iter->second));
Alison Craig2a4d5282015-04-10 12:00:02 -0600938 m_mutex.unlock(); //escape lock
939 return;
940 }
941 } // !!! END CRITICAL SECTION !!!
942 m_mutex.unlock();
943 }
944
945 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
946 Json::Value parsedFromString;
947 Json::Reader reader;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600948 if (!reader.parse(jsonQuery, parsedFromString)) {
949 // @todo: send NACK?
950 std::cout << "cannot parse the JsonQuery" << std::endl;
951 return;
Alison Craig2a4d5282015-04-10 12:00:02 -0600952 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600953
Chengyu Fan92440162015-07-09 14:43:31 -0600954 // the version should be replaced with ChronoSync state digest
Chengyu Fanb25835b2015-04-28 17:09:35 -0600955 const ndn::name::Component version
956 = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(
957 ndn::time::system_clock::now()).count());
958
959 std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
960
961 m_mutex.lock();
962 { // !!! BEGIN CRITICAL SECTION !!!
963 // An unusual race-condition case, which requires things like PIT aggregation to be off.
964 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
965 if (iter != m_activeQueryToFirstResponse.end()) {
966 m_face->put(*(iter->second));
967 m_mutex.unlock(); // escape lock
968 return;
969 }
970 // This is where things are expensive so we save them for the lock
Chengyu Fan92440162015-07-09 14:43:31 -0600971 // note that we ack the query with the cached ACK messages, but we should remove the ACKs
972 // that conatin the old version when ChronoSync is updated
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600973 //m_activeQueryToFirstResponse.insert(std::pair<std::string,
974 // std::shared_ptr<ndn::Data>>(jsonQuery, ack));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600975 m_face->put(*ack);
976 } // !!! END CRITICAL SECTION !!!
977 m_mutex.unlock();
978
979 // 3) Convert the JSON Query into a MySQL one
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600980 bool autocomplete = false, lastComponent = false;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600981 std::stringstream sqlQuery;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600982
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600983 ndn::Name segmentPrefix(getQueryResultsName(interest, version));
Chengyu Fanb25835b2015-04-28 17:09:35 -0600984
Chengyu Fan92440162015-07-09 14:43:31 -0600985 Json::Value tmp;
986 // expect the autocomplete and the component-based query are separate
987 // if JSON::Value contains ? as key, is autocompletion
988 if (parsedFromString.get("?", tmp) != tmp) {
989 autocomplete = true;
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600990 if (!json2AutocompletionSql(sqlQuery, parsedFromString, lastComponent)) {
Chengyu Fan92440162015-07-09 14:43:31 -0600991 sendNack(segmentPrefix);
992 return;
993 }
994 }
Chengyu Fan46398212015-08-11 11:23:13 -0600995 else if (parsedFromString.get("??", tmp) != tmp) {
Chengyu Fan4d5fbd22015-09-18 14:34:08 -0600996 if (!json2PrefixBasedSearchSql(sqlQuery, parsedFromString)) {
Chengyu Fan46398212015-08-11 11:23:13 -0600997 sendNack(segmentPrefix);
998 return;
999 }
1000 }
Chengyu Fan92440162015-07-09 14:43:31 -06001001 else {
1002 if (!json2Sql(sqlQuery, parsedFromString)) {
1003 sendNack(segmentPrefix);
1004 return;
1005 }
1006 }
1007
1008 // 4) Run the Query
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001009 prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001010}
1011
1012template <typename DatabaseHandler>
1013void
1014QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
1015 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001016 bool autocomplete,
1017 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001018{
1019 // empty
1020}
1021
1022// prepareSegments specilization function
1023template<>
1024void
1025QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
1026 const std::string& sqlString,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001027 bool autocomplete,
1028 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001029{
Chengyu Fan46398212015-08-11 11:23:13 -06001030#ifndef NDEBUG
Chengyu Fan92440162015-07-09 14:43:31 -06001031 std::cout << "prepareSegments() executes sql : " << sqlString << std::endl;
Chengyu Fan46398212015-08-11 11:23:13 -06001032#endif
1033 std::string errMsg;
1034 bool success;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001035 // 4) Run the Query
1036 std::shared_ptr<MYSQL_RES> results
Chengyu Fan46398212015-08-11 11:23:13 -06001037 = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
1038 if (!success)
1039 std::cout << errMsg << std::endl;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001040
1041 if (!results) {
Chengyu Fanb25835b2015-04-28 17:09:35 -06001042 std::cout << "null MYSQL_RES for query : " << sqlString << std::endl;
Chengyu Fan92440162015-07-09 14:43:31 -06001043
Chengyu Fanb25835b2015-04-28 17:09:35 -06001044 // @todo: throw runtime error or log the error message?
1045 return;
1046 }
1047
Chengyu Fan92440162015-07-09 14:43:31 -06001048 uint64_t resultCount = mysql_num_rows(results.get());
1049
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001050#ifndef NDEBUG
Chengyu Fanb25835b2015-04-28 17:09:35 -06001051 std::cout << "Query results for \""
1052 << sqlString
1053 << "\" contain "
Chengyu Fan92440162015-07-09 14:43:31 -06001054 << resultCount
Chengyu Fanb25835b2015-04-28 17:09:35 -06001055 << " rows" << std::endl;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001056#endif
Chengyu Fanb25835b2015-04-28 17:09:35 -06001057
1058 MYSQL_ROW row;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001059 uint64_t segmentNo = 0;
Chengyu Fan46398212015-08-11 11:23:13 -06001060 Json::Value tmp;
1061 Json::Value resultJson;
1062 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001063
1064 uint64_t viewStart = 0, viewEnd = 0;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001065 while ((row = mysql_fetch_row(results.get())))
1066 {
Chengyu Fan46398212015-08-11 11:23:13 -06001067 tmp.append(row[0]);
1068 const std::string tmpString = fastWriter.write(tmp);
1069 if (tmpString.length() > PAYLOAD_LIMIT) {
Chengyu Fanb25835b2015-04-28 17:09:35 -06001070 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001071 = makeReplyData(segmentPrefix, resultJson, segmentNo, false,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001072 autocomplete, resultCount, viewStart, viewEnd, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001073 m_mutex.lock();
1074 m_cache.insert(*data);
1075 m_mutex.unlock();
Chengyu Fan46398212015-08-11 11:23:13 -06001076 tmp.clear();
1077 resultJson.clear();
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001078 segmentNo++;
1079 viewStart = viewEnd + 1;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001080 }
Chengyu Fan46398212015-08-11 11:23:13 -06001081 resultJson.append(row[0]);
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001082 viewEnd++;
Chengyu Fanb25835b2015-04-28 17:09:35 -06001083 }
Chengyu Fan46398212015-08-11 11:23:13 -06001084
Chengyu Fanb25835b2015-04-28 17:09:35 -06001085 std::shared_ptr<ndn::Data> data
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001086 = makeReplyData(segmentPrefix, resultJson, segmentNo, true,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001087 autocomplete, resultCount, viewStart, viewEnd, lastComponent);
Chengyu Fanb25835b2015-04-28 17:09:35 -06001088 m_mutex.lock();
1089 m_cache.insert(*data);
1090 m_mutex.unlock();
1091}
1092
1093template <typename DatabaseHandler>
1094std::shared_ptr<ndn::Data>
1095QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
1096 const Json::Value& value,
1097 uint64_t segmentNo,
1098 bool isFinalBlock,
Chengyu Fan92440162015-07-09 14:43:31 -06001099 bool isAutocomplete,
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001100 uint64_t resultCount,
1101 uint64_t viewStart,
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001102 uint64_t viewEnd,
1103 bool lastComponent)
Chengyu Fanb25835b2015-04-28 17:09:35 -06001104{
1105 Json::Value entry;
1106 Json::FastWriter fastWriter;
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001107
1108 entry["resultCount"] = Json::UInt64(resultCount);;
1109 entry["viewStart"] = Json::UInt64(viewStart);
1110 entry["viewEnd"] = Json::UInt64(viewEnd);
1111
Chengyu Fan3b9bb342015-09-21 10:53:37 -06001112 if (lastComponent)
1113 entry["lastComponent"] = Json::Value(true);
1114
Chengyu Fanf4c747a2015-08-18 13:56:01 -06001115#ifndef NDEBUG
1116 std::cout << "resultCount " << resultCount
1117 << "; viewStart " << viewStart
1118 << "; viewEnd " << viewEnd << std::endl;
1119#endif
1120
Chengyu Fanb25835b2015-04-28 17:09:35 -06001121 if (isAutocomplete) {
1122 entry["next"] = value;
1123 } else {
1124 entry["results"] = value;
1125 }
1126 const std::string jsonMessage = fastWriter.write(entry);
1127 const char* payload = jsonMessage.c_str();
1128 size_t payloadLength = jsonMessage.size() + 1;
1129 ndn::Name segmentName(segmentPrefix);
1130 segmentName.appendSegment(segmentNo);
1131
1132 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
1133 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
1134 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
1135
1136 if (isFinalBlock) {
1137 data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
1138 }
1139#ifndef NDEBUG
1140 std::cout << "makeReplyData : " << segmentName << std::endl;
1141#endif
1142 signData(*data);
1143 return data;
Alison Craig2a4d5282015-04-10 12:00:02 -06001144}
1145
1146} // namespace query
1147} // namespace atmos
1148#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP