base catalog version on ChronoSync
* remove the redundant TableColumns in publishAdapter
* share chonosync::Socket in adapters to allow queryAdapter to use digest as version
* add various levels of log messages
refs: #2770
Change-Id: Id5aca0f84d9b7963fc63bf0329fbc6394f2b5777
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index aa61845..b7b8355 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -51,11 +51,6 @@
#endif
#define RETRY_WHEN_TIMEOUT 2
-// TODO: need to use the configured nameFields
-std::array<std::string, 12> atmosTableColumns = {{"sha256", "name", "activity", "product",
- "organization", "model", "experiment",
- "frequency", "modeling_realm",
- "variable_name", "ensemble", "time"}};
/**
* PublishAdapter handles the Publish usecases for the catalog
@@ -66,11 +61,13 @@
/**
* Constructor
*
- * @param face: Face that will be used for NDN communications
- * @param keyChain: KeyChain that will be used for data signing
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain that will be used for data signing
+ * @param syncSocket: ChronoSync socket
*/
PublishAdapter(const std::shared_ptr<ndn::Face>& face,
- const std::shared_ptr<ndn::KeyChain>& keyChain);
+ const std::shared_ptr<ndn::KeyChain>& keyChain,
+ std::shared_ptr<chronosync::Socket>& syncSocket);
virtual
~PublishAdapter();
@@ -234,10 +231,10 @@
std::shared_ptr<DatabaseHandler> m_databaseHandler;
std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
RegisteredPrefixList m_registeredPrefixList;
- std::unique_ptr<chronosync::Socket> m_socket; // SyncSocket
+ std::shared_ptr<chronosync::Socket>& m_socket; // SyncSocket
+ std::vector<std::string> m_tableColumns;
// mutex to control critical sections
std::mutex m_mutex;
- std::vector<std::string> m_tableColumns;
// TODO: create thread for each request, and the variables below should be within the thread
bool m_mustBeFresh;
bool m_isFinished;
@@ -247,8 +244,10 @@
template <typename DatabaseHandler>
PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
- const std::shared_ptr<ndn::KeyChain>& keyChain)
+ const std::shared_ptr<ndn::KeyChain>& keyChain,
+ std::shared_ptr<chronosync::Socket>& syncSocket)
: util::CatalogAdapter(face, keyChain)
+ , m_socket(syncSocket)
, m_mustBeFresh(true)
, m_isFinished(false)
, m_catalogId("catalogIdPlaceHolder")
@@ -319,6 +318,13 @@
const std::string& databaseTable)
{
m_nameFields = nameFields;
+
+ //initialize m_tableColumns
+ m_tableColumns = nameFields;
+ auto it = m_tableColumns.begin();
+ it = m_tableColumns.insert(it, std::string("name"));
+ it = m_tableColumns.insert(it, std::string("sha256"));
+
m_databaseTable = databaseTable;
config.addSectionHandler("publishAdapter",
bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
@@ -348,7 +354,7 @@
signingId = item->second.get_value<std::string>();
if (signingId.empty()) {
throw Error("Invalid value for \"signingId\""
- " in \"publish\" section");
+ " in \"publish\" section");
}
}
else if (item->first == "security") {
@@ -364,33 +370,35 @@
++subItem) {
if (subItem->first == "dbServer") {
dbServer = subItem->second.get_value<std::string>();
- if (dbServer.empty()){
- throw Error("Invalid value for \"dbServer\""
- " in \"publish\" section");
- }
}
if (subItem->first == "dbName") {
dbName = subItem->second.get_value<std::string>();
- if (dbName.empty()){
- throw Error("Invalid value for \"dbName\""
- " in \"publish\" section");
- }
}
if (subItem->first == "dbUser") {
dbUser = subItem->second.get_value<std::string>();
- if (dbUser.empty()){
- throw Error("Invalid value for \"dbUser\""
- " in \"publish\" section");
- }
}
if (subItem->first == "dbPasswd") {
dbPasswd = subItem->second.get_value<std::string>();
- if (dbPasswd.empty()){
- throw Error("Invalid value for \"dbPasswd\""
- " in \"publish\" section");
- }
}
}
+
+ // Items below must not be empty
+ if (dbServer.empty()){
+ throw Error("Invalid value for \"dbServer\""
+ " in \"publish\" section");
+ }
+ if (dbName.empty()){
+ throw Error("Invalid value for \"dbName\""
+ " in \"publish\" section");
+ }
+ if (dbUser.empty()){
+ throw Error("Invalid value for \"dbUser\""
+ " in \"publish\" section");
+ }
+ if (dbPasswd.empty()){
+ throw Error("Invalid value for \"dbPasswd\""
+ " in \"publish\" section");
+ }
}
else if (item->first == "sync") {
const util::ConfigSection& synSection = item->second;
@@ -398,11 +406,10 @@
subItem != synSection.end();
++subItem) {
if (subItem->first == "prefix") {
- syncPrefix.clear();
- syncPrefix = subItem->second.get_value<std::string>();
+ syncPrefix = subItem->second.get_value<std::string>();
if (syncPrefix.empty()){
throw Error("Invalid value for \"prefix\""
- " in \"publish\\sync\" section");
+ " in \"publish\\sync\" section");
}
}
// todo: parse the sync_security section
@@ -451,8 +458,10 @@
MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE,
success, errMsg);
+#ifndef NDEBUG
if (!success)
_LOG_DEBUG(errMsg);
+#endif
// create SQL string for table creation, id, sha256, and name are columns that we need
std::stringstream ss;
@@ -468,8 +477,11 @@
success = false;
MySQLPerformQuery(m_databaseHandler, ss.str(), util::CREATE, success, errMsg);
+
+#ifndef NDEBUG
if (!success)
_LOG_DEBUG(errMsg);
+#endif
}
else {
throw Error("cannot connect to the Database");
@@ -494,7 +506,7 @@
m_keyChain->sign(*data);
m_face->put(*data);
- _LOG_DEBUG("ACK interest : " << interest.getName().toUri());
+ _LOG_DEBUG("Ack interest : " << interest.getName().toUri());
//TODO: if already in catalog, what do we do?
@@ -518,7 +530,7 @@
void
PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
{
- _LOG_DEBUG(interest.getName() << "timed out");
+ _LOG_ERROR(interest.getName() << "timed out");
}
template <typename DatabaseHandler>
@@ -526,7 +538,7 @@
PublishAdapter<DatabaseHandler>::onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
const std::string& failureInfo)
{
- _LOG_DEBUG(data->getName() << " validation failed: " << failureInfo);
+ _LOG_ERROR(data->getName() << " validation failed: " << failureInfo);
}
template <typename DatabaseHandler>
@@ -535,7 +547,7 @@
const ndn::Data& data)
{
_LOG_DEBUG(">> PublishAdapter::onPublishedData");
- _LOG_DEBUG("Data name: " << data.getName());
+ _LOG_DEBUG("Recv data : " << data.getName());
if (data.getContent().empty()) {
return;
}
@@ -558,12 +570,10 @@
// validate published data payload, if failed, return
if (!validatePublicationChanges(data)) {
- _LOG_DEBUG("Data validation failed : " << data->getName());
-#ifndef NDEBUG
+ _LOG_ERROR("Data validation failed : " << data->getName());
const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
data->getContent().value_size());
_LOG_DEBUG(payload);
-#endif
return;
}
@@ -695,7 +705,7 @@
util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
m_mutex.unlock();
if (!success)
- _LOG_DEBUG(errMsg);
+ _LOG_ERROR(errMsg);
}
template <typename DatabaseHandler>
@@ -719,7 +729,7 @@
util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
m_mutex.unlock();
if (!success)
- _LOG_DEBUG(errMsg);
+ _LOG_ERROR(errMsg);
}
template <typename DatabaseHandler>
@@ -727,7 +737,7 @@
PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
{
// todo: record event, and use recovery Interest to fetch the whole table
- _LOG_DEBUG("UpdateData retrieval timed out: " << interest.getName());
+ _LOG_ERROR("UpdateData retrieval timed out: " << interest.getName());
}
template <typename DatabaseHandler>
@@ -792,7 +802,7 @@
atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
m_mutex.unlock();
if (!success)
- _LOG_DEBUG(errMsg);
+ _LOG_ERROR(errMsg);
}
template<typename DatabaseHandler>
@@ -810,10 +820,10 @@
return false;
sqlString << "INSERT INTO " << m_databaseTable << " (";
- for (size_t i = 0; i < atmosTableColumns.size(); ++i) {
+ for (size_t i = 0; i < m_tableColumns.size(); ++i) {
if (i != 0)
sqlString << ", ";
- sqlString << atmosTableColumns[i];
+ sqlString << m_tableColumns[i];
}
sqlString << ") VALUES";
@@ -823,7 +833,7 @@
// cast might be overflowed
Json::Value item = jsonValue["add"][static_cast<int>(i)];
if (!item.isConvertibleTo(Json::stringValue)) {
- _LOG_DEBUG("malformed JsonQuery string");
+ _LOG_ERROR("Malformed JsonQuery string");
return false;
}
std::string fileName(item.asString());
@@ -853,7 +863,7 @@
// cast might be overflowed
Json::Value item = jsonValue["remove"][static_cast<int>(i)];
if (!item.isConvertibleTo(Json::stringValue)) {
- _LOG_DEBUG("Malformed JsonQuery");
+ _LOG_ERROR("Malformed JsonQuery");
return false;
}
std::string fileName(item.asString());
@@ -890,15 +900,16 @@
while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
count++;
token = fileName.substr(start, pos - start);
- if (count >= atmosTableColumns.size() - 2) { // exclude the sha256 and name
- return false; //fileName contains more than 10 fields
+ // exclude the sha256 and name (already processed)
+ if (count >= m_tableColumns.size() - 2) {
+ return false;
}
sqlString << ",'" << token << "'";
start = pos + 1;
}
- // must be 10 fields in total (add the tail one)
- if (count != atmosTableColumns.size() - 3 || std::string::npos == start)
+ // sha256 and name have been processed, and the last token will be processed later
+ if (count != m_tableColumns.size() - 3 || std::string::npos == start)
return false;
token = fileName.substr(start, std::string::npos - start);
sqlString << ",'" << token << "'";