blob: 107e6c45cf206592419cc61c6c97b8262eca3899 [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
20#define ATMOS_QUERY_QUERY_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanb25835b2015-04-28 17:09:35 -060024#include "util/config-file.hpp"
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <thread>
27
Alison Craig2a4d5282015-04-10 12:00:02 -060028#include <json/reader.h>
29#include <json/value.h>
30#include <json/writer.h>
31
32#include <ndn-cxx/data.hpp>
33#include <ndn-cxx/face.hpp>
34#include <ndn-cxx/interest.hpp>
35#include <ndn-cxx/interest-filter.hpp>
36#include <ndn-cxx/name.hpp>
37#include <ndn-cxx/security/key-chain.hpp>
38#include <ndn-cxx/util/time.hpp>
39#include <ndn-cxx/encoding/encoding-buffer.hpp>
Alison Craig1aced7d2015-04-10 12:00:02 -060040#include <ndn-cxx/util/in-memory-storage-lru.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060041
42#include "mysql/mysql.h"
43
Alison Craig2a4d5282015-04-10 12:00:02 -060044#include <map>
Chengyu Fanb25835b2015-04-28 17:09:35 -060045#include <unordered_map>
Alison Craig2a4d5282015-04-10 12:00:02 -060046#include <memory>
47#include <mutex>
48#include <sstream>
49#include <string>
50
51namespace atmos {
52namespace query {
Alison Craig2a4d5282015-04-10 12:00:02 -060053static const size_t MAX_SEGMENT_SIZE = ndn::MAX_NDN_PACKET_SIZE >> 1;
54
55/**
56 * QueryAdapter handles the Query usecases for the catalog
57 */
58template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060059class QueryAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060060public:
61 /**
62 * Constructor
63 *
Chengyu Fanb25835b2015-04-28 17:09:35 -060064 * @param face: Face that will be used for NDN communications
65 * @param keyChain: KeyChain that will be used for data signing
Alison Craig2a4d5282015-04-10 12:00:02 -060066 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060067 QueryAdapter(const std::shared_ptr<ndn::Face>& face,
68 const std::shared_ptr<ndn::KeyChain>& keyChain);
Alison Craig2a4d5282015-04-10 12:00:02 -060069
Alison Craig2a4d5282015-04-10 12:00:02 -060070 virtual
71 ~QueryAdapter();
72
73 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060074 * Helper function to specify section handler
75 */
76 void
77 setConfigFile(util::ConfigFile& config,
78 const ndn::Name& prefix);
79
80protected:
81 /**
82 * Helper function for configuration parsing
83 */
84 void
85 onConfig(const util::ConfigSection& section,
86 bool isDryDun,
87 const std::string& fileName,
88 const ndn::Name& prefix);
89
90 /**
Alison Craig2a4d5282015-04-10 12:00:02 -060091 * Handles incoming query requests by stripping the filter off the Interest to get the
92 * actual request out. This removes the need for a 2-step Interest-Data retrieval.
93 *
94 * @param filter: InterestFilter that caused this Interest to be routed
95 * @param interest: Interest that needs to be handled
96 */
97 virtual void
98 onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
99
100 /**
101 * Handles requests for responses to an existing query
102 *
103 * @param filter: InterestFilter that caused this Interest to be routed
104 * @param interest: Interest that needs to be handled
105 */
106 virtual void
107 onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
108
Alison Craig2a4d5282015-04-10 12:00:02 -0600109 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600110 * Helper function that makes query-results data
Alison Craig2a4d5282015-04-10 12:00:02 -0600111 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600112 * @param segmentPrefix: Name that identifies the Prefix for the Data
113 * @param value: Json::Value to be sent in the Data
114 * @param segmentNo: uint64_t the segment for this Data
115 * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
116 * last entry
Alison Craig2a4d5282015-04-10 12:00:02 -0600117 * @param isAutocomplete: bool to indicate whether this is an autocomplete message
118 */
Chengyu Fanb25835b2015-04-28 17:09:35 -0600119 std::shared_ptr<ndn::Data>
120 makeReplyData(const ndn::Name& segmentPrefix,
121 const Json::Value& value,
122 uint64_t segmentNo,
123 bool isFinalBlock,
124 bool isAutocomplete);
Alison Craig2a4d5282015-04-10 12:00:02 -0600125
126 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600127 * Helper function that generates query results from a Json query carried in the Interest
Alison Craig2a4d5282015-04-10 12:00:02 -0600128 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600129 * @param interest: Interest that needs to be handled
Alison Craig2a4d5282015-04-10 12:00:02 -0600130 */
131 void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600132 runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600133
Alison Craig1aced7d2015-04-10 12:00:02 -0600134 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -0600135 * Helper function that makes ACK data
Alison Craig1aced7d2015-04-10 12:00:02 -0600136 *
Chengyu Fanb25835b2015-04-28 17:09:35 -0600137 * @param interest: Intersts that needs to be handled
138 * @param version: Version that needs to be in the data name
139 */
140 std::shared_ptr<ndn::Data>
141 makeAckData(std::shared_ptr<const ndn::Interest> interest,
142 const ndn::Name::Component& version);
143
144 /**
145 * Helper function that generates the sqlQuery string and autocomplete flag
146 * @param sqlQuery: stringstream to save the sqlQuery string
147 * @param jsonValue: Json value that contains the query information
148 * @param autocomplete: Flag to indicate if the json contains autocomplete flag
Alison Craig1aced7d2015-04-10 12:00:02 -0600149 */
150 void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600151 json2Sql(std::stringstream& sqlQuery,
152 Json::Value& jsonValue,
153 bool& autocomplete);
154
155 /**
156 * Helper function that signs the data
157 */
158 void
159 signData(ndn::Data& data);
160
161 /**
162 * Helper function that publishes query-results data segments
163 */
164 virtual void
165 prepareSegments(const ndn::Name& segmentPrefix,
166 const std::string& sqlString,
167 bool autocomplete);
168
169 /**
170 * Helper function to set the DatabaseHandler
171 */
172 void
173 setDatabaseHandler(const util::ConnectionDetails& databaseId);
174
175 /**
176 * Helper function that set filters to make the adapter work
177 */
178 void
179 setFilters();
180
181protected:
182 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
183 // Handle to the Catalog's database
184 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Alison Craig1aced7d2015-04-10 12:00:02 -0600185
Alison Craig2a4d5282015-04-10 12:00:02 -0600186 // mutex to control critical sections
187 std::mutex m_mutex;
188 // @{ needs m_mutex protection
189 // The Queries we are currently writing to
190 std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
Alison Craig1aced7d2015-04-10 12:00:02 -0600191
192 ndn::util::InMemoryStorageLru m_cache;
Alison Craig2a4d5282015-04-10 12:00:02 -0600193 // @}
Chengyu Fanb25835b2015-04-28 17:09:35 -0600194 RegisteredPrefixList m_registeredPrefixList;
Alison Craig2a4d5282015-04-10 12:00:02 -0600195};
196
Alison Craig2a4d5282015-04-10 12:00:02 -0600197template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600198QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
199 const std::shared_ptr<ndn::KeyChain>& keyChain)
200 : util::CatalogAdapter(face, keyChain)
201 , m_cache(250000)
Alison Craig2a4d5282015-04-10 12:00:02 -0600202{
Alison Craig2a4d5282015-04-10 12:00:02 -0600203}
204
205template <typename DatabaseHandler>
206void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600207QueryAdapter<DatabaseHandler>::setFilters()
Alison Craig2a4d5282015-04-10 12:00:02 -0600208{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600209 ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
210 m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
211 bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
212 this, _1, _2),
213 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
214 this, _1),
215 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
216 this, _1, _2));
217
218 ndn::Name resultPrefix = ndn::Name(m_prefix).append("query-results");
219 m_registeredPrefixList[resultPrefix] = m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("query-results")),
220 bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
221 this, _1, _2),
222 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
223 this, _1),
224 bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
225 this, _1, _2));
226}
227
228template <typename DatabaseHandler>
229void
230QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
231 const ndn::Name& prefix)
232{
233 config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
234 _1, _2, _3, prefix));
235}
236
237template <typename DatabaseHandler>
238void
239QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
240 bool isDryRun,
241 const std::string& filename,
242 const ndn::Name& prefix)
243{
244 using namespace util;
245 if (isDryRun) {
246 return;
247 }
248 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
249 for (auto item = section.begin();
250 item != section.end();
251 ++ item)
252 {
253 if (item->first == "signingId") {
254 signingId.assign(item->second.get_value<std::string>());
255 if (signingId.empty()) {
256 throw Error("Empty value for \"signingId\""
257 " in \"query\" section");
258 }
259 }
260 if (item->first == "database") {
261 const util::ConfigSection& dataSection = item->second;
262 for (auto subItem = dataSection.begin();
263 subItem != dataSection.end();
264 ++ subItem)
265 {
266 if (subItem->first == "dbServer") {
267 dbServer.assign(subItem->second.get_value<std::string>());
268 if (dbServer.empty()){
269 throw Error("Invalid value for \"dbServer\""
270 " in \"query\" section");
271 }
272 }
273 if (subItem->first == "dbName") {
274 dbName.assign(subItem->second.get_value<std::string>());
275 if (dbName.empty()){
276 throw Error("Invalid value for \"dbName\""
277 " in \"query\" section");
278 }
279 }
280 if (subItem->first == "dbUser") {
281 dbUser.assign(subItem->second.get_value<std::string>());
282 if (dbUser.empty()){
283 throw Error("Invalid value for \"dbUser\""
284 " in \"query\" section");
285 }
286 }
287 if (subItem->first == "dbPasswd") {
288 dbPasswd.assign(subItem->second.get_value<std::string>());
289 if (dbPasswd.empty()){
290 throw Error("Invalid value for \"dbPasswd\""
291 " in \"query\" section");
292 }
293 }
294 }
295 }
296 }
297
298 m_prefix = prefix;
299 m_signingId = ndn::Name(signingId);
300 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
301
302 setDatabaseHandler(mysqlId);
303 setFilters();
304}
305
306template <typename DatabaseHandler>
307void
308QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
309{
310 //empty
311}
312
313template <>
314void
315QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
316{
317 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
318
319 m_databaseHandler = conn;
320}
321
322template <typename DatabaseHandler>
323QueryAdapter<DatabaseHandler>::~QueryAdapter()
324{
325 for (const auto& itr : m_registeredPrefixList) {
326 if (static_cast<bool>(itr.second))
327 m_face->unsetInterestFilter(itr.second);
328 }
329}
330
331template <typename DatabaseHandler>
332void
333QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
334 const ndn::Interest& interest)
335{
336 // strictly enforce query initialization namespace.
337 // Name should be our local prefix + "query" + parameters
Alison Craig2a4d5282015-04-10 12:00:02 -0600338 if (interest.getName().size() != filter.getPrefix().size() + 1) {
339 // @todo: return a nack
340 return;
341 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600342 std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600343 #ifndef NDEBUG
344 std::cout << "query interest : " << interestPtr->getName() << std::endl;
345 #endif
346 // @todo: use thread pool
347 std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
348 this,
349 interestPtr);
Alison Craig2a4d5282015-04-10 12:00:02 -0600350 queryThread.join();
351}
352
353template <typename DatabaseHandler>
354void
355QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
356 const ndn::Interest& interest)
357{
358 // FIXME Results are currently getting served out of the forwarder's
359 // CS so we just ignore any retrieval Interests that hit us for
360 // now. In the future, this should check some form of
361 // InMemoryStorage.
Chengyu Fanb25835b2015-04-28 17:09:35 -0600362 #ifndef NDEBUG
363 std::cout << "query results interest : " << interest.toUri() << std::endl;
364 #endif
Alison Craig1aced7d2015-04-10 12:00:02 -0600365 auto data = m_cache.find(interest.getName());
366 if (data) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600367 m_face->put(*data);
Alison Craig1aced7d2015-04-10 12:00:02 -0600368 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600369}
370
371template <typename DatabaseHandler>
372void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600373QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600374{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600375 if (m_signingId.empty())
376 m_keyChain->sign(data);
377 else {
378 ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
379 ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
380 m_keyChain->sign(data, certName);
Alison Craig2a4d5282015-04-10 12:00:02 -0600381 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600382}
383
384template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -0600385std::shared_ptr<ndn::Data>
386QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
387 const ndn::Name::Component& version)
Alison Craig2a4d5282015-04-10 12:00:02 -0600388{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600389 // JSON parsed ok, so we can acknowledge successful receipt of the query
390 ndn::Name ackName(interest->getName());
391 ackName.append(version);
392 ackName.append("OK");
Alison Craig2a4d5282015-04-10 12:00:02 -0600393
Chengyu Fanb25835b2015-04-28 17:09:35 -0600394 std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(ackName);
395 signData(*ack);
396 #ifndef NDEBUG
397 std::cout << "makeAckData : " << ackName << std::endl;
398 #endif
399 return ack;
Alison Craig2a4d5282015-04-10 12:00:02 -0600400}
401
402template <typename DatabaseHandler>
403void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600404QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
405 Json::Value& jsonValue,
406 bool& autocomplete)
407{
408 // 3) Convert the JSON Query into a MySQL one
409 sqlQuery << "SELECT name FROM cmip5";
410 bool input = false;
411 for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
412 {
413 Json::Value key = iter.key();
414 Json::Value value = (*iter);
415
416 if (input) {
417 sqlQuery << " AND";
418 } else {
419 sqlQuery << " WHERE";
420 }
421
422 // Auto-complete case
423 if (key.asString().compare("?") == 0) {
424 sqlQuery << " name REGEXP '^" << value.asString() << "'";
425 autocomplete = true;
426 }
427 // Component case
428 else {
429 sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
430 }
431 input = true;
432 }
433
434 if (!input) { // Force it to be the empty set
435 sqlQuery << " limit 0";
436 }
437 sqlQuery << ";";
438}
439
440template <typename DatabaseHandler>
441void
442QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600443{
Alison Craig1aced7d2015-04-10 12:00:02 -0600444 // 1) Strip the prefix off the ndn::Interest's ndn::Name
445 // +1 to grab JSON component after "query" component
Alison Craig1aced7d2015-04-10 12:00:02 -0600446
Chengyu Fanb25835b2015-04-28 17:09:35 -0600447 ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
448 // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
449 const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
Alison Craig2a4d5282015-04-10 12:00:02 -0600450
Chengyu Fanb25835b2015-04-28 17:09:35 -0600451 if (jsonQuery.length() <= 0) {
452 // send Nack?
453 return;
454 }
455 // ------------------
Alison Craig2a4d5282015-04-10 12:00:02 -0600456 // For efficiency, do a double check. Once without the lock, then with it.
457 if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
458 m_mutex.lock();
459 { // !!! BEGIN CRITICAL SECTION !!!
460 // If this fails upon locking, we removed it during our search.
461 // An unusual race-condition case, which requires things like PIT aggregation to be off.
462 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
463 if (iter != m_activeQueryToFirstResponse.end()) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600464 m_face->put(*(iter->second));
Alison Craig2a4d5282015-04-10 12:00:02 -0600465 m_mutex.unlock(); //escape lock
466 return;
467 }
468 } // !!! END CRITICAL SECTION !!!
469 m_mutex.unlock();
470 }
471
472 // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
473 Json::Value parsedFromString;
474 Json::Reader reader;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600475 if (!reader.parse(jsonQuery, parsedFromString)) {
476 // @todo: send NACK?
477 std::cout << "cannot parse the JsonQuery" << std::endl;
478 return;
Alison Craig2a4d5282015-04-10 12:00:02 -0600479 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600480
481 const ndn::name::Component version
482 = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(
483 ndn::time::system_clock::now()).count());
484
485 std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
486
487 m_mutex.lock();
488 { // !!! BEGIN CRITICAL SECTION !!!
489 // An unusual race-condition case, which requires things like PIT aggregation to be off.
490 auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
491 if (iter != m_activeQueryToFirstResponse.end()) {
492 m_face->put(*(iter->second));
493 m_mutex.unlock(); // escape lock
494 return;
495 }
496 // This is where things are expensive so we save them for the lock
497 m_activeQueryToFirstResponse.insert(std::pair<std::string,
498 std::shared_ptr<ndn::Data>>(jsonQuery, ack));
499 m_face->put(*ack);
500 } // !!! END CRITICAL SECTION !!!
501 m_mutex.unlock();
502
503 // 3) Convert the JSON Query into a MySQL one
504 bool autocomplete = false;
505 std::stringstream sqlQuery;
506 json2Sql(sqlQuery, parsedFromString, autocomplete);
507
508 // 4) Run the Query
509 ndn::Name segmentPrefix(m_prefix);
510 segmentPrefix.append("query-results");
511 segmentPrefix.append(version);
512
513 prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete);
514}
515
516template <typename DatabaseHandler>
517void
518QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
519 const std::string& sqlString,
520 bool autocomplete)
521{
522 // empty
523}
524
525// prepareSegments specilization function
526template<>
527void
528QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
529 const std::string& sqlString,
530 bool autocomplete)
531{
532#ifndef NDEBUG
533 std::cout << "sqlString in prepareSegments : " << sqlString << std::endl;
534#endif
535 // 4) Run the Query
536 std::shared_ptr<MYSQL_RES> results
537 = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString);
538
539 if (!results) {
540#ifndef NDEBUG
541 std::cout << "null MYSQL_RES for query : " << sqlString << std::endl;
542#endif
543 // @todo: throw runtime error or log the error message?
544 return;
545 }
546
547#ifndef NDEBUG
548 std::cout << "Query results for \""
549 << sqlString
550 << "\" contain "
551 << mysql_num_rows(results.get())
552 << " rows" << std::endl;
553#endif
554
555 MYSQL_ROW row;
556 size_t usedBytes = 0;
557 const size_t PAYLOAD_LIMIT = 7000;
558 uint64_t segmentNo = 0;
559 Json::Value array;
560 while ((row = mysql_fetch_row(results.get())))
561 {
562 size_t size = strlen(row[0]) + 1;
563 if (usedBytes + size > PAYLOAD_LIMIT) {
564 std::shared_ptr<ndn::Data> data
565 = makeReplyData(segmentPrefix, array, segmentNo, false, autocomplete);
566 m_mutex.lock();
567 m_cache.insert(*data);
568 m_mutex.unlock();
569 array.clear();
570 usedBytes = 0;
571 segmentNo++;
572 }
573 array.append(row[0]);
574 usedBytes += size;
575 }
576 std::shared_ptr<ndn::Data> data
577 = makeReplyData(segmentPrefix, array, segmentNo, true, autocomplete);
578 m_mutex.lock();
579 m_cache.insert(*data);
580 m_mutex.unlock();
581}
582
583template <typename DatabaseHandler>
584std::shared_ptr<ndn::Data>
585QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
586 const Json::Value& value,
587 uint64_t segmentNo,
588 bool isFinalBlock,
589 bool isAutocomplete)
590{
591 Json::Value entry;
592 Json::FastWriter fastWriter;
593 if (isAutocomplete) {
594 entry["next"] = value;
595 } else {
596 entry["results"] = value;
597 }
598 const std::string jsonMessage = fastWriter.write(entry);
599 const char* payload = jsonMessage.c_str();
600 size_t payloadLength = jsonMessage.size() + 1;
601 ndn::Name segmentName(segmentPrefix);
602 segmentName.appendSegment(segmentNo);
603
604 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
605 data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
606 data->setFreshnessPeriod(ndn::time::milliseconds(10000));
607
608 if (isFinalBlock) {
609 data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
610 }
611#ifndef NDEBUG
612 std::cout << "makeReplyData : " << segmentName << std::endl;
613#endif
614 signData(*data);
615 return data;
Alison Craig2a4d5282015-04-10 12:00:02 -0600616}
617
618} // namespace query
619} // namespace atmos
620#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP