| /** NDN-Atmos: Cataloging Service for distributed data originally developed |
| * for atmospheric science data |
| * Copyright (C) 2015 Colorado State University |
| * |
| * NDN-Atmos is free software: you can redistribute it and/or modify |
| * it under the terms of the GNU General Public License as published by |
| * the Free Software Foundation, either version 3 of the License, or |
| * (at your option) any later version. |
| * |
| * NDN-Atmos is distributed in the hope that it will be useful, |
| * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| * GNU General Public License for more details. |
| * |
| * You should have received a copy of the GNU General Public License |
| * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>. |
| **/ |
| |
| #ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP |
| #define ATMOS_QUERY_QUERY_ADAPTER_HPP |
| |
| #include "util/catalog-adapter.hpp" |
| #include "util/mysql-util.hpp" |
| #include "util/config-file.hpp" |
| |
| #include <thread> |
| |
| #include <json/reader.h> |
| #include <json/value.h> |
| #include <json/writer.h> |
| |
| #include <ndn-cxx/data.hpp> |
| #include <ndn-cxx/face.hpp> |
| #include <ndn-cxx/interest.hpp> |
| #include <ndn-cxx/interest-filter.hpp> |
| #include <ndn-cxx/name.hpp> |
| #include <ndn-cxx/security/key-chain.hpp> |
| #include <ndn-cxx/util/time.hpp> |
| #include <ndn-cxx/encoding/encoding-buffer.hpp> |
| #include <ndn-cxx/util/in-memory-storage-lru.hpp> |
| #include <ndn-cxx/util/string-helper.hpp> |
| #include <ChronoSync/socket.hpp> |
| |
| #include "mysql/mysql.h" |
| |
| #include <map> |
| #include <unordered_map> |
| #include <memory> |
| #include <mutex> |
| #include <sstream> |
| #include <string> |
| #include <array> |
| #include <utility> |
| |
| #include "util/logger.hpp" |
| |
| |
| namespace atmos { |
| namespace query { |
| #ifdef HAVE_LOG4CXX |
| INIT_LOGGER("QueryAdapter"); |
| #endif |
| |
| // todo: calculate payload limit by get the size of a signed empty Data packet |
| static const size_t PAYLOAD_LIMIT = 7000; |
| |
| /** |
| * QueryAdapter handles the Query usecases for the catalog |
| */ |
| template <typename DatabaseHandler> |
| class QueryAdapter : public atmos::util::CatalogAdapter { |
| public: |
| /** |
| * Constructor |
| * |
| * @param face: Face that will be used for NDN communications |
| * @param keyChain: KeyChain that will be used for data signing |
| * @param syncSocket: ChronoSync socket |
| */ |
| QueryAdapter(const std::shared_ptr<ndn::Face>& face, |
| const std::shared_ptr<ndn::KeyChain>& keyChain, |
| const std::shared_ptr<chronosync::Socket>& syncSocket); |
| |
| virtual |
| ~QueryAdapter(); |
| |
| /** |
| * Helper function to specify section handler |
| */ |
| void |
| setConfigFile(util::ConfigFile& config, |
| const ndn::Name& prefix, |
| const std::vector<std::string>& nameFields, |
| const std::string& databaseTable); |
| |
| protected: |
| /** |
| * Helper function for configuration parsing |
| */ |
| void |
| onConfig(const util::ConfigSection& section, |
| bool isDryDun, |
| const std::string& fileName, |
| const ndn::Name& prefix); |
| |
| /** |
| * Handles incoming query requests by stripping the filter off the Interest to get the |
| * actual request out. This removes the need for a 2-step Interest-Data retrieval. |
| * |
| * @param filter: InterestFilter that caused this Interest to be routed |
| * @param interest: Interest that needs to be handled |
| */ |
| virtual void |
| onIncomingQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest); |
| |
| /** |
| * Handles requests for responses to an filter initialization request |
| * |
| * @param interest: Interest that needs to be handled |
| */ |
| virtual void |
| onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest); |
| |
| /** |
| * Helper function that generates query results from a Json query carried in the Interest |
| * |
| * @param interest: Interest that needs to be handled |
| */ |
| void |
| populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest); |
| |
| void |
| getFiltersMenu(Json::Value& value); |
| |
| /** |
| * Helper function that makes query-results data |
| * |
| * @param segmentPrefix: Name that identifies the Prefix for the Data |
| * @param value: Json::Value to be sent in the Data |
| * @param segmentNo: uint64_t the segment for this Data |
| * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the |
| * last entry |
| * @param isAutocomplete: bool to indicate whether this is an autocomplete message |
| * @param resultCount: the number of records in the query results |
| * @param viewStart: the start index of the record in the query results payload |
| * @param viewEnd: the end index of the record in the query results payload |
| * @param lastComponent: flag to indicate the content contains the last component for |
| autocompletion query |
| */ |
| std::shared_ptr<ndn::Data> |
| makeReplyData(const ndn::Name& segmentPrefix, |
| const Json::Value& value, |
| uint64_t segmentNo, |
| bool isFinalBlock, |
| bool isAutocomplete, |
| uint64_t resultCount, |
| uint64_t viewStart, |
| uint64_t viewEnd, |
| bool lastComponent); |
| |
| /** |
| * Helper function that generates query results from a Json query carried in the Interest |
| * |
| * @param interest: Interest that needs to be handled |
| */ |
| void |
| runJsonQuery(std::shared_ptr<const ndn::Interest> interest); |
| |
| /** |
| * Helper function that makes ACK data |
| * |
| * @param interest: Intersts that needs to be handled |
| * @param version: Version that needs to be in the data name |
| */ |
| std::shared_ptr<ndn::Data> |
| makeAckData(std::shared_ptr<const ndn::Interest> interest, |
| const ndn::Name::Component& version); |
| |
| /** |
| * Helper function that sends NACK |
| * |
| * @param dataPrefix: prefix for the data packet |
| */ |
| void |
| sendNack(const ndn::Name& dataPrefix); |
| |
| /** |
| * Helper function that signs the data |
| */ |
| void |
| signData(ndn::Data& data); |
| |
| /** |
| * Helper function that publishes query-results data segments |
| */ |
| virtual void |
| prepareSegmentsBySqlString(const ndn::Name& segmentPrefix, |
| const std::string& sqlString, |
| bool lastComponent, |
| const std::string& nameField); |
| |
| virtual void |
| prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams, |
| const ndn::Name& segmentPrefix); |
| |
| void |
| generateSegments(ResultSet_T& res, |
| const ndn::Name& segmentPrefix, |
| int resultCount, |
| bool autocomplete, |
| bool lastComponent); |
| |
| /** |
| * Helper function to set the DatabaseHandler |
| */ |
| void |
| setDatabaseHandler(const util::ConnectionDetails& databaseId); |
| |
| void |
| closeDatabaseHandler(); |
| |
| /** |
| * Helper function that set filters to make the adapter work |
| */ |
| void |
| setFilters(); |
| |
| void |
| setCatalogId(); |
| |
| /** |
| * Helper function that generates the sqlQuery string for autocomplete query |
| * @param sqlQuery: stringstream to save the sqlQuery string |
| * @param jsonValue: Json value that contains the query information |
| * @param lastComponent: Flag to mark the last component query |
| * @param nameField: stringstream to save the nameField string |
| */ |
| bool |
| json2AutocompletionSql(std::stringstream& sqlQuery, |
| Json::Value& jsonValue, |
| bool& lastComponent, |
| std::stringstream& nameField); |
| |
| bool |
| doPrefixBasedSearch(Json::Value& jsonValue, |
| std::vector<std::pair<std::string, std::string>>& typedComponents); |
| |
| bool |
| doFilterBasedSearch(Json::Value& jsonValue, |
| std::vector<std::pair<std::string, std::string>>& typedComponents); |
| |
| ndn::Name |
| getQueryResultsName(std::shared_ptr<const ndn::Interest> interest, |
| const ndn::Name::Component& version); |
| |
| std::string |
| getChronoSyncDigest(); |
| |
| protected: |
| typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList; |
| // Handle to the Catalog's database |
| std::shared_ptr<DatabaseHandler> m_dbConnPool; |
| const std::shared_ptr<chronosync::Socket>& m_socket; |
| |
| // mutex to control critical sections |
| std::mutex m_mutex; |
| // @{ needs m_mutex protection |
| // The Queries we are currently writing to |
| ndn::util::InMemoryStorageLru m_activeQueryToFirstResponse; |
| ndn::util::InMemoryStorageLru m_cache; |
| std::string m_chronosyncDigest; |
| // @} |
| RegisteredPrefixList m_registeredPrefixList; |
| ndn::Name m_catalogId; // should be replaced with the PK digest |
| std::vector<std::string> m_filterCategoryNames; |
| }; |
| |
| template <typename DatabaseHandler> |
| QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face, |
| const std::shared_ptr<ndn::KeyChain>& keyChain, |
| const std::shared_ptr<chronosync::Socket>& syncSocket) |
| : util::CatalogAdapter(face, keyChain) |
| , m_socket(syncSocket) |
| , m_activeQueryToFirstResponse(100000) |
| , m_cache(250000) |
| , m_chronosyncDigest("0") |
| , m_catalogId("catalogIdPlaceHolder") // initialize for unitests |
| { |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::setFilters() |
| { |
| m_registeredPrefixList[m_prefix] = m_face->setInterestFilter(ndn::InterestFilter(m_prefix), |
| bind(&query::QueryAdapter<DatabaseHandler>::onIncomingQueryInterest, |
| this, _1, _2), |
| bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess, |
| this, _1), |
| bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure, |
| this, _1, _2)); |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config, |
| const ndn::Name& prefix, |
| const std::vector<std::string>& nameFields, |
| const std::string& databaseTable) |
| { |
| m_nameFields = nameFields; |
| m_databaseTable = databaseTable; |
| config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this, |
| _1, _2, _3, prefix)); |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section, |
| bool isDryRun, |
| const std::string& filename, |
| const ndn::Name& prefix) |
| { |
| using namespace util; |
| if (isDryRun) { |
| return; |
| } |
| std::string signingId, dbServer, dbName, dbUser, dbPasswd; |
| for (auto item = section.begin(); |
| item != section.end(); |
| ++item) |
| { |
| if (item->first == "signingId") { |
| signingId = item->second.get_value<std::string>(); |
| if (signingId.empty()) { |
| throw Error("Empty value for \"signingId\"" |
| " in \"query\" section"); |
| } |
| } |
| if (item->first == "filterCategoryNames") { |
| std::istringstream ss(item->second.get_value<std::string>()); |
| std::string token; |
| while(std::getline(ss, token, ',')) { |
| m_filterCategoryNames.push_back(token); |
| } |
| } |
| if (item->first == "database") { |
| const util::ConfigSection& dataSection = item->second; |
| for (auto subItem = dataSection.begin(); |
| subItem != dataSection.end(); |
| ++subItem) |
| { |
| if (subItem->first == "dbServer") { |
| dbServer = subItem->second.get_value<std::string>(); |
| } |
| if (subItem->first == "dbName") { |
| dbName = subItem->second.get_value<std::string>(); |
| } |
| if (subItem->first == "dbUser") { |
| dbUser = subItem->second.get_value<std::string>(); |
| } |
| if (subItem->first == "dbPasswd") { |
| dbPasswd = subItem->second.get_value<std::string>(); |
| } |
| } |
| |
| if (dbServer.empty()){ |
| throw Error("Invalid value for \"dbServer\"" |
| " in \"query\" section"); |
| } |
| if (dbName.empty()){ |
| throw Error("Invalid value for \"dbName\"" |
| " in \"query\" section"); |
| } |
| if (dbUser.empty()){ |
| throw Error("Invalid value for \"dbUser\"" |
| " in \"query\" section"); |
| } |
| if (dbPasswd.empty()){ |
| throw Error("Invalid value for \"dbPasswd\"" |
| " in \"query\" section"); |
| } |
| } |
| } |
| |
| if (m_filterCategoryNames.empty()) { |
| throw Error("Empty value for \"filterCategoryNames\" in \"query\" section"); |
| } |
| |
| m_prefix = prefix; |
| |
| m_signingId = ndn::Name(signingId); |
| setCatalogId(); |
| |
| util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName); |
| setDatabaseHandler(mysqlId); |
| setFilters(); |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::setCatalogId() |
| { |
| //empty |
| } |
| |
| template <> |
| void |
| QueryAdapter<ConnectionPool_T>::setCatalogId() |
| { |
| // use public key digest as the catalog ID |
| ndn::Name keyId; |
| if (m_signingId.empty()) { |
| keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity()); |
| } else { |
| keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId); |
| } |
| |
| std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId); |
| ndn::Block keyDigest = pKey->computeDigest(); |
| m_catalogId.clear(); |
| m_catalogId.append(ndn::toHex(*keyDigest.getBuffer())); |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId) |
| { |
| //empty |
| } |
| |
| template <> |
| void |
| QueryAdapter<ConnectionPool_T>::setDatabaseHandler(const util::ConnectionDetails& databaseId) |
| { |
| m_dbConnPool = zdbConnectionSetup(databaseId); |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::closeDatabaseHandler() |
| { |
| } |
| |
| template <> |
| void |
| QueryAdapter<ConnectionPool_T>::closeDatabaseHandler() |
| { |
| ConnectionPool_stop(*m_dbConnPool); |
| } |
| |
| |
| template <typename DatabaseHandler> |
| QueryAdapter<DatabaseHandler>::~QueryAdapter() |
| { |
| for (const auto& itr : m_registeredPrefixList) { |
| if (static_cast<bool>(itr.second)) |
| m_face->unsetInterestFilter(itr.second); |
| } |
| |
| closeDatabaseHandler(); |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::onIncomingQueryInterest(const ndn::InterestFilter& filter, |
| const ndn::Interest& interest) |
| { |
| _LOG_DEBUG(">> QueryAdapter::onIncomingQueryInterest"); |
| |
| // Interest must carry component "initialization" or "query" |
| if (interest.getName().size() < filter.getPrefix().size()) { |
| // must NACK incorrect interest |
| sendNack(interest.getName()); |
| return; |
| } |
| |
| _LOG_DEBUG("Interest : " << interest.getName()); |
| std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this(); |
| |
| if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("filters-initialization")) { |
| std::thread queryThread(&QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest, |
| this, |
| interestPtr); |
| queryThread.detach(); |
| } |
| else if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("query")) { |
| |
| auto data = m_cache.find(interest); |
| if (data) { |
| m_face->put(*data); |
| return; |
| } |
| |
| // catalog must strip sequence number in an Interest for further process |
| if (interest.getName().size() > (filter.getPrefix().size() + 2)) { |
| // Interest carries sequence number, only grip the main part |
| // e.g., /hep/query/<query-params>/<version>/#seq |
| ndn::Interest queryInterest(interest.getName().getPrefix(filter.getPrefix().size() + 2)); |
| |
| auto data = m_cache.find(queryInterest); |
| if (data) { |
| // catalog has generated some data, but still working on it |
| return; |
| } |
| interestPtr = queryInterest.shared_from_this(); |
| } |
| |
| std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery, |
| this, |
| interestPtr); |
| queryThread.detach(); |
| } |
| |
| // ignore other Interests |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest) |
| { |
| _LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest"); |
| |
| if(m_socket != nullptr) { |
| const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest(); |
| std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size()); |
| _LOG_DEBUG("Original digest :" << m_chronosyncDigest); |
| _LOG_DEBUG("New digest : " << digestStr); |
| // if the m_chronosyncDigest and the rootdigest are not equal |
| if (digestStr != m_chronosyncDigest) { |
| // (1) update chronosyncDigest |
| // (2) clear all staled ACK data |
| m_mutex.lock(); |
| m_chronosyncDigest = digestStr; |
| m_activeQueryToFirstResponse.erase(ndn::Name("/")); |
| m_mutex.unlock(); |
| _LOG_DEBUG("Change digest to " << m_chronosyncDigest); |
| } |
| } |
| |
| auto data = m_activeQueryToFirstResponse.find(*interest); |
| if (data) { |
| m_face->put(*data); |
| } |
| else { |
| populateFiltersMenu(interest); |
| } |
| |
| _LOG_DEBUG("<< QueryAdapter::onFiltersInitializationInterest"); |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest) |
| { |
| _LOG_DEBUG(">> QueryAdapter::populateFiltersMenu"); |
| Json::Value filters; |
| Json::FastWriter fastWriter; |
| getFiltersMenu(filters); |
| |
| const std::string filterValue = fastWriter.write(filters); |
| |
| if (!filters.empty()) { |
| // use /<prefix>/filters-initialization/<seg> as data name |
| ndn::Name filterDataName(interest->getName().getPrefix(-1)); |
| |
| const char* payload = filterValue.c_str(); |
| size_t payloadLength = filterValue.size(); |
| size_t startIndex = 0, seqNo = 0; |
| |
| if (filterValue.length() > PAYLOAD_LIMIT) { |
| payloadLength = PAYLOAD_LIMIT; |
| ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo); |
| std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName); |
| // freshnessPeriod 0 means permanent? |
| filterData->setFreshnessPeriod(ndn::time::milliseconds(10)); |
| filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength); |
| |
| signData(*filterData); |
| |
| _LOG_DEBUG("Populate Filter Data :" << segmentName); |
| |
| m_mutex.lock(); |
| // save the filter results in the activeQueryToFirstResponse structure |
| // when version changes, the activeQueryToFirstResponse should be cleaned |
| m_activeQueryToFirstResponse.insert(*filterData); |
| try { |
| m_face->put(*filterData); |
| } |
| catch (std::exception& e) { |
| _LOG_ERROR(e.what()); |
| } |
| m_mutex.unlock(); |
| |
| seqNo++; |
| startIndex = payloadLength * seqNo + 1; |
| } |
| payloadLength = filterValue.size() - PAYLOAD_LIMIT * seqNo; |
| |
| ndn::Name lastSegment = ndn::Name(filterDataName).appendSegment(seqNo); |
| std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(lastSegment); |
| filterData->setFreshnessPeriod(ndn::time::milliseconds(10)); |
| filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength); |
| filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo)); |
| |
| signData(*filterData); |
| m_mutex.lock(); |
| m_activeQueryToFirstResponse.insert(*filterData); |
| m_face->put(*filterData); |
| m_mutex.unlock(); |
| } |
| _LOG_DEBUG("<< QueryAdapter::populateFiltersMenu"); |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value) |
| { |
| // empty |
| } |
| |
| // get distinct value of each column |
| template <> |
| void |
| QueryAdapter<ConnectionPool_T>::getFiltersMenu(Json::Value& value) |
| { |
| _LOG_DEBUG(">> QueryAdapter::getFiltersMenu"); |
| Json::Value tmp; |
| |
| Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool); |
| if (!conn) { |
| _LOG_DEBUG("No available database connections"); |
| return; |
| } |
| |
| for (size_t i = 0; i < m_filterCategoryNames.size(); i++) { |
| std::string columnName = m_filterCategoryNames[i]; |
| std::string getFilterSql("SELECT DISTINCT " + columnName + |
| " FROM " + m_databaseTable + ";"); |
| |
| ResultSet_T res4ColumnName; |
| TRY { |
| res4ColumnName = Connection_executeQuery(conn, reinterpret_cast<const char*>(getFilterSql.c_str()), getFilterSql.size()); |
| } |
| CATCH(SQLException) { |
| _LOG_ERROR(Connection_getLastError(conn)); |
| } |
| END_TRY; |
| |
| while (ResultSet_next(res4ColumnName)) { |
| tmp[columnName].append(ResultSet_getString(res4ColumnName, 1)); |
| } |
| |
| value.append(tmp); |
| tmp.clear(); |
| } |
| |
| _LOG_DEBUG("<< QueryAdapter::getFiltersMenu"); |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::signData(ndn::Data& data) |
| { |
| if (m_signingId.empty()) |
| m_keyChain->sign(data); |
| else { |
| ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId); |
| ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName); |
| m_keyChain->sign(data, certName); |
| } |
| } |
| |
| template <typename DatabaseHandler> |
| ndn::Name |
| QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest, |
| const ndn::Name::Component& version) |
| { |
| // use generic name, instead of specific one |
| ndn::Name queryResultName = interest->getName(); |
| queryResultName.append(version); |
| return queryResultName; |
| } |
| |
| template <typename DatabaseHandler> |
| std::shared_ptr<ndn::Data> |
| QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest, |
| const ndn::Name::Component& version) |
| { |
| std::string queryResultNameStr(getQueryResultsName(interest, version).toUri()); |
| |
| std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName()); |
| ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()), |
| queryResultNameStr.length()); |
| ack->setFreshnessPeriod(ndn::time::milliseconds(10000)); |
| |
| signData(*ack); |
| |
| _LOG_DEBUG("Make ACK : " << queryResultNameStr); |
| |
| return ack; |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix) |
| { |
| uint64_t segmentNo = 0; |
| |
| std::shared_ptr<ndn::Data> nack = |
| std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo)); |
| nack->setFreshnessPeriod(ndn::time::milliseconds(10000)); |
| nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo)); |
| |
| signData(*nack); |
| |
| _LOG_DEBUG("Send Nack: " << ndn::Name(dataPrefix).appendSegment(segmentNo)); |
| |
| m_mutex.lock(); |
| m_cache.insert(*nack); |
| m_face->put(*nack); |
| m_mutex.unlock(); |
| } |
| |
| template <typename DatabaseHandler> |
| bool |
| QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery, |
| Json::Value& jsonValue, |
| bool& lastComponent, |
| std::stringstream& fieldName) |
| { |
| _LOG_DEBUG(">> QueryAdapter::json2AutocompletionSql"); |
| |
| _LOG_DEBUG(jsonValue.toStyledString()); |
| |
| if (jsonValue.type() != Json::objectValue) { |
| return false; |
| } |
| |
| std::string typedString; |
| // get the string in the jsonValue |
| for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter) |
| { |
| Json::Value key = iter.key(); |
| Json::Value value = (*iter); |
| |
| if (key == Json::nullValue || value == Json::nullValue) { |
| _LOG_ERROR("Null key or value in JsonValue"); |
| return false; |
| } |
| |
| // cannot convert to string |
| if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) { |
| _LOG_ERROR("Malformed JsonQuery string"); |
| return false; |
| } |
| |
| if (key.asString().compare("?") == 0) { |
| typedString = value.asString(); |
| // since the front end triggers the autocompletion when users typed '/', |
| // there must be a '/' at the end, and the first char must be '/' |
| if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0) |
| return false; |
| break; |
| } |
| } |
| |
| // 1. get the expected column number by parsing the typedString, so we can get the filed name |
| size_t pos = 0; |
| size_t start = 1; // start from the 1st char which is not '/' |
| size_t count = 0; // also the name to query for |
| std::string token; |
| std::string delimiter = "/"; |
| std::map<std::string, std::string> typedComponents; |
| while ((pos = typedString.find(delimiter, start)) != std::string::npos) { |
| token = typedString.substr(start, pos - start); |
| if (count >= m_nameFields.size() - 1) { |
| return false; |
| } |
| |
| // add column name and value (token) into map |
| typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token)); |
| count++; |
| start = pos + 1; |
| } |
| |
| // 2. generate the sql string (append what appears in the typed string, like activity='xxx'), |
| // return true |
| if (count == m_nameFields.size() - 1) |
| lastComponent = true; // indicate this query is to query the last component |
| |
| bool more = false; |
| |
| fieldName << m_nameFields[count]; |
| for (std::map<std::string, std::string>::iterator it = typedComponents.begin(); |
| it != typedComponents.end(); ++it) { |
| if (more) |
| sqlQuery << " AND"; |
| else |
| sqlQuery << " WHERE"; |
| |
| sqlQuery << " " << it->first << "='" << it->second << "'"; |
| |
| more = true; |
| } |
| sqlQuery << ";"; |
| return true; |
| } |
| |
| template <typename databasehandler> |
| bool |
| QueryAdapter<databasehandler>::doPrefixBasedSearch(Json::Value& jsonValue, |
| std::vector<std::pair<std::string, std::string>>& typedComponents) |
| { |
| _LOG_DEBUG(">> QueryAdapter::doPrefixBasedSearch"); |
| |
| if (jsonValue.type() != Json::objectValue) { |
| return false; |
| } |
| |
| std::string typedString; |
| // get the string in the jsonValue |
| for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter) |
| { |
| Json::Value key = iter.key(); |
| Json::Value value = (*iter); |
| |
| if (key == Json::nullValue || value == Json::nullValue) { |
| _LOG_ERROR("null key or value in jsonValue"); |
| return false; |
| } |
| |
| // cannot convert to string |
| if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) { |
| _LOG_ERROR("malformed jsonquery string"); |
| return false; |
| } |
| |
| if (key.asString().compare("??") == 0) { |
| typedString = value.asString(); |
| if (typedString.empty() || typedString.find("/") != 0) |
| return false; |
| break; |
| } |
| } |
| |
| // 1. get the expected column number by parsing the typedString, so we can get the filed name |
| size_t pos = 0; |
| size_t start = 1; // start from the 1st char which is not '/' |
| size_t count = 0; // also the name to query for |
| size_t typedStringlen = typedString.length(); |
| std::string token; |
| std::string delimiter = "/"; |
| |
| while ((pos = typedString.find(delimiter, start)) != std::string::npos) { |
| token = typedString.substr(start, pos - start); |
| if (count >= m_nameFields.size()) { |
| return false; |
| } |
| |
| // add column name and value (token) into map |
| typedComponents.push_back(std::make_pair(m_nameFields[count], token)); |
| |
| count++; |
| start = pos + 1; |
| } |
| |
| // we may have a component after the last "/" |
| if (start < typedStringlen) { |
| typedComponents.push_back(std::make_pair(m_nameFields[count], |
| typedString.substr(start, typedStringlen - start))); |
| } |
| |
| return true; |
| } |
| |
| |
| template <typename databasehandler> |
| bool |
| QueryAdapter<databasehandler>::doFilterBasedSearch(Json::Value& jsonValue, |
| std::vector<std::pair<std::string, std::string>>& typedComponents) |
| { |
| _LOG_DEBUG(">> QueryAdapter::doFilterBasedSearch"); |
| |
| if (jsonValue.type() != Json::objectValue) { |
| return false; |
| } |
| |
| for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter) |
| { |
| Json::Value key = iter.key(); |
| Json::Value value = (*iter); |
| |
| if (key == Json::nullValue || value == Json::nullValue) { |
| _LOG_ERROR("null key or value in jsonValue"); |
| return false; |
| } |
| |
| // cannot convert to string |
| if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) { |
| _LOG_ERROR("malformed jsonQuery string"); |
| return false; |
| } |
| |
| if (key.asString().compare("?") == 0 || key.asString().compare("??") == 0) { |
| continue; |
| } |
| |
| _LOG_DEBUG(key.asString() << " " << value.asString()); |
| typedComponents.push_back(std::make_pair(key.asString(), value.asString())); |
| } |
| |
| return true; |
| } |
| |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest) |
| { |
| _LOG_DEBUG(">> QueryAdapter::runJsonQuery"); |
| |
| // 1) Strip the prefix off the ndn::Interest's ndn::Name |
| // +1 to grab JSON component after "query" component |
| |
| ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1]; |
| // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery |
| const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size()); |
| |
| if (jsonQuery.length() <= 0) { |
| // no JSON query, send Nack |
| sendNack(interest->getName()); |
| return; |
| } |
| |
| // the version should be replaced with ChronoSync state digest |
| ndn::name::Component version; |
| |
| if(m_socket != nullptr) { |
| const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest(); |
| std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size()); |
| _LOG_DEBUG("Original digest " << m_chronosyncDigest); |
| _LOG_DEBUG("New digest : " << digestStr); |
| // if the m_chronosyncDigest and the rootdigest are not equal |
| if (digestStr != m_chronosyncDigest) { |
| // (1) update chronosyncDigest |
| // (2) clear all staled ACK data |
| m_mutex.lock(); |
| m_chronosyncDigest = digestStr; |
| m_activeQueryToFirstResponse.erase(ndn::Name("/")); |
| m_mutex.unlock(); |
| _LOG_DEBUG("Change digest to " << m_chronosyncDigest); |
| } |
| version = ndn::name::Component::fromEscapedString(digestStr); |
| } |
| else { |
| version = ndn::name::Component::fromEscapedString(m_chronosyncDigest); |
| } |
| |
| // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out |
| Json::Value parsedFromString; |
| Json::Reader reader; |
| if (!reader.parse(jsonQuery, parsedFromString)) { |
| // json object is broken |
| sendNack(interest->getName()); |
| _LOG_ERROR("Cannot parse the JsonQuery"); |
| return; |
| } |
| |
| // 3) Convert the JSON Query into a MySQL one |
| ndn::Name segmentPrefix(getQueryResultsName(interest, version)); |
| _LOG_DEBUG("segmentPrefix :" << segmentPrefix); |
| |
| Json::Value tmp; |
| std::vector<std::pair<std::string, std::string>> typedComponents; |
| |
| // expect the autocomplete and the component-based query are separate |
| // if Json::Value contains ? as key, is autocompletion |
| if (parsedFromString.get("?", tmp) != tmp) { |
| bool lastComponent = false; |
| std::stringstream sqlQuery, fieldName; |
| |
| // must generate the sql string for autocomple, the selected column is changing |
| if (!json2AutocompletionSql(sqlQuery, parsedFromString, lastComponent, fieldName)) { |
| sendNack(segmentPrefix); |
| return; |
| } |
| prepareSegmentsBySqlString(segmentPrefix, sqlQuery.str(), lastComponent, fieldName.str()); |
| } |
| else if (parsedFromString.get("??", tmp) != tmp) { |
| if (!doPrefixBasedSearch(parsedFromString, typedComponents)) { |
| sendNack(segmentPrefix); |
| return; |
| } |
| prepareSegmentsByParams(typedComponents, segmentPrefix); |
| } |
| else { |
| if (!doFilterBasedSearch(parsedFromString, typedComponents)) { |
| sendNack(segmentPrefix); |
| return; |
| } |
| prepareSegmentsByParams(typedComponents, segmentPrefix); |
| } |
| |
| } |
| |
| template <typename databasehandler> |
| void |
| QueryAdapter<databasehandler>:: |
| prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams, |
| const ndn::Name& segmentprefix) |
| { |
| } |
| |
| template <> |
| void |
| QueryAdapter<ConnectionPool_T>:: |
| prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams, |
| const ndn::Name& segmentPrefix) |
| { |
| _LOG_DEBUG(">> QueryAdapter::prepareSegmentsByParams"); |
| |
| // the prepared_statement cannot improve the performance, but can simplify the code |
| Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool); |
| if (!conn) { |
| // do not answer for this request due to lack of connections, request will come back later |
| _LOG_DEBUG("No available database connections"); |
| return; |
| } |
| std::string getRecordNumSqlStr("SELECT count(name) FROM "); |
| getRecordNumSqlStr += m_databaseTable; |
| getRecordNumSqlStr += " WHERE "; |
| for (size_t i = 0; i < m_nameFields.size(); i++) { |
| getRecordNumSqlStr += m_nameFields[i]; |
| getRecordNumSqlStr += " LIKE ?"; |
| if (i != m_nameFields.size() - 1) { |
| getRecordNumSqlStr += " AND "; |
| } |
| } |
| |
| PreparedStatement_T ps4RecordNum = |
| Connection_prepareStatement(conn, reinterpret_cast<const char*>(getRecordNumSqlStr.c_str()), getRecordNumSqlStr.size()); |
| |
| // before query, initialize all params for statement |
| for (size_t i = 0; i < m_nameFields.size(); i++) { |
| PreparedStatement_setString(ps4RecordNum, i + 1, "%"); |
| } |
| |
| // reset params based on the query |
| for (std::vector<std::pair<std::string, std::string>>::iterator it = queryParams.begin(); |
| it != queryParams.end(); ++it) { |
| // dictionary is faster |
| for (size_t i = 0; i < m_nameFields.size(); i++) { |
| if (it->first == m_nameFields[i]) { |
| PreparedStatement_setString(ps4RecordNum, i + 1, it->second.c_str()); |
| } |
| } |
| } |
| |
| ResultSet_T res4RecordNum; |
| TRY { |
| res4RecordNum = PreparedStatement_executeQuery(ps4RecordNum); |
| } |
| CATCH(SQLException) { |
| _LOG_ERROR(Connection_getLastError(conn)); |
| } |
| END_TRY; |
| |
| uint64_t resultCount = 0; // use count sql to get |
| |
| // result for record number |
| while (ResultSet_next(res4RecordNum)) { |
| resultCount = ResultSet_getInt(res4RecordNum, 1); |
| } |
| |
| // get name list statement |
| std::string getNameListSqlStr("SELECT name FROM "); |
| getNameListSqlStr += m_databaseTable; |
| getNameListSqlStr += " WHERE "; |
| for (size_t i = 0; i < m_nameFields.size(); i++) { |
| getNameListSqlStr += m_nameFields[i]; |
| getNameListSqlStr += " LIKE ?"; |
| if (i != m_nameFields.size() - 1) { |
| getNameListSqlStr += " AND "; |
| } |
| } |
| |
| PreparedStatement_T ps4Name = |
| Connection_prepareStatement(conn, reinterpret_cast<const char*>(getNameListSqlStr.c_str()), getNameListSqlStr.size()); |
| |
| // before query, initialize all params for statement |
| for (size_t i = 0; i < m_nameFields.size(); i++) { |
| PreparedStatement_setString(ps4Name, i + 1, "%"); |
| } |
| |
| // reset params based on the query |
| for (std::vector<std::pair<std::string, std::string>>::iterator it = queryParams.begin(); |
| it != queryParams.end(); ++it) { |
| // dictionary is faster |
| for (size_t i = 0; i < m_nameFields.size(); i++) { |
| if (it->first == m_nameFields[i]) { |
| PreparedStatement_setString(ps4Name, i + 1, it->second.c_str()); |
| } |
| } |
| } |
| |
| ResultSet_T res4Name; |
| TRY { |
| res4Name = PreparedStatement_executeQuery(ps4Name); |
| } |
| CATCH(SQLException) { |
| _LOG_ERROR(Connection_getLastError(conn)); |
| } |
| END_TRY; |
| |
| generateSegments(res4Name, segmentPrefix, resultCount, false, false); |
| |
| Connection_close(conn); |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::generateSegments(ResultSet_T& res, |
| const ndn::Name& segmentPrefix, |
| int resultCount, |
| bool autocomplete, |
| bool lastComponent) |
| { |
| uint64_t segmentno = 0; |
| Json::Value tmp; |
| Json::Value resultjson; |
| Json::FastWriter fastWriter; |
| |
| uint64_t viewstart = 0, viewend = 0; |
| while (ResultSet_next(res)) { |
| const char *name = ResultSet_getString(res, 1); |
| tmp.append(name); |
| const std::string tmpString = fastWriter.write(tmp); |
| if (tmpString.length() > PAYLOAD_LIMIT) { |
| std::shared_ptr<ndn::Data> data |
| = makeReplyData(segmentPrefix, resultjson, segmentno, false, |
| autocomplete, resultCount, viewstart, viewend, lastComponent); |
| m_mutex.lock(); |
| m_cache.insert(*data); |
| m_face->put(*data); |
| m_mutex.unlock(); |
| tmp.clear(); |
| resultjson.clear(); |
| segmentno++; |
| viewstart = viewend + 1; |
| } |
| resultjson.append(name); |
| viewend++; |
| } |
| std::shared_ptr<ndn::Data> data |
| = makeReplyData(segmentPrefix, resultjson, segmentno, true, |
| autocomplete, resultCount, viewstart, viewend, lastComponent); |
| m_mutex.lock(); |
| m_cache.insert(*data); |
| m_face->put(*data); |
| m_mutex.unlock(); |
| } |
| |
| template <typename DatabaseHandler> |
| void |
| QueryAdapter<DatabaseHandler>::prepareSegmentsBySqlString(const ndn::Name& segmentPrefix, |
| const std::string& sqlString, |
| bool lastComponent, |
| const std::string& nameField) |
| { |
| // empty |
| } |
| |
| |
| template <> |
| void |
| QueryAdapter<ConnectionPool_T>::prepareSegmentsBySqlString(const ndn::Name& segmentPrefix, |
| const std::string& sqlString, |
| bool lastComponent, |
| const std::string& nameField) |
| { |
| _LOG_DEBUG(">> QueryAdapter::prepareSegmentsBySqlString"); |
| |
| _LOG_DEBUG(sqlString); |
| |
| Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool); |
| if (!conn) { |
| _LOG_DEBUG("No available database connections"); |
| return; |
| } |
| |
| //// just for get the rwo count ... |
| std::string getRecordNumSqlStr("SELECT COUNT( DISTINCT "); |
| getRecordNumSqlStr += nameField; |
| getRecordNumSqlStr += ") FROM "; |
| getRecordNumSqlStr += m_databaseTable; |
| getRecordNumSqlStr += sqlString; |
| |
| ResultSet_T res4RecordNum; |
| TRY { |
| res4RecordNum = Connection_executeQuery(conn, reinterpret_cast<const char*>(getRecordNumSqlStr.c_str()), getRecordNumSqlStr.size()); |
| } |
| CATCH(SQLException) { |
| _LOG_ERROR(Connection_getLastError(conn)); |
| } |
| END_TRY; |
| |
| uint64_t resultCount = 0; |
| while (ResultSet_next(res4RecordNum)) { |
| resultCount = ResultSet_getInt(res4RecordNum, 1); |
| } |
| //// |
| |
| std::string getNextFieldsSqlStr("SELECT DISTINCT "); |
| getNextFieldsSqlStr += nameField; |
| getNextFieldsSqlStr += " FROM "; |
| getNextFieldsSqlStr += m_databaseTable; |
| getNextFieldsSqlStr += sqlString; |
| |
| ResultSet_T res4NextFields; |
| TRY { |
| res4NextFields = Connection_executeQuery(conn, reinterpret_cast<const char*>(getNextFieldsSqlStr.c_str()), getNextFieldsSqlStr.size()); |
| } |
| CATCH(SQLException) { |
| _LOG_ERROR(Connection_getLastError(conn)); |
| } |
| END_TRY; |
| |
| generateSegments(res4NextFields, segmentPrefix, resultCount, true, lastComponent); |
| |
| Connection_close(conn); |
| } |
| |
| template <typename DatabaseHandler> |
| std::shared_ptr<ndn::Data> |
| QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix, |
| const Json::Value& value, |
| uint64_t segmentNo, |
| bool isFinalBlock, |
| bool isAutocomplete, |
| uint64_t resultCount, |
| uint64_t viewStart, |
| uint64_t viewEnd, |
| bool lastComponent) |
| { |
| Json::Value entry; |
| Json::FastWriter fastWriter; |
| |
| entry["resultCount"] = Json::UInt64(resultCount);; |
| entry["viewStart"] = Json::UInt64(viewStart); |
| entry["viewEnd"] = Json::UInt64(viewEnd); |
| |
| if (lastComponent) |
| entry["lastComponent"] = Json::Value(true); |
| |
| _LOG_DEBUG("resultCount " << resultCount << "; " |
| << "viewStart " << viewStart << "; " |
| << "viewEnd " << viewEnd); |
| |
| if (isAutocomplete) { |
| entry["next"] = value; |
| } else { |
| entry["results"] = value; |
| } |
| const std::string jsonMessage = fastWriter.write(entry); |
| const char* payload = jsonMessage.c_str(); |
| size_t payloadLength = jsonMessage.size() + 1; |
| ndn::Name segmentName(segmentPrefix); |
| segmentName.appendSegment(segmentNo); |
| |
| std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName); |
| data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength); |
| data->setFreshnessPeriod(ndn::time::milliseconds(10000)); |
| |
| if (isFinalBlock) { |
| data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo)); |
| } |
| |
| _LOG_DEBUG(segmentName); |
| |
| signData(*data); |
| return data; |
| } |
| |
| |
| } // namespace query |
| } // namespace atmos |
| #endif //ATMOS_QUERY_QUERY_ADAPTER_HPP |