catalog: support autocompletion query
* use separate functions for autocompletion query and component-based query
* add json-parameters as a name component in the query-results content
* set freshness for ACK data
* fix bug where catalog throws a run-time error when a malformed Json::Value is used
* make query-results data conform to the schema
Change-Id: I942c26888fa79c206f2fa27f6f0a92fc4a8bef79
refs: #3024, #2797, #3035, #3047
diff --git a/catalog/src/catalog/catalog.cpp b/catalog/src/catalog/catalog.cpp
index ebec2f9..7b6c0bb 100644
--- a/catalog/src/catalog/catalog.cpp
+++ b/catalog/src/catalog/catalog.cpp
@@ -56,6 +56,17 @@
" in \"general\" section");
}
}
+ if (i->first == "nameFields") {
+ std::istringstream ss(i->second.get_value<std::string>());
+ std::string token;
+ while(std::getline(ss, token, ',')) {
+ m_nameFields.push_back(token);
+ }
+ }
+ }
+ if (m_nameFields.size() == 0) { // nameFields must not be empty
+ throw Error("Empty value for \"nameFields\""
+ " in \"general\" section");
}
}
@@ -84,7 +95,7 @@
i != m_adapters.end();
++ i)
{
- (*i)->setConfigFile(config, m_prefix);
+ (*i)->setConfigFile(config, m_prefix, m_nameFields);
}
config.parse(m_configFile, true);
diff --git a/catalog/src/catalog/catalog.hpp b/catalog/src/catalog/catalog.hpp
index b58129e..7041e82 100644
--- a/catalog/src/catalog/catalog.hpp
+++ b/catalog/src/catalog/catalog.hpp
@@ -108,6 +108,7 @@
// Adapters that added by users
std::vector<std::unique_ptr<util::CatalogAdapter>> m_adapters;
+ std::vector<std::string> m_nameFields;
}; // class Catalog
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index 492bcff..9fb03db 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -65,7 +65,8 @@
*/
void
setConfigFile(util::ConfigFile& config,
- const ndn::Name& prefix);
+ const ndn::Name& prefix,
+ const std::vector<std::string>& nameFields);
protected:
/**
@@ -163,8 +164,10 @@
template <typename DatabaseHandler>
void
PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
- const ndn::Name& prefix)
+ const ndn::Name& prefix,
+ const std::vector<std::string>& nameFields)
{
+ m_nameFields = nameFields;
config.addSectionHandler("publishAdapter",
bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
_1, _2, _3, prefix));
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index 107e6c4..2adb161 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -47,10 +47,12 @@
#include <mutex>
#include <sstream>
#include <string>
+#include <array>
namespace atmos {
namespace query {
-static const size_t MAX_SEGMENT_SIZE = ndn::MAX_NDN_PACKET_SIZE >> 1;
+// todo: calculate payload limit by getting the size of a signed empty Data packet
+static const size_t PAYLOAD_LIMIT = 7000;
/**
* QueryAdapter handles the Query usecases for the catalog
@@ -75,7 +77,8 @@
*/
void
setConfigFile(util::ConfigFile& config,
- const ndn::Name& prefix);
+ const ndn::Name& prefix,
+ const std::vector<std::string>& nameFields);
protected:
/**
@@ -115,13 +118,15 @@
* @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
* last entry
* @param isAutocomplete: bool to indicate whether this is an autocomplete message
+ * @param resultCount: the number of records in the query results
*/
std::shared_ptr<ndn::Data>
makeReplyData(const ndn::Name& segmentPrefix,
const Json::Value& value,
uint64_t segmentNo,
bool isFinalBlock,
- bool isAutocomplete);
+ bool isAutocomplete,
+ uint64_t resultCount);
/**
* Helper function that generates query results from a Json query carried in the Interest
@@ -142,15 +147,21 @@
const ndn::Name::Component& version);
/**
- * Helper function that generates the sqlQuery string and autocomplete flag
- * @param sqlQuery: stringstream to save the sqlQuery string
- * @param jsonValue: Json value that contains the query information
- * @param autocomplete: Flag to indicate if the json contains autocomplete flag
+ * Helper function that sends NACK
+ *
+ * @param dataPrefix: prefix for the data packet
*/
void
+ sendNack(const ndn::Name& dataPrefix);
+
+ /**
+ * Helper function that generates the sqlQuery string for component-based query
+ * @param sqlQuery: stringstream to save the sqlQuery string
+ * @param jsonValue: Json value that contains the query information
+ */
+ bool
json2Sql(std::stringstream& sqlQuery,
- Json::Value& jsonValue,
- bool& autocomplete);
+ Json::Value& jsonValue);
/**
* Helper function that signs the data
@@ -178,6 +189,15 @@
void
setFilters();
+ /**
+ * Helper function that generates the sqlQuery string for autocomplete query
+ * @param sqlQuery: stringstream to save the sqlQuery string
+ * @param jsonValue: Json value that contains the query information
+ */
+ bool
+ json2AutocompletionSql(std::stringstream& sqlQuery,
+ Json::Value& jsonValue);
+
protected:
typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
// Handle to the Catalog's database
@@ -192,6 +212,8 @@
ndn::util::InMemoryStorageLru m_cache;
// @}
RegisteredPrefixList m_registeredPrefixList;
+ //std::vector<std::string> m_atmosColumns;
+ ndn::Name m_catalogId; // should be replaced with the PK digest
};
template <typename DatabaseHandler>
@@ -199,6 +221,7 @@
const std::shared_ptr<ndn::KeyChain>& keyChain)
: util::CatalogAdapter(face, keyChain)
, m_cache(250000)
+ , m_catalogId("catalogIdPlaceHolder")
{
}
@@ -228,8 +251,10 @@
template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
- const ndn::Name& prefix)
+ const ndn::Name& prefix,
+ const std::vector<std::string>& nameFields)
{
+ m_nameFields = nameFields;
config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
_1, _2, _3, prefix));
}
@@ -340,9 +365,9 @@
return;
}
std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
- #ifndef NDEBUG
- std::cout << "query interest : " << interestPtr->getName() << std::endl;
- #endif
+
+ std::cout << "incoming query interest : " << interestPtr->getName() << std::endl;
+
// @todo: use thread pool
std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
this,
@@ -359,9 +384,9 @@
// CS so we just ignore any retrieval Interests that hit us for
// now. In the future, this should check some form of
// InMemoryStorage.
- #ifndef NDEBUG
- std::cout << "query results interest : " << interest.toUri() << std::endl;
- #endif
+
+ std::cout << "incoming query-results interest : " << interest.toUri() << std::endl;
+
auto data = m_cache.find(interest.getName());
if (data) {
m_face->put(*data);
@@ -389,23 +414,53 @@
// JSON parsed ok, so we can acknowledge successful receipt of the query
ndn::Name ackName(interest->getName());
ackName.append(version);
+ ackName.append(m_catalogId);
ackName.append("OK");
std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(ackName);
+ ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
+
signData(*ack);
- #ifndef NDEBUG
- std::cout << "makeAckData : " << ackName << std::endl;
- #endif
+#ifndef NDEBUG
+ std::cout << "makeAckData : " << ackName << std::endl;
+#endif
return ack;
}
template <typename DatabaseHandler>
void
-QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
- Json::Value& jsonValue,
- bool& autocomplete)
+QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix)
{
- // 3) Convert the JSON Query into a MySQL one
+ uint64_t segmentNo = 0;
+
+ std::shared_ptr<ndn::Data> nack =
+ std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo));
+ nack->setFreshnessPeriod(ndn::time::milliseconds(10000));
+ nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
+
+ signData(*nack);
+
+ std::cout << "make NACK : " << ndn::Name(dataPrefix).appendSegment(segmentNo) << std::endl;
+
+ m_mutex.lock();
+ m_cache.insert(*nack);
+ m_mutex.unlock();
+}
+
+
+template <typename DatabaseHandler>
+bool
+QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
+ Json::Value& jsonValue)
+{
+#ifndef NDEBUG
+ std::cout << "jsonValue in json2Sql: " << jsonValue.toStyledString() << std::endl;
+#endif
+ if (jsonValue.type() != Json::objectValue) {
+ std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
+ return false;
+ }
+
sqlQuery << "SELECT name FROM cmip5";
bool input = false;
for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
@@ -413,28 +468,115 @@
Json::Value key = iter.key();
Json::Value value = (*iter);
+ if (key == Json::nullValue || value == Json::nullValue) {
+ std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
+ return false;
+ }
+
+ // cannot convert to string
+ if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
+ std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
+ return false;
+ }
+
+ if (key.asString().compare("?") == 0) {
+ continue;
+ }
+
if (input) {
sqlQuery << " AND";
} else {
sqlQuery << " WHERE";
}
- // Auto-complete case
- if (key.asString().compare("?") == 0) {
- sqlQuery << " name REGEXP '^" << value.asString() << "'";
- autocomplete = true;
- }
- // Component case
- else {
- sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
- }
+ sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
input = true;
}
if (!input) { // Force it to be the empty set
- sqlQuery << " limit 0";
+ return false;
}
sqlQuery << ";";
+ return true;
+}
+
+template <typename DatabaseHandler>
+bool
+QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
+ Json::Value& jsonValue)
+{
+#ifndef NDEBUG
+ std::cout << "jsonValue in json2AutocompletionSql: " << jsonValue.toStyledString() << std::endl;
+#endif
+ if (jsonValue.type() != Json::objectValue) {
+ std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
+ return false;
+ }
+
+ std::string typedString;
+ // get the string in the jsonValue
+ for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
+ {
+ Json::Value key = iter.key();
+ Json::Value value = (*iter);
+
+ if (key == Json::nullValue || value == Json::nullValue) {
+ std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
+ return false;
+ }
+
+ // cannot convert to string
+ if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
+ std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
+ return false;
+ }
+
+ if (key.asString().compare("?") == 0) {
+ typedString.assign(value.asString());
+ // since the front end triggers the autocompletion when users typed '/',
+ // there must be a '/' at the end, and the first char must be '/'
+ if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
+ return false;
+ break;
+ }
+ }
+
+ // 1. get the expected column number by parsing the typedString, so we can get the field name
+ size_t pos = 0;
+ size_t start = 1; // start from the 1st char which is not '/'
+ size_t count = 0; // also the name to query for
+ std::string token;
+ std::string delimiter = "/";
+ std::map<std::string, std::string> typedComponents;
+ while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
+ token = typedString.substr(start, pos - start);
+ if (count >= m_nameFields.size() - 1) {
+ return false;
+ }
+
+ // add column name and value (token) into map
+ typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
+ count ++;
+ start = pos + 1;
+ }
+
+ // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
+ // return true
+ bool more = false;
+ sqlQuery << "SELECT " << m_nameFields[count] << " FROM cmip5";
+ for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
+ it != typedComponents.end(); ++it) {
+ if (more)
+ sqlQuery << " AND";
+ else
+ sqlQuery << " WHERE";
+
+ sqlQuery << " " << it->first << "='" << it->second << "'";
+
+ more = true;
+ }
+ sqlQuery << ";";
+ return true;
}
template <typename DatabaseHandler>
@@ -449,10 +591,11 @@
const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
if (jsonQuery.length() <= 0) {
- // send Nack?
+ // no JSON query, send Nack?
return;
}
- // ------------------
+ // check if the ACK is cached, if yes, respond with ACK
+ // ?? what if the results are NULL for now, but exist later?
// For efficiency, do a double check. Once without the lock, then with it.
if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
m_mutex.lock();
@@ -478,6 +621,7 @@
return;
}
+ // the version should be replaced with ChronoSync state digest
const ndn::name::Component version
= ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(
ndn::time::system_clock::now()).count());
@@ -494,6 +638,8 @@
return;
}
// This is where things are expensive so we save them for the lock
+ // note that we ack the query with the cached ACK messages, but we should remove the ACKs
+ // that contain the old version when ChronoSync is updated
m_activeQueryToFirstResponse.insert(std::pair<std::string,
std::shared_ptr<ndn::Data>>(jsonQuery, ack));
m_face->put(*ack);
@@ -503,13 +649,33 @@
// 3) Convert the JSON Query into a MySQL one
bool autocomplete = false;
std::stringstream sqlQuery;
- json2Sql(sqlQuery, parsedFromString, autocomplete);
- // 4) Run the Query
+ // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
+ // for now, should be /<prefix>/query-results/<query-parameters>/<version>/, later add catalog-id
ndn::Name segmentPrefix(m_prefix);
segmentPrefix.append("query-results");
+ segmentPrefix.append(jsonStr);
segmentPrefix.append(version);
+ segmentPrefix.append(m_catalogId);
+ Json::Value tmp;
+ // expect the autocomplete and the component-based query are separate
+ // if JSON::Value contains ? as key, is autocompletion
+ if (parsedFromString.get("?", tmp) != tmp) {
+ autocomplete = true;
+ if (!json2AutocompletionSql(sqlQuery, parsedFromString)) {
+ sendNack(segmentPrefix);
+ return;
+ }
+ }
+ else {
+ if (!json2Sql(sqlQuery, parsedFromString)) {
+ sendNack(segmentPrefix);
+ return;
+ }
+ }
+
+ // 4) Run the Query
prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete);
}
@@ -529,32 +695,29 @@
const std::string& sqlString,
bool autocomplete)
{
-#ifndef NDEBUG
- std::cout << "sqlString in prepareSegments : " << sqlString << std::endl;
-#endif
+ std::cout << "prepareSegments() executes sql : " << sqlString << std::endl;
+
// 4) Run the Query
std::shared_ptr<MYSQL_RES> results
= atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString);
if (!results) {
-#ifndef NDEBUG
std::cout << "null MYSQL_RES for query : " << sqlString << std::endl;
-#endif
+
// @todo: throw runtime error or log the error message?
return;
}
-#ifndef NDEBUG
+ uint64_t resultCount = mysql_num_rows(results.get());
+
std::cout << "Query results for \""
<< sqlString
<< "\" contain "
- << mysql_num_rows(results.get())
+ << resultCount
<< " rows" << std::endl;
-#endif
MYSQL_ROW row;
size_t usedBytes = 0;
- const size_t PAYLOAD_LIMIT = 7000;
uint64_t segmentNo = 0;
Json::Value array;
while ((row = mysql_fetch_row(results.get())))
@@ -562,7 +725,7 @@
size_t size = strlen(row[0]) + 1;
if (usedBytes + size > PAYLOAD_LIMIT) {
std::shared_ptr<ndn::Data> data
- = makeReplyData(segmentPrefix, array, segmentNo, false, autocomplete);
+ = makeReplyData(segmentPrefix, array, segmentNo, false, autocomplete, resultCount);
m_mutex.lock();
m_cache.insert(*data);
m_mutex.unlock();
@@ -574,7 +737,7 @@
usedBytes += size;
}
std::shared_ptr<ndn::Data> data
- = makeReplyData(segmentPrefix, array, segmentNo, true, autocomplete);
+ = makeReplyData(segmentPrefix, array, segmentNo, true, autocomplete, resultCount);
m_mutex.lock();
m_cache.insert(*data);
m_mutex.unlock();
@@ -586,10 +749,13 @@
const Json::Value& value,
uint64_t segmentNo,
bool isFinalBlock,
- bool isAutocomplete)
+ bool isAutocomplete,
+ uint64_t resultCount)
{
Json::Value entry;
Json::FastWriter fastWriter;
+ Json::UInt64 count(resultCount);
+ entry["resultCount"] = count;
if (isAutocomplete) {
entry["next"] = value;
} else {
diff --git a/catalog/src/util/catalog-adapter.hpp b/catalog/src/util/catalog-adapter.hpp
index 2b03358..863f29a 100644
--- a/catalog/src/util/catalog-adapter.hpp
+++ b/catalog/src/util/catalog-adapter.hpp
@@ -72,7 +72,8 @@
*/
virtual void
setConfigFile(util::ConfigFile& config,
- const ndn::Name& prefix) = 0;
+ const ndn::Name& prefix,
+ const std::vector<std::string>& nameFields) = 0;
protected:
@@ -120,6 +121,7 @@
ndn::Name m_prefix;
// Name for the signing key
ndn::Name m_signingId;
+ std::vector<std::string> m_nameFields;
}; // class CatalogAdapter