populate filters menu
* add view index in payload
* fix bug that catalog crashes upon receiving request for empty prefix
* move component <catalogID> before <query-param>
refs: #3120
Change-Id: I468a72b6b2d1889b9c73052a0226dfeaa851179f
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index ccfefb6..c6391ef 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -38,6 +38,7 @@
#include <ndn-cxx/util/time.hpp>
#include <ndn-cxx/encoding/encoding-buffer.hpp>
#include <ndn-cxx/util/in-memory-storage-lru.hpp>
+#include <ndn-cxx/util/string-helper.hpp>
#include "mysql/mysql.h"
@@ -78,7 +79,8 @@
void
setConfigFile(util::ConfigFile& config,
const ndn::Name& prefix,
- const std::vector<std::string>& nameFields);
+ const std::vector<std::string>& nameFields,
+ const std::string& databaseTable);
protected:
/**
@@ -110,6 +112,26 @@
onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
/**
+ * Handles requests for responses to an filter initialization request
+ *
+ * @param filter: InterestFilter that caused this Interest to be routed
+ * @param interest: Interest that needs to be handled
+ */
+ virtual void
+ onFiltersInitializationInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+
+ /**
+ * Helper function that generates query results from a Json query carried in the Interest
+ *
+ * @param interest: Interest that needs to be handled
+ */
+ void
+ populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest);
+
+ void
+ getFiltersMenu(Json::Value& value);
+
+ /**
* Helper function that makes query-results data
*
* @param segmentPrefix: Name that identifies the Prefix for the Data
@@ -119,6 +141,8 @@
* last entry
* @param isAutocomplete: bool to indicate whether this is an autocomplete message
* @param resultCount: the number of records in the query results
+ * @param viewStart: the start index of the record in the query results payload
+ * @param viewEnd: the end index of the record in the query results payload
*/
std::shared_ptr<ndn::Data>
makeReplyData(const ndn::Name& segmentPrefix,
@@ -126,7 +150,9 @@
uint64_t segmentNo,
bool isFinalBlock,
bool isAutocomplete,
- uint64_t resultCount);
+ uint64_t resultCount,
+ uint64_t viewStart,
+ uint64_t viewEnd);
/**
* Helper function that generates query results from a Json query carried in the Interest
@@ -189,6 +215,9 @@
void
setFilters();
+ void
+ setCatalogId();
+
/**
* Helper function that generates the sqlQuery string for autocomplete query
* @param sqlQuery: stringstream to save the sqlQuery string
@@ -200,7 +229,11 @@
bool
json2CompleteSearchSql(std::stringstream& sqlQuery,
- Json::Value& jsonValue);
+ Json::Value& jsonValue);
+
+ ndn::Name
+ getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
+ const ndn::Name::Component& version);
protected:
typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
@@ -218,6 +251,7 @@
RegisteredPrefixList m_registeredPrefixList;
//std::vector<std::string> m_atmosColumns;
ndn::Name m_catalogId; // should be replaced with the PK digest
+ std::vector<std::string> m_filterCategoryNames;
};
template <typename DatabaseHandler>
@@ -225,7 +259,7 @@
const std::shared_ptr<ndn::KeyChain>& keyChain)
: util::CatalogAdapter(face, keyChain)
, m_cache(250000)
- , m_catalogId("catalogIdPlaceHolder")
+ , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
{
}
@@ -242,23 +276,37 @@
bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
this, _1, _2));
- ndn::Name resultPrefix = ndn::Name(m_prefix).append("query-results");
- m_registeredPrefixList[resultPrefix] = m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("query-results")),
+ ndn::Name queryResultsPrefix = ndn::Name(m_prefix).append("query-results");
+ m_registeredPrefixList[queryResultsPrefix] =
+ m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix)
+ .append("query-results").append(m_catalogId)),
bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
this, _1, _2),
bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
this, _1),
bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
this, _1, _2));
+
+ ndn::Name filtersInitializationPrefix = ndn::Name(m_prefix).append("filters-initialization");
+ m_registeredPrefixList[filtersInitializationPrefix] =
+ m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("filters-initialization")),
+ bind(&query::QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
+ this, _1, _2),
+ bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
+ this, _1),
+ bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
+ this, _1, _2));
}
template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
const ndn::Name& prefix,
- const std::vector<std::string>& nameFields)
+ const std::vector<std::string>& nameFields,
+ const std::string& databaseTable)
{
m_nameFields = nameFields;
+ m_databaseTable = databaseTable;
config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
_1, _2, _3, prefix));
}
@@ -277,44 +325,51 @@
std::string signingId, dbServer, dbName, dbUser, dbPasswd;
for (auto item = section.begin();
item != section.end();
- ++ item)
+ ++item)
{
if (item->first == "signingId") {
- signingId.assign(item->second.get_value<std::string>());
+ signingId = item->second.get_value<std::string>();
if (signingId.empty()) {
throw Error("Empty value for \"signingId\""
" in \"query\" section");
}
}
+ if (item->first == "filterCategoryNames") {
+ std::istringstream ss(item->second.get_value<std::string>());
+ std::string token;
+ while(std::getline(ss, token, ',')) {
+ m_filterCategoryNames.push_back(token);
+ }
+ }
if (item->first == "database") {
const util::ConfigSection& dataSection = item->second;
for (auto subItem = dataSection.begin();
subItem != dataSection.end();
- ++ subItem)
+ ++subItem)
{
if (subItem->first == "dbServer") {
- dbServer.assign(subItem->second.get_value<std::string>());
+ dbServer = subItem->second.get_value<std::string>();
if (dbServer.empty()){
throw Error("Invalid value for \"dbServer\""
" in \"query\" section");
}
}
if (subItem->first == "dbName") {
- dbName.assign(subItem->second.get_value<std::string>());
+ dbName = subItem->second.get_value<std::string>();
if (dbName.empty()){
throw Error("Invalid value for \"dbName\""
" in \"query\" section");
}
}
if (subItem->first == "dbUser") {
- dbUser.assign(subItem->second.get_value<std::string>());
+ dbUser = subItem->second.get_value<std::string>();
if (dbUser.empty()){
throw Error("Invalid value for \"dbUser\""
" in \"query\" section");
}
}
if (subItem->first == "dbPasswd") {
- dbPasswd.assign(subItem->second.get_value<std::string>());
+ dbPasswd = subItem->second.get_value<std::string>();
if (dbPasswd.empty()){
throw Error("Invalid value for \"dbPasswd\""
" in \"query\" section");
@@ -324,16 +379,47 @@
}
}
- m_prefix = prefix;
- m_signingId = ndn::Name(signingId);
- util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
+ if (m_filterCategoryNames.size() == 0) {
+ throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
+ }
+ m_prefix = prefix;
+
+ m_signingId = ndn::Name(signingId);
+ setCatalogId();
+
+ util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
setDatabaseHandler(mysqlId);
setFilters();
}
template <typename DatabaseHandler>
void
+QueryAdapter<DatabaseHandler>::setCatalogId()
+{
+ //empty
+}
+
+template <>
+void
+QueryAdapter<MYSQL>::setCatalogId()
+{
+ // use public key digest as the catalog ID
+ ndn::Name keyId;
+ if (m_signingId.empty()) {
+ keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
+ } else {
+ keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
+ }
+
+ std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
+ ndn::Block keyDigest = pKey->computeDigest();
+ m_catalogId.clear();
+ m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
+}
+
+template <typename DatabaseHandler>
+void
QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
{
//empty
@@ -370,7 +456,9 @@
}
std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
+#ifndef NDEBUG
std::cout << "incoming query interest : " << interestPtr->getName() << std::endl;
+#endif
// @todo: use thread pool
std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
@@ -389,7 +477,9 @@
// now. In the future, this should check some form of
// InMemoryStorage.
+#ifndef NDEBUG
std::cout << "incoming query-results interest : " << interest.toUri() << std::endl;
+#endif
auto data = m_cache.find(interest.getName());
if (data) {
@@ -399,6 +489,132 @@
template <typename DatabaseHandler>
void
+QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
+ const ndn::Interest& interest)
+{
+ std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
+
+#ifndef NDEBUG
+ std::cout << "incoming initialization interest : " << interestPtr->getName() << std::endl;
+#endif
+ // TODO: save the content in memory, first check the memory, if not exists, start thread to generate it
+ // Note that if ChronoSync state changes, we need to clear the saved value, and regenerate it
+
+ auto data = m_cache.find(interest.getName());
+ if (data) {
+ m_face->put(*data);
+ }
+ else {
+ std::thread queryThread(&QueryAdapter<DatabaseHandler>::populateFiltersMenu,
+ this,
+ interestPtr);
+ queryThread.join();
+ }
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest)
+{
+ Json::Value filters;
+ Json::FastWriter fastWriter;
+ getFiltersMenu(filters);
+
+ const std::string filterValue = fastWriter.write(filters);
+
+ if (!filters.empty()) {
+ ndn::Name filterDataName(interest->getName());
+ filterDataName.append("stateVersion");// TODO: should replace with a state version
+
+ const char* payload = filterValue.c_str();
+ size_t payloadLength = filterValue.size();
+ size_t startIndex = 0, seqNo = 0;
+
+ if (filterValue.length() > PAYLOAD_LIMIT) {
+ payloadLength = PAYLOAD_LIMIT;
+ ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo);
+ std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName);
+ filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
+ filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
+
+ signData(*filterData);
+#ifndef NDEBUG
+ std::cout << "populate filter Data : " << segmentName << std::endl;
+#endif
+ m_mutex.lock();
+ m_cache.insert(*filterData);
+ try {
+ m_face->put(*filterData);
+ }// catch exceptions and log
+ catch (std::exception& e) {
+ std::cout << e.what() << std::endl;
+ }
+ m_mutex.unlock();
+
+ seqNo++;
+ startIndex = payloadLength * seqNo + 1;
+ }
+ payloadLength = filterValue.size() - PAYLOAD_LIMIT * seqNo;
+
+ ndn::Name lastSegment = ndn::Name(filterDataName).appendSegment(seqNo);
+ std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(lastSegment);
+ filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
+ filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
+ filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo));
+
+ signData(*filterData);
+ m_mutex.lock();
+ m_cache.insert(*filterData);
+ m_face->put(*filterData);
+ m_mutex.unlock();
+ }
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value)
+{
+ // empty
+}
+
+// get distinct value of each column
+template <>
+void
+QueryAdapter<MYSQL>::getFiltersMenu(Json::Value& value)
+{
+ Json::Value tmp;
+
+ for (size_t i = 0; i < m_filterCategoryNames.size(); i++) {
+ std::string columnName = m_filterCategoryNames[i];
+ std::string getFilterSql("SELECT DISTINCT " + columnName +
+ " FROM " + m_databaseTable + ";");
+ std::string errMsg;
+ bool success;
+
+ std::shared_ptr<MYSQL_RES> results
+ = atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
+ util::QUERY, success, errMsg);
+ if (!success) {
+ std::cout << errMsg << std::endl;
+ value.clear();
+ return;
+ }
+
+ while (MYSQL_ROW row = mysql_fetch_row(results.get()))
+ {
+ tmp[columnName].append(row[0]);
+ }
+ value.append(tmp);
+ tmp.clear();
+ }
+
+#ifndef NDEBUG
+ std::cout << value.toStyledString() << std::endl;
+#endif
+}
+
+template <typename DatabaseHandler>
+void
QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
{
if (m_signingId.empty())
@@ -411,22 +627,36 @@
}
template <typename DatabaseHandler>
+ndn::Name
+QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
+ const ndn::Name::Component& version)
+{
+ // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
+ // for now, should be /<prefix>/query-results/<catalog-id>/<query-parameters>/<version>
+
+ ndn::Name queryResultName(m_prefix);
+ queryResultName.append("query-results")
+ .append(m_catalogId)
+ .append(interest->getName().get(-1))
+ .append(version);
+ return queryResultName;
+}
+
+template <typename DatabaseHandler>
std::shared_ptr<ndn::Data>
QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
const ndn::Name::Component& version)
{
- // JSON parsed ok, so we can acknowledge successful receipt of the query
- ndn::Name ackName(interest->getName());
- ackName.append(version);
- ackName.append(m_catalogId);
- ackName.append("OK");
+ std::string queryResultNameStr(getQueryResultsName(interest, version).toUri());
- std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(ackName);
+ std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName());
+ ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()),
+ queryResultNameStr.length());
ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
signData(*ack);
#ifndef NDEBUG
- std::cout << "makeAckData : " << ackName << std::endl;
+ std::cout << "qurey-results data name in ACK : " << queryResultNameStr << std::endl;
#endif
return ack;
}
@@ -465,7 +695,7 @@
return false;
}
- sqlQuery << "SELECT name FROM cmip5";
+ sqlQuery << "SELECT name FROM " << m_databaseTable;
bool input = false;
for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
{
@@ -536,7 +766,7 @@
}
if (key.asString().compare("?") == 0) {
- typedString.assign(value.asString());
+ typedString = value.asString();
// since the front end triggers the autocompletion when users typed '/',
// there must be a '/' at the end, and the first char must be '/'
if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
@@ -560,14 +790,14 @@
// add column name and value (token) into map
typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
- count ++;
+ count++;
start = pos + 1;
}
// 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
// return true
bool more = false;
- sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM cmip5";
+ sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM " << m_databaseTable;
for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
it != typedComponents.end(); ++it) {
if (more)
@@ -615,10 +845,11 @@
}
if (key.asString().compare("??") == 0) {
- typedString.assign(value.asString());
+ typedString = value.asString();
// since the front end triggers the autocompletion when users typed '/',
// there must be a '/' at the end, and the first char must be '/'
- if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
+ if (typedString.empty() || typedString.at(typedString.length() - 1) != '/' ||
+ typedString.find("/") != 0)
return false;
break;
}
@@ -639,14 +870,14 @@
// add column name and value (token) into map
typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
- count ++;
+ count++;
start = pos + 1;
}
// 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
// return true
bool more = false;
- sqlQuery << "SELECT name FROM cmip5";
+ sqlQuery << "SELECT name FROM " << m_databaseTable;
for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
it != typedComponents.end(); ++it) {
if (more)
@@ -723,8 +954,8 @@
// This is where things are expensive so we save them for the lock
// note that we ack the query with the cached ACK messages, but we should remove the ACKs
// that conatin the old version when ChronoSync is updated
- m_activeQueryToFirstResponse.insert(std::pair<std::string,
- std::shared_ptr<ndn::Data>>(jsonQuery, ack));
+ //m_activeQueryToFirstResponse.insert(std::pair<std::string,
+ // std::shared_ptr<ndn::Data>>(jsonQuery, ack));
m_face->put(*ack);
} // !!! END CRITICAL SECTION !!!
m_mutex.unlock();
@@ -733,13 +964,7 @@
bool autocomplete = false;
std::stringstream sqlQuery;
- // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
- // for now, should be /<prefix>/query-results/<query-parameters>/<version>/, latter add catalog-id
- ndn::Name segmentPrefix(m_prefix);
- segmentPrefix.append("query-results");
- segmentPrefix.append(jsonStr);
- segmentPrefix.append(version);
- segmentPrefix.append(m_catalogId);
+ ndn::Name segmentPrefix(getQueryResultsName(interest, version));
Json::Value tmp;
// expect the autocomplete and the component-based query are separate
@@ -804,36 +1029,44 @@
uint64_t resultCount = mysql_num_rows(results.get());
+#ifndef NDEBUG
std::cout << "Query results for \""
<< sqlString
<< "\" contain "
<< resultCount
<< " rows" << std::endl;
+#endif
MYSQL_ROW row;
uint64_t segmentNo = 0;
Json::Value tmp;
Json::Value resultJson;
Json::FastWriter fastWriter;
+
+ uint64_t viewStart = 0, viewEnd = 0;
while ((row = mysql_fetch_row(results.get())))
{
tmp.append(row[0]);
const std::string tmpString = fastWriter.write(tmp);
if (tmpString.length() > PAYLOAD_LIMIT) {
std::shared_ptr<ndn::Data> data
- = makeReplyData(segmentPrefix, resultJson, segmentNo, false, autocomplete, resultCount);
+ = makeReplyData(segmentPrefix, resultJson, segmentNo, false,
+ autocomplete, resultCount, viewStart, viewEnd);
m_mutex.lock();
m_cache.insert(*data);
m_mutex.unlock();
tmp.clear();
resultJson.clear();
- segmentNo ++;
+ segmentNo++;
+ viewStart = viewEnd + 1;
}
resultJson.append(row[0]);
+ viewEnd++;
}
std::shared_ptr<ndn::Data> data
- = makeReplyData(segmentPrefix, resultJson, segmentNo, true, autocomplete, resultCount);
+ = makeReplyData(segmentPrefix, resultJson, segmentNo, true,
+ autocomplete, resultCount, viewStart, viewEnd);
m_mutex.lock();
m_cache.insert(*data);
m_mutex.unlock();
@@ -846,12 +1079,23 @@
uint64_t segmentNo,
bool isFinalBlock,
bool isAutocomplete,
- uint64_t resultCount)
+ uint64_t resultCount,
+ uint64_t viewStart,
+ uint64_t viewEnd)
{
Json::Value entry;
Json::FastWriter fastWriter;
- Json::UInt64 count(resultCount);
- entry["resultCount"] = count;
+
+ entry["resultCount"] = Json::UInt64(resultCount);;
+ entry["viewStart"] = Json::UInt64(viewStart);
+ entry["viewEnd"] = Json::UInt64(viewEnd);
+
+#ifndef NDEBUG
+ std::cout << "resultCount " << resultCount
+ << "; viewStart " << viewStart
+ << "; viewEnd " << viewEnd << std::endl;
+#endif
+
if (isAutocomplete) {
entry["next"] = value;
} else {