populate filters menu
* add view index in payload
* fix bug where the catalog crashes upon receiving a request with an empty prefix
* move component <catalogID> before <query-param>
refs: #3120
Change-Id: I468a72b6b2d1889b9c73052a0226dfeaa851179f
diff --git a/catalog/src/catalog/catalog.cpp b/catalog/src/catalog/catalog.cpp
index 7b6c0bb..1d94dfc 100644
--- a/catalog/src/catalog/catalog.cpp
+++ b/catalog/src/catalog/catalog.cpp
@@ -46,7 +46,7 @@
}
for (auto i = configSection.begin();
i != configSection.end();
- ++ i)
+ ++i)
{
if (i->first == "prefix") {
m_prefix.clear();
@@ -63,11 +63,18 @@
m_nameFields.push_back(token);
}
}
+ if (i->first == "databaseTable") {
+ m_databaseTable = i->second.get_value<std::string>();
+ }
}
if (m_nameFields.size() == 0) { // nameFields must not be empty
throw Error("Empty value for \"nameFields\""
" in \"general\" section");
}
+ if (m_databaseTable.empty()) {
+ throw Error("Empty value for \"databaseTable\""
+ " in \"general\" section");
+ }
}
void
@@ -93,9 +100,9 @@
util::ConfigFile config(&util::ConfigFile::ignoreUnknownSection);
for (auto i = m_adapters.begin();
i != m_adapters.end();
- ++ i)
+ ++i)
{
- (*i)->setConfigFile(config, m_prefix, m_nameFields);
+ (*i)->setConfigFile(config, m_prefix, m_nameFields, m_databaseTable);
}
config.parse(m_configFile, true);
diff --git a/catalog/src/catalog/catalog.hpp b/catalog/src/catalog/catalog.hpp
index 7041e82..d2c839e 100644
--- a/catalog/src/catalog/catalog.hpp
+++ b/catalog/src/catalog/catalog.hpp
@@ -109,6 +109,7 @@
// Adapters that added by users
std::vector<std::unique_ptr<util::CatalogAdapter>> m_adapters;
std::vector<std::string> m_nameFields;
+ std::string m_databaseTable;
}; // class Catalog
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index b954bc8..1d6f91f 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -33,6 +33,7 @@
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/security/key-chain.hpp>
#include <ndn-cxx/security/validator-config.hpp>
+#include <ndn-cxx/util/string-helper.hpp>
#include <ChronoSync/socket.hpp>
#include <memory>
@@ -75,7 +76,8 @@
void
setConfigFile(util::ConfigFile& config,
const ndn::Name& prefix,
- const std::vector<std::string>& nameFields);
+ const std::vector<std::string>& nameFields,
+ const std::string& databaseTable);
protected:
/**
@@ -121,6 +123,9 @@
void
setFilters();
+ void
+ setCatalogId();
+
/**
* Function to validate publication changes against the trust model, which is, all file
* names must be under the publisher's prefix. This function should be called by a callback
@@ -137,16 +142,16 @@
*
* @param updates: vector that contains all the missing data information
*/
- void
- processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
+ void
+ processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
/**
* Helper function that processes the update data
*
* @param data: shared pointer for the fetched update data
*/
- void
- processUpdateData(const ndn::shared_ptr<const ndn::Data>& data);
+ void
+ processUpdateData(const ndn::shared_ptr<const ndn::Data>& data);
/**
* Helper function that add data to or remove data from database
@@ -154,9 +159,9 @@
* @param sql: sql string to do the add or remove jobs
* @param op: enum value indicates the database operation, could be REMOVE, ADD
*/
- virtual void
- operateDatabase(const std::string& sql,
- util::DatabaseOperation op);
+ virtual void
+ operateDatabase(const std::string& sql,
+ util::DatabaseOperation op);
/**
* Helper function that parses jsonValue to generate sql string, return value indicates
@@ -166,10 +171,10 @@
* @param jsonValue: Json value that contains the update information
* @param op: enum value indicates the database operation, could be REMOVE, ADD
*/
- bool
- json2Sql(std::stringstream& sqlString,
- Json::Value& jsonValue,
- util::DatabaseOperation op);
+ bool
+ json2Sql(std::stringstream& sqlString,
+ Json::Value& jsonValue,
+ util::DatabaseOperation op);
/**
* Helper function to generate sql string based on file name, return value indicates
@@ -178,9 +183,9 @@
* @param sqlString: streamstream to save the sql string
* @param fileName: ndn uri string for a file name
*/
- bool
- name2Fields(std::stringstream& sqlstring,
- std::string& fileName);
+ bool
+ name2Fields(std::stringstream& sqlstring,
+ std::string& fileName);
/**
* Check the local database for the latest sequence number for a ChronoSync update
@@ -195,23 +200,23 @@
*
* @param update: the MissingDataInfo object
*/
- void
- renewUpdateInformation(const chronosync::MissingDataInfo& update);
+ void
+ renewUpdateInformation(const chronosync::MissingDataInfo& update);
/**
* Insert the update message into the local database
*
* @param update: the MissingDataInfo object
*/
- void
- addUpdateInformation(const chronosync::MissingDataInfo& update);
+ void
+ addUpdateInformation(const chronosync::MissingDataInfo& update);
- void
- onFetchUpdateDataTimeout(const ndn::Interest& interest);
+ void
+ onFetchUpdateDataTimeout(const ndn::Interest& interest);
- void
- onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
- const std::string& failureInfo);
+ void
+ onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
+ const std::string& failureInfo);
protected:
typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
@@ -228,6 +233,7 @@
// TODO: create thread for each request, and the variables below should be within the thread
bool m_mustBeFresh;
bool m_isFinished;
+ ndn::Name m_catalogId;
};
@@ -237,11 +243,37 @@
: util::CatalogAdapter(face, keyChain)
, m_mustBeFresh(true)
, m_isFinished(false)
+ , m_catalogId("catalogIdPlaceHolder")
{
}
template <typename DatabaseHandler>
void
+PublishAdapter<DatabaseHandler>::setCatalogId()
+{
+  // Primary template: intentionally does nothing, m_catalogId keeps its current value.
+}
+
+/**
+ * MySQL specialization: replaces m_catalogId with the hex string of the public key digest.
+ *
+ * The key is the default key of m_signingId, or of the key chain's default identity
+ * when m_signingId is empty.
+ */
+template <>
+void
+PublishAdapter<MYSQL>::setCatalogId()
+{
+  // pick the identity whose default key names this catalog
+  const ndn::Name identity = m_signingId.empty() ? m_keyChain->getDefaultIdentity()
+                                                 : m_signingId;
+  const ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(identity);
+
+  // use public key digest as the catalog ID
+  const std::shared_ptr<ndn::PublicKey> publicKey = m_keyChain->getPib().getPublicKey(keyName);
+  const ndn::Block digest = publicKey->computeDigest();
+
+  m_catalogId.clear();
+  m_catalogId.append(ndn::toHex(*digest.getBuffer()));
+}
+
+template <typename DatabaseHandler>
+void
PublishAdapter<DatabaseHandler>::setFilters()
{
ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
@@ -254,7 +286,7 @@
bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
this, _1, _2));
- ndn::Name catalogSync = ndn::Name(m_prefix).append("sync");
+ ndn::Name catalogSync = ndn::Name(m_prefix).append("sync").append(m_catalogId);
m_socket.reset(new chronosync::Socket(m_syncPrefix,
catalogSync,
*m_face,
@@ -275,9 +307,11 @@
void
PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
const ndn::Name& prefix,
- const std::vector<std::string>& nameFields)
+ const std::vector<std::string>& nameFields,
+ const std::string& databaseTable)
{
m_nameFields = nameFields;
+ m_databaseTable = databaseTable;
config.addSectionHandler("publishAdapter",
bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
_1, _2, _3, prefix));
@@ -300,10 +334,10 @@
for (auto item = section.begin();
item != section.end();
- ++ item)
+ ++item)
{
if (item->first == "signingId") {
- signingId.assign(item->second.get_value<std::string>());
+ signingId = item->second.get_value<std::string>();
if (signingId.empty()) {
throw Error("Invalid value for \"signingId\""
" in \"publish\" section");
@@ -319,30 +353,30 @@
const util::ConfigSection& databaseSection = item->second;
for (auto subItem = databaseSection.begin();
subItem != databaseSection.end();
- ++ subItem) {
+ ++subItem) {
if (subItem->first == "dbServer") {
- dbServer.assign(subItem->second.get_value<std::string>());
+ dbServer = subItem->second.get_value<std::string>();
if (dbServer.empty()){
throw Error("Invalid value for \"dbServer\""
" in \"publish\" section");
}
}
if (subItem->first == "dbName") {
- dbName.assign(subItem->second.get_value<std::string>());
+ dbName = subItem->second.get_value<std::string>();
if (dbName.empty()){
throw Error("Invalid value for \"dbName\""
" in \"publish\" section");
}
}
if (subItem->first == "dbUser") {
- dbUser.assign(subItem->second.get_value<std::string>());
+ dbUser = subItem->second.get_value<std::string>();
if (dbUser.empty()){
throw Error("Invalid value for \"dbUser\""
" in \"publish\" section");
}
}
if (subItem->first == "dbPasswd") {
- dbPasswd.assign(subItem->second.get_value<std::string>());
+ dbPasswd = subItem->second.get_value<std::string>();
if (dbPasswd.empty()){
throw Error("Invalid value for \"dbPasswd\""
" in \"publish\" section");
@@ -354,10 +388,10 @@
const util::ConfigSection& synSection = item->second;
for (auto subItem = synSection.begin();
subItem != synSection.end();
- ++ subItem) {
+ ++subItem) {
if (subItem->first == "prefix") {
syncPrefix.clear();
- syncPrefix.assign(subItem->second.get_value<std::string>());
+ syncPrefix = subItem->second.get_value<std::string>();
if (syncPrefix.empty()){
throw Error("Invalid value for \"prefix\""
" in \"publish\\sync\" section");
@@ -370,6 +404,8 @@
m_prefix = prefix;
m_signingId = ndn::Name(signingId);
+ setCatalogId();
+
m_syncPrefix.clear();
m_syncPrefix.append(syncPrefix);
util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
@@ -403,34 +439,28 @@
`session_name` varchar(1000) NOT NULL, \
`seq_num` int(11) NOT NULL, \
PRIMARY KEY (`id`), \
- UNIQUE KEY `id_UNIQUE` (`id`) \
+ UNIQUE KEY `id_UNIQUE` (`id`) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
- MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE, success, errMsg);
+ MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE,
+ success, errMsg);
if (!success)
std::cout << errMsg << std::endl;
- std::string createCmip5Table =
- "CREATE TABLE `cmip5` ( \
- `id` int(100) NOT NULL AUTO_INCREMENT, \
- `sha256` varchar(64) NOT NULL, \
- `name` varchar(1000) NOT NULL, \
- `activity` varchar(100) NOT NULL, \
- `product` varchar(100) NOT NULL, \
- `organization` varchar(100) NOT NULL, \
- `model` varchar(100) NOT NULL, \
- `experiment` varchar(100) NOT NULL, \
- `frequency` varchar(100) NOT NULL, \
- `modeling_realm` varchar(100) NOT NULL, \
- `variable_name` varchar(100) NOT NULL, \
- `ensemble` varchar(100) NOT NULL, \
- `time` varchar(100) NOT NULL, \
- PRIMARY KEY (`id`), \
- UNIQUE KEY `sha256` (`sha256`) \
+ // create SQL string for table creation, id, sha256, and name are columns that we need
+ std::stringstream ss;
+ ss << "CREATE TABLE `" << m_databaseTable << "` (\
+ `id` int(100) NOT NULL AUTO_INCREMENT, \
+ `sha256` varchar(64) NOT NULL, \
+ `name` varchar(1000) NOT NULL,";
+ for (size_t i = 0; i < m_nameFields.size(); i++) {
+ ss << "`" << m_nameFields[i] << "` varchar(100) NOT NULL, ";
+ }
+ ss << "PRIMARY KEY (`id`), UNIQUE KEY `sha256` (`sha256`)\
) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
success = false;
- MySQLPerformQuery(m_databaseHandler, createCmip5Table, util::CREATE, success, errMsg);
+ MySQLPerformQuery(m_databaseHandler, ss.str(), util::CREATE, success, errMsg);
if (!success)
std::cout << errMsg << std::endl;
}
@@ -582,9 +612,11 @@
chronosync::SeqNo
PublishAdapter<MYSQL>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
{
- std::string sql = "select seq_num from chronosync_update_info where session_name = '"
+ std::string sql = "SELECT seq_num FROM chronosync_update_info WHERE session_name = '"
+ update.session.toUri() + "';";
+#ifndef NDEBUG
std::cout << "get latest seqNo : " << sql << std::endl;
+#endif
std::string errMsg;
bool success;
std::shared_ptr<MYSQL_RES> results
@@ -618,9 +650,9 @@
void
PublishAdapter<MYSQL>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
{
- std::string sql = "update chronosync_update_info set seq_num = "
+ std::string sql = "UPDATE chronosync_update_info SET seq_num = "
+ boost::lexical_cast<std::string>(update.high)
- + " where session_name = '" + update.session.toUri() + "';";
+ + " WHERE session_name = '" + update.session.toUri() + "';";
std::cout << "renew update Info : " << sql << std::endl;
std::string errMsg;
bool success = false;
@@ -665,7 +697,8 @@
template <typename DatabaseHandler>
void
-PublishAdapter<DatabaseHandler>::onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
+PublishAdapter<DatabaseHandler>::onUpdateValidationFailed(const
+ std::shared_ptr<const ndn::Data>& data,
const std::string& failureInfo)
{
std::cout << "failed to validate Data" << data->getName() << " : " << failureInfo << std::endl;
@@ -681,14 +714,14 @@
}
// multiple updates from different catalog are possible
- for (size_t i = 0; i < updates.size(); ++ i) {
+ for (size_t i = 0; i < updates.size(); ++i) {
// check if the session is in local DB
// if yes, only fetch packets whose seq number is bigger than the one in the DB
// if no, directly fetch Data
chronosync::SeqNo localSeqNo = getLatestSeqNo(updates[i]);
bool update = false;
- for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++ seq) {
+ for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++seq) {
if (seq > localSeqNo) {
m_socket->fetchData(updates[i].session, seq,
bind(&PublishAdapter<DatabaseHandler>::processUpdateData,this, _1),
@@ -747,15 +780,15 @@
if (updateNumber <= 0)
return false;
- sqlString << "INSERT INTO cmip5 (";
- for (size_t i = 0; i < atmosTableColumns.size(); ++ i) {
+ sqlString << "INSERT INTO " << m_databaseTable << " (";
+ for (size_t i = 0; i < atmosTableColumns.size(); ++i) {
if (i != 0)
sqlString << ", ";
sqlString << atmosTableColumns[i];
}
sqlString << ") VALUES";
- for (size_t i = 0; i < updateNumber; ++ i) { //parse each file name
+ for (size_t i = 0; i < updateNumber; ++i) { //parse each file name
if (i > 0)
sqlString << ",";
// cast might be overflowed
@@ -784,8 +817,8 @@
if (updateNumber <= 0)
return false;
- sqlString << "delete from cmip5 where name in (";
- for (size_t i = 0; i < updateNumber; ++ i) {
+ sqlString << "delete from " << m_databaseTable << " where name in (";
+ for (size_t i = 0; i < updateNumber; ++i) {
if (i > 0)
sqlString << ",";
// cast might be overflowed
@@ -826,7 +859,7 @@
return false;
while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
- count ++;
+ count++;
token = fileName.substr(start, pos - start);
if (count >= atmosTableColumns.size() - 2) { // exclude the sha256 and name
return false; //fileName contains more than 10 fields
@@ -845,7 +878,8 @@
template<typename DatabaseHandler>
bool
-PublishAdapter<DatabaseHandler>::validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data)
+PublishAdapter<DatabaseHandler>::validatePublicationChanges(const
+ std::shared_ptr<const ndn::Data>& data)
{
// The data name must be "/<publisher-prefix>/<nonce>"
// the prefix is the data name removes the last component
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index ccfefb6..c6391ef 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -38,6 +38,7 @@
#include <ndn-cxx/util/time.hpp>
#include <ndn-cxx/encoding/encoding-buffer.hpp>
#include <ndn-cxx/util/in-memory-storage-lru.hpp>
+#include <ndn-cxx/util/string-helper.hpp>
#include "mysql/mysql.h"
@@ -78,7 +79,8 @@
void
setConfigFile(util::ConfigFile& config,
const ndn::Name& prefix,
- const std::vector<std::string>& nameFields);
+ const std::vector<std::string>& nameFields,
+ const std::string& databaseTable);
protected:
/**
@@ -110,6 +112,26 @@
onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
/**
+ * Handles requests for responses to a filter initialization request
+ *
+ * @param filter: InterestFilter that caused this Interest to be routed
+ * @param interest: Interest that needs to be handled
+ */
+ virtual void
+ onFiltersInitializationInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+
+ /**
+ * Helper function that builds the filters menu as Json and publishes it as segmented Data
+ *
+ * @param interest: Interest that needs to be handled
+ */
+ void
+ populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest);
+
+ void
+ getFiltersMenu(Json::Value& value);
+
+ /**
* Helper function that makes query-results data
*
* @param segmentPrefix: Name that identifies the Prefix for the Data
@@ -119,6 +141,8 @@
* last entry
* @param isAutocomplete: bool to indicate whether this is an autocomplete message
* @param resultCount: the number of records in the query results
+ * @param viewStart: the start index of the record in the query results payload
+ * @param viewEnd: the end index of the record in the query results payload
*/
std::shared_ptr<ndn::Data>
makeReplyData(const ndn::Name& segmentPrefix,
@@ -126,7 +150,9 @@
uint64_t segmentNo,
bool isFinalBlock,
bool isAutocomplete,
- uint64_t resultCount);
+ uint64_t resultCount,
+ uint64_t viewStart,
+ uint64_t viewEnd);
/**
* Helper function that generates query results from a Json query carried in the Interest
@@ -189,6 +215,9 @@
void
setFilters();
+ void
+ setCatalogId();
+
/**
* Helper function that generates the sqlQuery string for autocomplete query
* @param sqlQuery: stringstream to save the sqlQuery string
@@ -200,7 +229,11 @@
bool
json2CompleteSearchSql(std::stringstream& sqlQuery,
- Json::Value& jsonValue);
+ Json::Value& jsonValue);
+
+ ndn::Name
+ getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
+ const ndn::Name::Component& version);
protected:
typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
@@ -218,6 +251,7 @@
RegisteredPrefixList m_registeredPrefixList;
//std::vector<std::string> m_atmosColumns;
ndn::Name m_catalogId; // should be replaced with the PK digest
+ std::vector<std::string> m_filterCategoryNames;
};
template <typename DatabaseHandler>
@@ -225,7 +259,7 @@
const std::shared_ptr<ndn::KeyChain>& keyChain)
: util::CatalogAdapter(face, keyChain)
, m_cache(250000)
- , m_catalogId("catalogIdPlaceHolder")
+ , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
{
}
@@ -242,23 +276,37 @@
bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
this, _1, _2));
- ndn::Name resultPrefix = ndn::Name(m_prefix).append("query-results");
- m_registeredPrefixList[resultPrefix] = m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("query-results")),
+ ndn::Name queryResultsPrefix = ndn::Name(m_prefix).append("query-results");
+ m_registeredPrefixList[queryResultsPrefix] =
+ m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix)
+ .append("query-results").append(m_catalogId)),
bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
this, _1, _2),
bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
this, _1),
bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
this, _1, _2));
+
+ ndn::Name filtersInitializationPrefix = ndn::Name(m_prefix).append("filters-initialization");
+ m_registeredPrefixList[filtersInitializationPrefix] =
+ m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("filters-initialization")),
+ bind(&query::QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
+ this, _1, _2),
+ bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
+ this, _1),
+ bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
+ this, _1, _2));
}
template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
const ndn::Name& prefix,
- const std::vector<std::string>& nameFields)
+ const std::vector<std::string>& nameFields,
+ const std::string& databaseTable)
{
m_nameFields = nameFields;
+ m_databaseTable = databaseTable;
config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
_1, _2, _3, prefix));
}
@@ -277,44 +325,51 @@
std::string signingId, dbServer, dbName, dbUser, dbPasswd;
for (auto item = section.begin();
item != section.end();
- ++ item)
+ ++item)
{
if (item->first == "signingId") {
- signingId.assign(item->second.get_value<std::string>());
+ signingId = item->second.get_value<std::string>();
if (signingId.empty()) {
throw Error("Empty value for \"signingId\""
" in \"query\" section");
}
}
+ if (item->first == "filterCategoryNames") {
+ std::istringstream ss(item->second.get_value<std::string>());
+ std::string token;
+ while(std::getline(ss, token, ',')) {
+ m_filterCategoryNames.push_back(token);
+ }
+ }
if (item->first == "database") {
const util::ConfigSection& dataSection = item->second;
for (auto subItem = dataSection.begin();
subItem != dataSection.end();
- ++ subItem)
+ ++subItem)
{
if (subItem->first == "dbServer") {
- dbServer.assign(subItem->second.get_value<std::string>());
+ dbServer = subItem->second.get_value<std::string>();
if (dbServer.empty()){
throw Error("Invalid value for \"dbServer\""
" in \"query\" section");
}
}
if (subItem->first == "dbName") {
- dbName.assign(subItem->second.get_value<std::string>());
+ dbName = subItem->second.get_value<std::string>();
if (dbName.empty()){
throw Error("Invalid value for \"dbName\""
" in \"query\" section");
}
}
if (subItem->first == "dbUser") {
- dbUser.assign(subItem->second.get_value<std::string>());
+ dbUser = subItem->second.get_value<std::string>();
if (dbUser.empty()){
throw Error("Invalid value for \"dbUser\""
" in \"query\" section");
}
}
if (subItem->first == "dbPasswd") {
- dbPasswd.assign(subItem->second.get_value<std::string>());
+ dbPasswd = subItem->second.get_value<std::string>();
if (dbPasswd.empty()){
throw Error("Invalid value for \"dbPasswd\""
" in \"query\" section");
@@ -324,16 +379,47 @@
}
}
- m_prefix = prefix;
- m_signingId = ndn::Name(signingId);
- util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
+ if (m_filterCategoryNames.size() == 0) {
+ throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
+ }
+ m_prefix = prefix;
+
+ m_signingId = ndn::Name(signingId);
+ setCatalogId();
+
+ util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
setDatabaseHandler(mysqlId);
setFilters();
}
template <typename DatabaseHandler>
void
+QueryAdapter<DatabaseHandler>::setCatalogId()
+{
+  // Primary template: intentionally does nothing, m_catalogId keeps its current value.
+}
+
+/**
+ * MySQL specialization: replaces m_catalogId with the hex string of the public key digest.
+ *
+ * The key is the default key of m_signingId, or of the key chain's default identity
+ * when m_signingId is empty.
+ */
+template <>
+void
+QueryAdapter<MYSQL>::setCatalogId()
+{
+  // pick the identity whose default key names this catalog
+  const ndn::Name identity = m_signingId.empty() ? m_keyChain->getDefaultIdentity()
+                                                 : m_signingId;
+  const ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(identity);
+
+  // use public key digest as the catalog ID
+  const std::shared_ptr<ndn::PublicKey> publicKey = m_keyChain->getPib().getPublicKey(keyName);
+  const ndn::Block digest = publicKey->computeDigest();
+
+  m_catalogId.clear();
+  m_catalogId.append(ndn::toHex(*digest.getBuffer()));
+}
+
+template <typename DatabaseHandler>
+void
QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
{
//empty
@@ -370,7 +456,9 @@
}
std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
+#ifndef NDEBUG
std::cout << "incoming query interest : " << interestPtr->getName() << std::endl;
+#endif
// @todo: use thread pool
std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
@@ -389,7 +477,9 @@
// now. In the future, this should check some form of
// InMemoryStorage.
+#ifndef NDEBUG
std::cout << "incoming query-results interest : " << interest.toUri() << std::endl;
+#endif
auto data = m_cache.find(interest.getName());
if (data) {
@@ -399,6 +489,132 @@
template <typename DatabaseHandler>
void
+QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
+                                                               const ndn::Interest& interest)
+{
+  std::shared_ptr<const ndn::Interest> request = interest.shared_from_this();
+
+#ifndef NDEBUG
+  std::cout << "incoming initialization interest : " << request->getName() << std::endl;
+#endif
+  // TODO: keep the generated menu in memory; check memory first and only start a thread
+  // to generate it when it is missing.
+  // Note that if ChronoSync state changes, the saved value must be cleared and regenerated.
+
+  // Serve straight from the cache when a matching Data is already there.
+  auto cached = m_cache.find(interest.getName());
+  if (cached) {
+    m_face->put(*cached);
+    return;
+  }
+
+  // Cache miss: build the menu on a worker thread and wait for it to finish.
+  std::thread worker(&QueryAdapter<DatabaseHandler>::populateFiltersMenu,
+                     this,
+                     request);
+  worker.join();
+}
+
+/**
+ * Builds the filters menu and publishes it as one or more segmented Data packets.
+ *
+ * The menu filled in by getFiltersMenu() is serialized with Json::FastWriter and cut into
+ * chunks of at most PAYLOAD_LIMIT bytes. Segment k is named
+ * <interest-name>/stateVersion/<segment k>; only the last segment carries the FinalBlockId.
+ * Every segment is signed, inserted into m_cache and put on the face.
+ * Nothing is published when getFiltersMenu() leaves the menu empty.
+ *
+ * @param interest: Interest that requested the filters menu
+ */
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest)
+{
+  Json::Value filters;
+  getFiltersMenu(filters);
+  if (filters.empty()) {
+    return;
+  }
+
+  Json::FastWriter fastWriter;
+  const std::string filterValue = fastWriter.write(filters);
+
+  ndn::Name filterDataName(interest->getName());
+  filterDataName.append("stateVersion"); // TODO: should replace with a state version
+
+  const uint8_t* payload = reinterpret_cast<const uint8_t*>(filterValue.data());
+  const size_t totalLength = filterValue.size();
+  const size_t segmentLimit = PAYLOAD_LIMIT; // assumes PAYLOAD_LIMIT > 0 -- TODO confirm
+  size_t startIndex = 0;
+  uint64_t seqNo = 0;
+
+  // Keep cutting until the remainder fits into one segment, so no segment exceeds the limit
+  // however long the menu is. Segments are contiguous: the next one starts exactly where the
+  // previous one ended.
+  for (;;) {
+    const size_t remaining = totalLength - startIndex;
+    const bool isFinalSegment = remaining <= segmentLimit;
+    const size_t payloadLength = isFinalSegment ? remaining : segmentLimit;
+
+    ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo);
+    std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName);
+    filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
+    filterData->setContent(payload + startIndex, payloadLength);
+    if (isFinalSegment) {
+      filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo));
+    }
+
+    signData(*filterData);
+#ifndef NDEBUG
+    std::cout << "populate filter Data : " << segmentName << std::endl;
+#endif
+    m_mutex.lock();
+    try {
+      m_cache.insert(*filterData);
+      m_face->put(*filterData);
+    } // catch exceptions and log, so that the mutex is released below
+    catch (const std::exception& e) {
+      std::cout << e.what() << std::endl;
+    }
+    m_mutex.unlock();
+
+    if (isFinalSegment) {
+      break;
+    }
+    startIndex += payloadLength;
+    ++seqNo;
+  }
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value)
+{
+  // Primary template: intentionally does nothing, value is left untouched.
+}
+
+/**
+ * MySQL specialization: collects the distinct values of every filter category column.
+ *
+ * For each name in m_filterCategoryNames one "SELECT DISTINCT <column> FROM <table>" query
+ * is run against m_databaseTable, and an object {<column>: [values...]} is appended to
+ * value. If any query fails, the error is printed, value is cleared and the function
+ * returns early.
+ *
+ * @param value: Json value that receives one entry per filter category
+ */
+template <>
+void
+QueryAdapter<MYSQL>::getFiltersMenu(Json::Value& value)
+{
+  for (const std::string& columnName : m_filterCategoryNames) {
+    const std::string getFilterSql("SELECT DISTINCT " + columnName +
+                                   " FROM " + m_databaseTable + ";");
+    std::string errMsg;
+    // start from "failed" so the flag is never read uninitialized
+    bool success = false;
+
+    std::shared_ptr<MYSQL_RES> results
+      = atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
+                                       util::QUERY, success, errMsg);
+    // a missing result handle is treated like a failed query
+    if (!success || !results) {
+      std::cout << errMsg << std::endl;
+      value.clear();
+      return;
+    }
+
+    // fresh object per category, so nothing leaks over from the previous column
+    Json::Value category;
+    while (MYSQL_ROW row = mysql_fetch_row(results.get()))
+    {
+      // the MySQL C API reports SQL NULL as a null pointer; skip it
+      if (row[0] != nullptr) {
+        category[columnName].append(row[0]);
+      }
+    }
+    value.append(category);
+  }
+
+#ifndef NDEBUG
+  std::cout << value.toStyledString() << std::endl;
+#endif
+}
+
+template <typename DatabaseHandler>
+void
QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
{
if (m_signingId.empty())
@@ -411,22 +627,36 @@
}
template <typename DatabaseHandler>
+ndn::Name
+QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
+                                                   const ndn::Name::Component& version)
+{
+  // Naming follows http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
+  // Current layout: /<prefix>/query-results/<catalog-id>/<query-parameters>/<version>
+  // where <query-parameters> is the last component of the incoming Interest name.
+
+  ndn::Name resultsName(m_prefix);
+  resultsName.append("query-results");
+  resultsName.append(m_catalogId);
+  resultsName.append(interest->getName().get(-1));
+  resultsName.append(version);
+  return resultsName;
+}
+
+/**
+ * Builds the ACK Data for a query Interest.
+ *
+ * The ACK carries the Interest's own name; its content is the URI of the name returned by
+ * getQueryResultsName(interest, version). The Data gets a 10000 ms freshness period and
+ * is signed with signData().
+ *
+ * @param interest: the query Interest being acknowledged
+ * @param version: version component passed on to getQueryResultsName()
+ */
+template <typename DatabaseHandler>
std::shared_ptr<ndn::Data>
QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
const ndn::Name::Component& version)
{
- // JSON parsed ok, so we can acknowledge successful receipt of the query
- ndn::Name ackName(interest->getName());
- ackName.append(version);
- ackName.append(m_catalogId);
- ackName.append("OK");
+ std::string queryResultNameStr(getQueryResultsName(interest, version).toUri());
- std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(ackName);
+ std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName());
+ ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()),
+ queryResultNameStr.length());
ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
signData(*ack);
#ifndef NDEBUG
- std::cout << "makeAckData : " << ackName << std::endl;
+ std::cout << "query-results data name in ACK : " << queryResultNameStr << std::endl;
#endif
return ack;
}
@@ -465,7 +695,7 @@
return false;
}
- sqlQuery << "SELECT name FROM cmip5";
+ sqlQuery << "SELECT name FROM " << m_databaseTable;
bool input = false;
for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
{
@@ -536,7 +766,7 @@
}
if (key.asString().compare("?") == 0) {
- typedString.assign(value.asString());
+ typedString = value.asString();
// since the front end triggers the autocompletion when users typed '/',
// there must be a '/' at the end, and the first char must be '/'
if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
@@ -560,14 +790,14 @@
// add column name and value (token) into map
typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
- count ++;
+ count++;
start = pos + 1;
}
// 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
// return true
bool more = false;
- sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM cmip5";
+ sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM " << m_databaseTable;
for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
it != typedComponents.end(); ++it) {
if (more)
@@ -615,10 +845,11 @@
}
if (key.asString().compare("??") == 0) {
- typedString.assign(value.asString());
+ typedString = value.asString();
// since the front end triggers the autocompletion when users typed '/',
// there must be a '/' at the end, and the first char must be '/'
- if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
+ if (typedString.empty() || typedString.at(typedString.length() - 1) != '/' ||
+ typedString.find("/") != 0)
return false;
break;
}
@@ -639,14 +870,14 @@
// add column name and value (token) into map
typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
- count ++;
+ count++;
start = pos + 1;
}
// 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
// return true
bool more = false;
- sqlQuery << "SELECT name FROM cmip5";
+ sqlQuery << "SELECT name FROM " << m_databaseTable;
for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
it != typedComponents.end(); ++it) {
if (more)
@@ -723,8 +954,8 @@
// This is where things are expensive so we save them for the lock
// note that we ack the query with the cached ACK messages, but we should remove the ACKs
// that conatin the old version when ChronoSync is updated
- m_activeQueryToFirstResponse.insert(std::pair<std::string,
- std::shared_ptr<ndn::Data>>(jsonQuery, ack));
+ //m_activeQueryToFirstResponse.insert(std::pair<std::string,
+ // std::shared_ptr<ndn::Data>>(jsonQuery, ack));
m_face->put(*ack);
} // !!! END CRITICAL SECTION !!!
m_mutex.unlock();
@@ -733,13 +964,7 @@
bool autocomplete = false;
std::stringstream sqlQuery;
- // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
- // for now, should be /<prefix>/query-results/<query-parameters>/<version>/, latter add catalog-id
- ndn::Name segmentPrefix(m_prefix);
- segmentPrefix.append("query-results");
- segmentPrefix.append(jsonStr);
- segmentPrefix.append(version);
- segmentPrefix.append(m_catalogId);
+ ndn::Name segmentPrefix(getQueryResultsName(interest, version));
Json::Value tmp;
// expect the autocomplete and the component-based query are separate
@@ -804,36 +1029,44 @@
uint64_t resultCount = mysql_num_rows(results.get());
+#ifndef NDEBUG
std::cout << "Query results for \""
<< sqlString
<< "\" contain "
<< resultCount
<< " rows" << std::endl;
+#endif
MYSQL_ROW row;
uint64_t segmentNo = 0;
Json::Value tmp;
Json::Value resultJson;
Json::FastWriter fastWriter;
+
+ uint64_t viewStart = 0, viewEnd = 0;
while ((row = mysql_fetch_row(results.get())))
{
tmp.append(row[0]);
const std::string tmpString = fastWriter.write(tmp);
if (tmpString.length() > PAYLOAD_LIMIT) {
std::shared_ptr<ndn::Data> data
- = makeReplyData(segmentPrefix, resultJson, segmentNo, false, autocomplete, resultCount);
+ = makeReplyData(segmentPrefix, resultJson, segmentNo, false,
+ autocomplete, resultCount, viewStart, viewEnd);
m_mutex.lock();
m_cache.insert(*data);
m_mutex.unlock();
tmp.clear();
resultJson.clear();
- segmentNo ++;
+ segmentNo++;
+ viewStart = viewEnd + 1;
}
resultJson.append(row[0]);
+ viewEnd++;
}
std::shared_ptr<ndn::Data> data
- = makeReplyData(segmentPrefix, resultJson, segmentNo, true, autocomplete, resultCount);
+ = makeReplyData(segmentPrefix, resultJson, segmentNo, true,
+ autocomplete, resultCount, viewStart, viewEnd);
m_mutex.lock();
m_cache.insert(*data);
m_mutex.unlock();
@@ -846,12 +1079,23 @@
uint64_t segmentNo,
bool isFinalBlock,
bool isAutocomplete,
- uint64_t resultCount)
+ uint64_t resultCount,
+ uint64_t viewStart,
+ uint64_t viewEnd)
{
Json::Value entry;
Json::FastWriter fastWriter;
- Json::UInt64 count(resultCount);
- entry["resultCount"] = count;
+
+ entry["resultCount"] = Json::UInt64(resultCount);;
+ entry["viewStart"] = Json::UInt64(viewStart);
+ entry["viewEnd"] = Json::UInt64(viewEnd);
+
+#ifndef NDEBUG
+ std::cout << "resultCount " << resultCount
+ << "; viewStart " << viewStart
+ << "; viewEnd " << viewEnd << std::endl;
+#endif
+
if (isAutocomplete) {
entry["next"] = value;
} else {
diff --git a/catalog/src/util/catalog-adapter.hpp b/catalog/src/util/catalog-adapter.hpp
index 863f29a..7c04c83 100644
--- a/catalog/src/util/catalog-adapter.hpp
+++ b/catalog/src/util/catalog-adapter.hpp
@@ -69,11 +69,14 @@
* Helper function that sets the configuration section handler
* @param config: ConfigFile object to set the handler
* @param prefix: Catalog prefix
+ * @param nameFields: string vector that contains filter category names
+ * @param databaseTable: table name in the database
*/
virtual void
setConfigFile(util::ConfigFile& config,
const ndn::Name& prefix,
- const std::vector<std::string>& nameFields) = 0;
+ const std::vector<std::string>& nameFields,
+ const std::string& databaseTable) = 0;
protected:
@@ -122,6 +125,7 @@
// Name for the signing key
ndn::Name m_signingId;
std::vector<std::string> m_nameFields;
+ std::string m_databaseTable;
}; // class CatalogAdapter