populate filters menu
* add view index in payload
* fix bug that catalog crashes upon receiving request for empty prefix
* move component <catalogID> before <query-param>
refs: #3120
Change-Id: I468a72b6b2d1889b9c73052a0226dfeaa851179f
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index b954bc8..1d6f91f 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -33,6 +33,7 @@
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/security/key-chain.hpp>
#include <ndn-cxx/security/validator-config.hpp>
+#include <ndn-cxx/util/string-helper.hpp>
#include <ChronoSync/socket.hpp>
#include <memory>
@@ -75,7 +76,8 @@
void
setConfigFile(util::ConfigFile& config,
const ndn::Name& prefix,
- const std::vector<std::string>& nameFields);
+ const std::vector<std::string>& nameFields,
+ const std::string& databaseTable);
protected:
/**
@@ -121,6 +123,9 @@
void
setFilters();
+ void
+ setCatalogId();
+
/**
* Function to validate publication changes against the trust model, which is, all file
* names must be under the publisher's prefix. This function should be called by a callback
@@ -137,16 +142,16 @@
*
* @param updates: vector that contains all the missing data information
*/
- void
- processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
+ void
+ processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
/**
* Helper function that processes the update data
*
* @param data: shared pointer for the fetched update data
*/
- void
- processUpdateData(const ndn::shared_ptr<const ndn::Data>& data);
+ void
+ processUpdateData(const ndn::shared_ptr<const ndn::Data>& data);
/**
* Helper function that add data to or remove data from database
@@ -154,9 +159,9 @@
* @param sql: sql string to do the add or remove jobs
* @param op: enum value indicates the database operation, could be REMOVE, ADD
*/
- virtual void
- operateDatabase(const std::string& sql,
- util::DatabaseOperation op);
+ virtual void
+ operateDatabase(const std::string& sql,
+ util::DatabaseOperation op);
/**
* Helper function that parses jsonValue to generate sql string, return value indicates
@@ -166,10 +171,10 @@
* @param jsonValue: Json value that contains the update information
* @param op: enum value indicates the database operation, could be REMOVE, ADD
*/
- bool
- json2Sql(std::stringstream& sqlString,
- Json::Value& jsonValue,
- util::DatabaseOperation op);
+ bool
+ json2Sql(std::stringstream& sqlString,
+ Json::Value& jsonValue,
+ util::DatabaseOperation op);
/**
* Helper function to generate sql string based on file name, return value indicates
@@ -178,9 +183,9 @@
* @param sqlString: streamstream to save the sql string
* @param fileName: ndn uri string for a file name
*/
- bool
- name2Fields(std::stringstream& sqlstring,
- std::string& fileName);
+ bool
+ name2Fields(std::stringstream& sqlstring,
+ std::string& fileName);
/**
* Check the local database for the latest sequence number for a ChronoSync update
@@ -195,23 +200,23 @@
*
* @param update: the MissingDataInfo object
*/
- void
- renewUpdateInformation(const chronosync::MissingDataInfo& update);
+ void
+ renewUpdateInformation(const chronosync::MissingDataInfo& update);
/**
* Insert the update message into the local database
*
* @param update: the MissingDataInfo object
*/
- void
- addUpdateInformation(const chronosync::MissingDataInfo& update);
+ void
+ addUpdateInformation(const chronosync::MissingDataInfo& update);
- void
- onFetchUpdateDataTimeout(const ndn::Interest& interest);
+ void
+ onFetchUpdateDataTimeout(const ndn::Interest& interest);
- void
- onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
- const std::string& failureInfo);
+ void
+ onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
+ const std::string& failureInfo);
protected:
typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
@@ -228,6 +233,7 @@
// TODO: create thread for each request, and the variables below should be within the thread
bool m_mustBeFresh;
bool m_isFinished;
+ ndn::Name m_catalogId;
};
@@ -237,11 +243,37 @@
: util::CatalogAdapter(face, keyChain)
, m_mustBeFresh(true)
, m_isFinished(false)
+ , m_catalogId("catalogIdPlaceHolder")
{
}
template <typename DatabaseHandler>
void
+PublishAdapter<DatabaseHandler>::setCatalogId()
+{
+ // empty
+}
+
+template <>
+void
+PublishAdapter<MYSQL>::setCatalogId()
+{
+ // use public key digest as the catalog ID
+ ndn::Name keyId;
+ if (m_signingId.empty()) {
+ keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
+ } else {
+ keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
+ }
+
+ std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
+ ndn::Block keyDigest = pKey->computeDigest();
+ m_catalogId.clear();
+ m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
+}
+
+template <typename DatabaseHandler>
+void
PublishAdapter<DatabaseHandler>::setFilters()
{
ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
@@ -254,7 +286,7 @@
bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
this, _1, _2));
- ndn::Name catalogSync = ndn::Name(m_prefix).append("sync");
+ ndn::Name catalogSync = ndn::Name(m_prefix).append("sync").append(m_catalogId);
m_socket.reset(new chronosync::Socket(m_syncPrefix,
catalogSync,
*m_face,
@@ -275,9 +307,11 @@
void
PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
const ndn::Name& prefix,
- const std::vector<std::string>& nameFields)
+ const std::vector<std::string>& nameFields,
+ const std::string& databaseTable)
{
m_nameFields = nameFields;
+ m_databaseTable = databaseTable;
config.addSectionHandler("publishAdapter",
bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
_1, _2, _3, prefix));
@@ -300,10 +334,10 @@
for (auto item = section.begin();
item != section.end();
- ++ item)
+ ++item)
{
if (item->first == "signingId") {
- signingId.assign(item->second.get_value<std::string>());
+ signingId = item->second.get_value<std::string>();
if (signingId.empty()) {
throw Error("Invalid value for \"signingId\""
" in \"publish\" section");
@@ -319,30 +353,30 @@
const util::ConfigSection& databaseSection = item->second;
for (auto subItem = databaseSection.begin();
subItem != databaseSection.end();
- ++ subItem) {
+ ++subItem) {
if (subItem->first == "dbServer") {
- dbServer.assign(subItem->second.get_value<std::string>());
+ dbServer = subItem->second.get_value<std::string>();
if (dbServer.empty()){
throw Error("Invalid value for \"dbServer\""
" in \"publish\" section");
}
}
if (subItem->first == "dbName") {
- dbName.assign(subItem->second.get_value<std::string>());
+ dbName = subItem->second.get_value<std::string>();
if (dbName.empty()){
throw Error("Invalid value for \"dbName\""
" in \"publish\" section");
}
}
if (subItem->first == "dbUser") {
- dbUser.assign(subItem->second.get_value<std::string>());
+ dbUser = subItem->second.get_value<std::string>();
if (dbUser.empty()){
throw Error("Invalid value for \"dbUser\""
" in \"publish\" section");
}
}
if (subItem->first == "dbPasswd") {
- dbPasswd.assign(subItem->second.get_value<std::string>());
+ dbPasswd = subItem->second.get_value<std::string>();
if (dbPasswd.empty()){
throw Error("Invalid value for \"dbPasswd\""
" in \"publish\" section");
@@ -354,10 +388,10 @@
const util::ConfigSection& synSection = item->second;
for (auto subItem = synSection.begin();
subItem != synSection.end();
- ++ subItem) {
+ ++subItem) {
if (subItem->first == "prefix") {
syncPrefix.clear();
- syncPrefix.assign(subItem->second.get_value<std::string>());
+ syncPrefix = subItem->second.get_value<std::string>();
if (syncPrefix.empty()){
throw Error("Invalid value for \"prefix\""
" in \"publish\\sync\" section");
@@ -370,6 +404,8 @@
m_prefix = prefix;
m_signingId = ndn::Name(signingId);
+ setCatalogId();
+
m_syncPrefix.clear();
m_syncPrefix.append(syncPrefix);
util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
@@ -403,34 +439,28 @@
`session_name` varchar(1000) NOT NULL, \
`seq_num` int(11) NOT NULL, \
PRIMARY KEY (`id`), \
- UNIQUE KEY `id_UNIQUE` (`id`) \
+ UNIQUE KEY `id_UNIQUE` (`id`) \
) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
- MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE, success, errMsg);
+ MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE,
+ success, errMsg);
if (!success)
std::cout << errMsg << std::endl;
- std::string createCmip5Table =
- "CREATE TABLE `cmip5` ( \
- `id` int(100) NOT NULL AUTO_INCREMENT, \
- `sha256` varchar(64) NOT NULL, \
- `name` varchar(1000) NOT NULL, \
- `activity` varchar(100) NOT NULL, \
- `product` varchar(100) NOT NULL, \
- `organization` varchar(100) NOT NULL, \
- `model` varchar(100) NOT NULL, \
- `experiment` varchar(100) NOT NULL, \
- `frequency` varchar(100) NOT NULL, \
- `modeling_realm` varchar(100) NOT NULL, \
- `variable_name` varchar(100) NOT NULL, \
- `ensemble` varchar(100) NOT NULL, \
- `time` varchar(100) NOT NULL, \
- PRIMARY KEY (`id`), \
- UNIQUE KEY `sha256` (`sha256`) \
+ // create SQL string for table creation, id, sha256, and name are columns that we need
+ std::stringstream ss;
+ ss << "CREATE TABLE `" << m_databaseTable << "` (\
+ `id` int(100) NOT NULL AUTO_INCREMENT, \
+ `sha256` varchar(64) NOT NULL, \
+ `name` varchar(1000) NOT NULL,";
+ for (size_t i = 0; i < m_nameFields.size(); i++) {
+ ss << "`" << m_nameFields[i] << "` varchar(100) NOT NULL, ";
+ }
+ ss << "PRIMARY KEY (`id`), UNIQUE KEY `sha256` (`sha256`)\
) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
success = false;
- MySQLPerformQuery(m_databaseHandler, createCmip5Table, util::CREATE, success, errMsg);
+ MySQLPerformQuery(m_databaseHandler, ss.str(), util::CREATE, success, errMsg);
if (!success)
std::cout << errMsg << std::endl;
}
@@ -582,9 +612,11 @@
chronosync::SeqNo
PublishAdapter<MYSQL>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
{
- std::string sql = "select seq_num from chronosync_update_info where session_name = '"
+ std::string sql = "SELECT seq_num FROM chronosync_update_info WHERE session_name = '"
+ update.session.toUri() + "';";
+#ifndef NDEBUG
std::cout << "get latest seqNo : " << sql << std::endl;
+#endif
std::string errMsg;
bool success;
std::shared_ptr<MYSQL_RES> results
@@ -618,9 +650,9 @@
void
PublishAdapter<MYSQL>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
{
- std::string sql = "update chronosync_update_info set seq_num = "
+ std::string sql = "UPDATE chronosync_update_info SET seq_num = "
+ boost::lexical_cast<std::string>(update.high)
- + " where session_name = '" + update.session.toUri() + "';";
+ + " WHERE session_name = '" + update.session.toUri() + "';";
std::cout << "renew update Info : " << sql << std::endl;
std::string errMsg;
bool success = false;
@@ -665,7 +697,8 @@
template <typename DatabaseHandler>
void
-PublishAdapter<DatabaseHandler>::onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
+PublishAdapter<DatabaseHandler>::onUpdateValidationFailed(const
+ std::shared_ptr<const ndn::Data>& data,
const std::string& failureInfo)
{
std::cout << "failed to validate Data" << data->getName() << " : " << failureInfo << std::endl;
@@ -681,14 +714,14 @@
}
// multiple updates from different catalog are possible
- for (size_t i = 0; i < updates.size(); ++ i) {
+ for (size_t i = 0; i < updates.size(); ++i) {
// check if the session is in local DB
// if yes, only fetch packets whose seq number is bigger than the one in the DB
// if no, directly fetch Data
chronosync::SeqNo localSeqNo = getLatestSeqNo(updates[i]);
bool update = false;
- for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++ seq) {
+ for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++seq) {
if (seq > localSeqNo) {
m_socket->fetchData(updates[i].session, seq,
bind(&PublishAdapter<DatabaseHandler>::processUpdateData,this, _1),
@@ -747,15 +780,15 @@
if (updateNumber <= 0)
return false;
- sqlString << "INSERT INTO cmip5 (";
- for (size_t i = 0; i < atmosTableColumns.size(); ++ i) {
+ sqlString << "INSERT INTO " << m_databaseTable << " (";
+ for (size_t i = 0; i < atmosTableColumns.size(); ++i) {
if (i != 0)
sqlString << ", ";
sqlString << atmosTableColumns[i];
}
sqlString << ") VALUES";
- for (size_t i = 0; i < updateNumber; ++ i) { //parse each file name
+ for (size_t i = 0; i < updateNumber; ++i) { //parse each file name
if (i > 0)
sqlString << ",";
// cast might be overflowed
@@ -784,8 +817,8 @@
if (updateNumber <= 0)
return false;
- sqlString << "delete from cmip5 where name in (";
- for (size_t i = 0; i < updateNumber; ++ i) {
+ sqlString << "delete from " << m_databaseTable << " where name in (";
+ for (size_t i = 0; i < updateNumber; ++i) {
if (i > 0)
sqlString << ",";
// cast might be overflowed
@@ -826,7 +859,7 @@
return false;
while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
- count ++;
+ count++;
token = fileName.substr(start, pos - start);
if (count >= atmosTableColumns.size() - 2) { // exclude the sha256 and name
return false; //fileName contains more than 10 fields
@@ -845,7 +878,8 @@
template<typename DatabaseHandler>
bool
-PublishAdapter<DatabaseHandler>::validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data)
+PublishAdapter<DatabaseHandler>::validatePublicationChanges(const
+ std::shared_ptr<const ndn::Data>& data)
{
// The data name must be "/<publisher-prefix>/<nonce>"
// the prefix is the data name removes the last component