base catalog version on ChronoSync
* remove the redundant TableColumns in publishAdapter
* share chonosync::Socket in adapters to allow queryAdapter to use digest as version
* add various levels of log messages
refs: #2770
Change-Id: Id5aca0f84d9b7963fc63bf0329fbc6394f2b5777
diff --git a/catalog/src/catalog/catalog.cpp b/catalog/src/catalog/catalog.cpp
index 1d94dfc..d1d6118 100644
--- a/catalog/src/catalog/catalog.cpp
+++ b/catalog/src/catalog/catalog.cpp
@@ -51,10 +51,6 @@
if (i->first == "prefix") {
m_prefix.clear();
m_prefix.append(i->second.get_value<std::string>());
- if (m_prefix.empty()) {
- throw Error("Empty value for \"prefix\""
- " in \"general\" section");
- }
}
if (i->first == "nameFields") {
std::istringstream ss(i->second.get_value<std::string>());
@@ -67,10 +63,17 @@
m_databaseTable = i->second.get_value<std::string>();
}
}
- if (m_nameFields.size() == 0) { // nameFields must not be empty
- throw Error("Empty value for \"nameFields\""
- " in \"general\" section");
+
+ if (m_prefix.empty()) { // Catalog prefix must not be empty
+ throw Error("Empty value for \"prefix\""
+ " in \"general\" section");
}
+
+ if (m_nameFields.empty()) { // nameFields must not be empty
+ throw Error("Empty value for \"nameFields\""
+ " in \"general\" section");
+ }
+
if (m_databaseTable.empty()) {
throw Error("Empty value for \"databaseTable\""
" in \"general\" section");
diff --git a/catalog/src/main.cpp b/catalog/src/main.cpp
index 7655315..4bf1142 100644
--- a/catalog/src/main.cpp
+++ b/catalog/src/main.cpp
@@ -24,6 +24,7 @@
#include <memory>
#include <getopt.h>
#include <ndn-cxx/face.hpp>
+#include <ChronoSync/socket.hpp>
#ifdef HAVE_LOG4CXX
INIT_LOGGER("atmos-catalog::Main");
@@ -71,20 +72,26 @@
std::shared_ptr<ndn::Face> face(new ndn::Face());
std::shared_ptr<ndn::KeyChain> keyChain(new ndn::KeyChain());
+ // For now, share chronosync::Socket in both queryAdapter and publishAdapter
+ // to allow queryAdapter to get the digest.
+ // We may have to save digest in Database later
+ std::shared_ptr<chronosync::Socket> syncSocket;
+
std::unique_ptr<atmos::util::CatalogAdapter>
- queryAdapter(new atmos::query::QueryAdapter<MYSQL>(face, keyChain));
+ queryAdapter(new atmos::query::QueryAdapter<MYSQL>(face, keyChain, syncSocket));
std::unique_ptr<atmos::util::CatalogAdapter>
- publishAdapter(new atmos::publish::PublishAdapter<MYSQL>(face, keyChain));
+ publishAdapter(new atmos::publish::PublishAdapter<MYSQL>(face, keyChain, syncSocket));
atmos::catalog::Catalog catalogInstance(face, keyChain, configFile);
- catalogInstance.addAdapter(queryAdapter);
catalogInstance.addAdapter(publishAdapter);
+ catalogInstance.addAdapter(queryAdapter);
try {
catalogInstance.initialize();
}
catch (std::exception& e) {
- std::cout << e.what() << std::endl;
+ _LOG_ERROR(e.what());
+ return 1;
}
#ifndef NDEBUG
@@ -94,7 +101,8 @@
#ifndef NDEBUG
}
catch (std::exception& e) {
- _LOG_DEBUG(e.what());
+ _LOG_ERROR(e.what());
+ return 1;
}
#endif
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index aa61845..b7b8355 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -51,11 +51,6 @@
#endif
#define RETRY_WHEN_TIMEOUT 2
-// TODO: need to use the configured nameFields
-std::array<std::string, 12> atmosTableColumns = {{"sha256", "name", "activity", "product",
- "organization", "model", "experiment",
- "frequency", "modeling_realm",
- "variable_name", "ensemble", "time"}};
/**
* PublishAdapter handles the Publish usecases for the catalog
@@ -66,11 +61,13 @@
/**
* Constructor
*
- * @param face: Face that will be used for NDN communications
- * @param keyChain: KeyChain that will be used for data signing
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain that will be used for data signing
+ * @param syncSocket: ChronoSync socket
*/
PublishAdapter(const std::shared_ptr<ndn::Face>& face,
- const std::shared_ptr<ndn::KeyChain>& keyChain);
+ const std::shared_ptr<ndn::KeyChain>& keyChain,
+ std::shared_ptr<chronosync::Socket>& syncSocket);
virtual
~PublishAdapter();
@@ -234,10 +231,10 @@
std::shared_ptr<DatabaseHandler> m_databaseHandler;
std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
RegisteredPrefixList m_registeredPrefixList;
- std::unique_ptr<chronosync::Socket> m_socket; // SyncSocket
+ std::shared_ptr<chronosync::Socket>& m_socket; // SyncSocket
+ std::vector<std::string> m_tableColumns;
// mutex to control critical sections
std::mutex m_mutex;
- std::vector<std::string> m_tableColumns;
// TODO: create thread for each request, and the variables below should be within the thread
bool m_mustBeFresh;
bool m_isFinished;
@@ -247,8 +244,10 @@
template <typename DatabaseHandler>
PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
- const std::shared_ptr<ndn::KeyChain>& keyChain)
+ const std::shared_ptr<ndn::KeyChain>& keyChain,
+ std::shared_ptr<chronosync::Socket>& syncSocket)
: util::CatalogAdapter(face, keyChain)
+ , m_socket(syncSocket)
, m_mustBeFresh(true)
, m_isFinished(false)
, m_catalogId("catalogIdPlaceHolder")
@@ -319,6 +318,13 @@
const std::string& databaseTable)
{
m_nameFields = nameFields;
+
+ //initialize m_tableColumns
+ m_tableColumns = nameFields;
+ auto it = m_tableColumns.begin();
+ it = m_tableColumns.insert(it, std::string("name"));
+ it = m_tableColumns.insert(it, std::string("sha256"));
+
m_databaseTable = databaseTable;
config.addSectionHandler("publishAdapter",
bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
@@ -348,7 +354,7 @@
signingId = item->second.get_value<std::string>();
if (signingId.empty()) {
throw Error("Invalid value for \"signingId\""
- " in \"publish\" section");
+ " in \"publish\" section");
}
}
else if (item->first == "security") {
@@ -364,33 +370,35 @@
++subItem) {
if (subItem->first == "dbServer") {
dbServer = subItem->second.get_value<std::string>();
- if (dbServer.empty()){
- throw Error("Invalid value for \"dbServer\""
- " in \"publish\" section");
- }
}
if (subItem->first == "dbName") {
dbName = subItem->second.get_value<std::string>();
- if (dbName.empty()){
- throw Error("Invalid value for \"dbName\""
- " in \"publish\" section");
- }
}
if (subItem->first == "dbUser") {
dbUser = subItem->second.get_value<std::string>();
- if (dbUser.empty()){
- throw Error("Invalid value for \"dbUser\""
- " in \"publish\" section");
- }
}
if (subItem->first == "dbPasswd") {
dbPasswd = subItem->second.get_value<std::string>();
- if (dbPasswd.empty()){
- throw Error("Invalid value for \"dbPasswd\""
- " in \"publish\" section");
- }
}
}
+
+ // Items below must not be empty
+ if (dbServer.empty()){
+ throw Error("Invalid value for \"dbServer\""
+ " in \"publish\" section");
+ }
+ if (dbName.empty()){
+ throw Error("Invalid value for \"dbName\""
+ " in \"publish\" section");
+ }
+ if (dbUser.empty()){
+ throw Error("Invalid value for \"dbUser\""
+ " in \"publish\" section");
+ }
+ if (dbPasswd.empty()){
+ throw Error("Invalid value for \"dbPasswd\""
+ " in \"publish\" section");
+ }
}
else if (item->first == "sync") {
const util::ConfigSection& synSection = item->second;
@@ -398,11 +406,10 @@
subItem != synSection.end();
++subItem) {
if (subItem->first == "prefix") {
- syncPrefix.clear();
- syncPrefix = subItem->second.get_value<std::string>();
+ syncPrefix = subItem->second.get_value<std::string>();
if (syncPrefix.empty()){
throw Error("Invalid value for \"prefix\""
- " in \"publish\\sync\" section");
+ " in \"publish\\sync\" section");
}
}
// todo: parse the sync_security section
@@ -451,8 +458,10 @@
MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE,
success, errMsg);
+#ifndef NDEBUG
if (!success)
_LOG_DEBUG(errMsg);
+#endif
// create SQL string for table creation, id, sha256, and name are columns that we need
std::stringstream ss;
@@ -468,8 +477,11 @@
success = false;
MySQLPerformQuery(m_databaseHandler, ss.str(), util::CREATE, success, errMsg);
+
+#ifndef NDEBUG
if (!success)
_LOG_DEBUG(errMsg);
+#endif
}
else {
throw Error("cannot connect to the Database");
@@ -494,7 +506,7 @@
m_keyChain->sign(*data);
m_face->put(*data);
- _LOG_DEBUG("ACK interest : " << interest.getName().toUri());
+ _LOG_DEBUG("Ack interest : " << interest.getName().toUri());
//TODO: if already in catalog, what do we do?
@@ -518,7 +530,7 @@
void
PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
{
- _LOG_DEBUG(interest.getName() << "timed out");
+ _LOG_ERROR(interest.getName() << "timed out");
}
template <typename DatabaseHandler>
@@ -526,7 +538,7 @@
PublishAdapter<DatabaseHandler>::onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
const std::string& failureInfo)
{
- _LOG_DEBUG(data->getName() << " validation failed: " << failureInfo);
+ _LOG_ERROR(data->getName() << " validation failed: " << failureInfo);
}
template <typename DatabaseHandler>
@@ -535,7 +547,7 @@
const ndn::Data& data)
{
_LOG_DEBUG(">> PublishAdapter::onPublishedData");
- _LOG_DEBUG("Data name: " << data.getName());
+ _LOG_DEBUG("Recv data : " << data.getName());
if (data.getContent().empty()) {
return;
}
@@ -558,12 +570,10 @@
// validate published data payload, if failed, return
if (!validatePublicationChanges(data)) {
- _LOG_DEBUG("Data validation failed : " << data->getName());
-#ifndef NDEBUG
+ _LOG_ERROR("Data validation failed : " << data->getName());
const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
data->getContent().value_size());
_LOG_DEBUG(payload);
-#endif
return;
}
@@ -695,7 +705,7 @@
util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
m_mutex.unlock();
if (!success)
- _LOG_DEBUG(errMsg);
+ _LOG_ERROR(errMsg);
}
template <typename DatabaseHandler>
@@ -719,7 +729,7 @@
util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
m_mutex.unlock();
if (!success)
- _LOG_DEBUG(errMsg);
+ _LOG_ERROR(errMsg);
}
template <typename DatabaseHandler>
@@ -727,7 +737,7 @@
PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
{
// todo: record event, and use recovery Interest to fetch the whole table
- _LOG_DEBUG("UpdateData retrieval timed out: " << interest.getName());
+ _LOG_ERROR("UpdateData retrieval timed out: " << interest.getName());
}
template <typename DatabaseHandler>
@@ -792,7 +802,7 @@
atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
m_mutex.unlock();
if (!success)
- _LOG_DEBUG(errMsg);
+ _LOG_ERROR(errMsg);
}
template<typename DatabaseHandler>
@@ -810,10 +820,10 @@
return false;
sqlString << "INSERT INTO " << m_databaseTable << " (";
- for (size_t i = 0; i < atmosTableColumns.size(); ++i) {
+ for (size_t i = 0; i < m_tableColumns.size(); ++i) {
if (i != 0)
sqlString << ", ";
- sqlString << atmosTableColumns[i];
+ sqlString << m_tableColumns[i];
}
sqlString << ") VALUES";
@@ -823,7 +833,7 @@
// cast might be overflowed
Json::Value item = jsonValue["add"][static_cast<int>(i)];
if (!item.isConvertibleTo(Json::stringValue)) {
- _LOG_DEBUG("malformed JsonQuery string");
+ _LOG_ERROR("Malformed JsonQuery string");
return false;
}
std::string fileName(item.asString());
@@ -853,7 +863,7 @@
// cast might be overflowed
Json::Value item = jsonValue["remove"][static_cast<int>(i)];
if (!item.isConvertibleTo(Json::stringValue)) {
- _LOG_DEBUG("Malformed JsonQuery");
+ _LOG_ERROR("Malformed JsonQuery");
return false;
}
std::string fileName(item.asString());
@@ -890,15 +900,16 @@
while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
count++;
token = fileName.substr(start, pos - start);
- if (count >= atmosTableColumns.size() - 2) { // exclude the sha256 and name
- return false; //fileName contains more than 10 fields
+ // exclude the sha256 and name (already processed)
+ if (count >= m_tableColumns.size() - 2) {
+ return false;
}
sqlString << ",'" << token << "'";
start = pos + 1;
}
- // must be 10 fields in total (add the tail one)
- if (count != atmosTableColumns.size() - 3 || std::string::npos == start)
+ // sha256 and name have been processed, and the last token will be processed later
+ if (count != m_tableColumns.size() - 3 || std::string::npos == start)
return false;
token = fileName.substr(start, std::string::npos - start);
sqlString << ",'" << token << "'";
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index 435ad60..c0f292c 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -39,6 +39,7 @@
#include <ndn-cxx/encoding/encoding-buffer.hpp>
#include <ndn-cxx/util/in-memory-storage-lru.hpp>
#include <ndn-cxx/util/string-helper.hpp>
+#include <ChronoSync/socket.hpp>
#include "mysql/mysql.h"
@@ -72,11 +73,13 @@
/**
* Constructor
*
- * @param face: Face that will be used for NDN communications
- * @param keyChain: KeyChain that will be used for data signing
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain that will be used for data signing
+ * @param syncSocket: ChronoSync socket
*/
QueryAdapter(const std::shared_ptr<ndn::Face>& face,
- const std::shared_ptr<ndn::KeyChain>& keyChain);
+ const std::shared_ptr<ndn::KeyChain>& keyChain,
+ const std::shared_ptr<chronosync::Socket>& syncSocket);
virtual
~QueryAdapter();
@@ -249,30 +252,38 @@
getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
const ndn::Name::Component& version);
+ std::string
+ getChronoSyncDigest();
+
protected:
typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
// Handle to the Catalog's database
std::shared_ptr<DatabaseHandler> m_databaseHandler;
+ const std::shared_ptr<chronosync::Socket>& m_socket;
// mutex to control critical sections
std::mutex m_mutex;
// @{ needs m_mutex protection
// The Queries we are currently writing to
- std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
-
+ //std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
+ ndn::util::InMemoryStorageLru m_activeQueryToFirstResponse;
ndn::util::InMemoryStorageLru m_cache;
+ std::string m_chronosyncDigest;
// @}
RegisteredPrefixList m_registeredPrefixList;
- //std::vector<std::string> m_atmosColumns;
ndn::Name m_catalogId; // should be replaced with the PK digest
std::vector<std::string> m_filterCategoryNames;
};
template <typename DatabaseHandler>
QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
- const std::shared_ptr<ndn::KeyChain>& keyChain)
+ const std::shared_ptr<ndn::KeyChain>& keyChain,
+ const std::shared_ptr<chronosync::Socket>& syncSocket)
: util::CatalogAdapter(face, keyChain)
+ , m_socket(syncSocket)
+ , m_activeQueryToFirstResponse(100000)
, m_cache(250000)
+ , m_chronosyncDigest("0")
, m_catalogId("catalogIdPlaceHolder") // initialize for unitests
{
}
@@ -345,7 +356,7 @@
signingId = item->second.get_value<std::string>();
if (signingId.empty()) {
throw Error("Empty value for \"signingId\""
- " in \"query\" section");
+ " in \"query\" section");
}
}
if (item->first == "filterCategoryNames") {
@@ -363,37 +374,38 @@
{
if (subItem->first == "dbServer") {
dbServer = subItem->second.get_value<std::string>();
- if (dbServer.empty()){
- throw Error("Invalid value for \"dbServer\""
- " in \"query\" section");
- }
}
if (subItem->first == "dbName") {
dbName = subItem->second.get_value<std::string>();
- if (dbName.empty()){
- throw Error("Invalid value for \"dbName\""
- " in \"query\" section");
- }
}
if (subItem->first == "dbUser") {
dbUser = subItem->second.get_value<std::string>();
- if (dbUser.empty()){
- throw Error("Invalid value for \"dbUser\""
- " in \"query\" section");
- }
}
if (subItem->first == "dbPasswd") {
dbPasswd = subItem->second.get_value<std::string>();
- if (dbPasswd.empty()){
- throw Error("Invalid value for \"dbPasswd\""
- " in \"query\" section");
- }
}
}
+
+ if (dbServer.empty()){
+ throw Error("Invalid value for \"dbServer\""
+ " in \"query\" section");
+ }
+ if (dbName.empty()){
+ throw Error("Invalid value for \"dbName\""
+ " in \"query\" section");
+ }
+ if (dbUser.empty()){
+ throw Error("Invalid value for \"dbUser\""
+ " in \"query\" section");
+ }
+ if (dbPasswd.empty()){
+ throw Error("Invalid value for \"dbPasswd\""
+ " in \"query\" section");
+ }
}
}
- if (m_filterCategoryNames.size() == 0) {
+ if (m_filterCategoryNames.empty()) {
throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
}
@@ -468,6 +480,7 @@
// @todo: return a nack
return;
}
+
std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
_LOG_DEBUG("Interest : " << interestPtr->getName());
@@ -502,7 +515,7 @@
template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
- const ndn::Interest& interest)
+ const ndn::Interest& interest)
{
_LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest");
std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
@@ -560,9 +573,9 @@
m_cache.insert(*filterData);
try {
m_face->put(*filterData);
- }// catch exceptions and log
+ }
catch (std::exception& e) {
- _LOG_DEBUG(e.what());
+ _LOG_ERROR(e.what());
}
m_mutex.unlock();
@@ -612,7 +625,7 @@
= atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
util::QUERY, success, errMsg);
if (!success) {
- _LOG_DEBUG(errMsg);
+ _LOG_ERROR(errMsg);
value.clear();
return;
}
@@ -670,9 +683,9 @@
ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
signData(*ack);
-#ifndef NDEBUG
- _LOG_DEBUG(queryResultNameStr);
-#endif
+
+ _LOG_DEBUG("Make ACK : " << queryResultNameStr);
+
return ack;
}
@@ -688,9 +701,9 @@
nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
signData(*nack);
-#ifndef NDEBUG
- _LOG_DEBUG(ndn::Name(dataPrefix).appendSegment(segmentNo));
-#endif
+
+ _LOG_DEBUG("Send Nack: " << ndn::Name(dataPrefix).appendSegment(segmentNo));
+
m_mutex.lock();
m_cache.insert(*nack);
m_mutex.unlock();
@@ -703,9 +716,9 @@
Json::Value& jsonValue)
{
_LOG_DEBUG(">> QueryAdapter::json2Sql");
-#ifndef NDEBUG
+
_LOG_DEBUG(jsonValue.toStyledString());
-#endif
+
if (jsonValue.type() != Json::objectValue) {
return false;
}
@@ -718,13 +731,13 @@
Json::Value value = (*iter);
if (key == Json::nullValue || value == Json::nullValue) {
- _LOG_DEBUG("null key or value in JsonValue");
+ _LOG_ERROR("Null key or value in JsonValue");
return false;
}
// cannot convert to string
if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
- _LOG_DEBUG("malformed JsonQuery string");
+ _LOG_ERROR("Malformed JsonQuery string");
return false;
}
@@ -756,9 +769,9 @@
bool& lastComponent)
{
_LOG_DEBUG(">> QueryAdapter::json2AutocompletionSql");
-#ifndef NDEBUG
+
_LOG_DEBUG(jsonValue.toStyledString());
-#endif
+
if (jsonValue.type() != Json::objectValue) {
return false;
}
@@ -771,13 +784,13 @@
Json::Value value = (*iter);
if (key == Json::nullValue || value == Json::nullValue) {
- _LOG_DEBUG("null key or value in JsonValue");
+ _LOG_ERROR("Null key or value in JsonValue");
return false;
}
// cannot convert to string
if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
- _LOG_DEBUG("malformed JsonQuery string");
+ _LOG_ERROR("Malformed JsonQuery string");
return false;
}
@@ -838,9 +851,9 @@
Json::Value& jsonValue)
{
_LOG_DEBUG(">> QueryAdapter::json2CompleteSearchSql");
-#ifndef NDEBUG
+
_LOG_DEBUG(jsonValue.toStyledString());
-#endif
+
if (jsonValue.type() != Json::objectValue) {
return false;
}
@@ -853,13 +866,13 @@
Json::Value value = (*iter);
if (key == Json::nullValue || value == Json::nullValue) {
- _LOG_DEBUG("null key or value in JsonValue");
+ _LOG_ERROR("Null key or value in JsonValue");
return false;
}
// cannot convert to string
if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
- _LOG_DEBUG("malformed JsonQuery string");
+ _LOG_ERROR("Malformed JsonQuery string");
return false;
}
@@ -934,54 +947,70 @@
// no JSON query, send Nack?
return;
}
- // check if the ACK is cached, if yes, respond with ACK
- // ?? what if the results for now it NULL, but latter exist?
- // For efficiency, do a double check. Once without the lock, then with it.
- if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
- m_mutex.lock();
- { // !!! BEGIN CRITICAL SECTION !!!
- // If this fails upon locking, we removed it during our search.
- // An unusual race-condition case, which requires things like PIT aggregation to be off.
- auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
- if (iter != m_activeQueryToFirstResponse.end()) {
- m_face->put(*(iter->second));
- m_mutex.unlock(); //escape lock
- return;
- }
- } // !!! END CRITICAL SECTION !!!
- m_mutex.unlock();
+
+ // the version should be replaced with ChronoSync state digest
+ ndn::name::Component version;
+
+ if(m_socket != nullptr) {
+ const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
+ std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size());
+ _LOG_DEBUG("Original digest" << m_chronosyncDigest);
+ _LOG_DEBUG("New digest : " << digestStr);
+ // if the m_chronosyncDigest and the rootdigest are not equal
+ if (digestStr != m_chronosyncDigest) {
+ // (1) update chronosyncDigest
+ // (2) clear all staled ACK data
+ m_mutex.lock();
+ m_chronosyncDigest = digestStr;
+ m_activeQueryToFirstResponse.erase(ndn::Name("/"));
+ m_mutex.unlock();
+ _LOG_DEBUG("Change digest to " << m_chronosyncDigest);
+ }
+ version = ndn::name::Component::fromEscapedString(digestStr);
}
+ else {
+ version = ndn::name::Component::fromEscapedString(m_chronosyncDigest);
+ }
+
+ // try to respond with the inMemoryStorage
+ m_mutex.lock();
+ { // !!! BEGIN CRITICAL SECTION !!!
+ auto data = m_activeQueryToFirstResponse.find(interest->getName());
+ if (data) {
+ _LOG_DEBUG("Answer with Data in IMS : " << data->getName());
+ m_face->put(*data);
+ m_mutex.unlock();
+ return;
+ }
+ } // !!! END CRITICAL SECTION !!!
+ m_mutex.unlock();
// 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
Json::Value parsedFromString;
Json::Reader reader;
if (!reader.parse(jsonQuery, parsedFromString)) {
// @todo: send NACK?
- _LOG_DEBUG("cannot parse the JsonQuery");
+ _LOG_ERROR("Cannot parse the JsonQuery");
return;
}
- // the version should be replaced with ChronoSync state digest
- const ndn::name::Component version
- = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(
- ndn::time::system_clock::now()).count());
-
std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
m_mutex.lock();
{ // !!! BEGIN CRITICAL SECTION !!!
// An unusual race-condition case, which requires things like PIT aggregation to be off.
- auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
- if (iter != m_activeQueryToFirstResponse.end()) {
- m_face->put(*(iter->second));
- m_mutex.unlock(); // escape lock
+ auto data = m_activeQueryToFirstResponse.find(interest->getName());
+ if (data) {
+ m_face->put(*data);
+ m_mutex.unlock();
return;
}
+
// This is where things are expensive so we save them for the lock
// note that we ack the query with the cached ACK messages, but we should remove the ACKs
// that conatin the old version when ChronoSync is updated
- //m_activeQueryToFirstResponse.insert(std::pair<std::string,
- // std::shared_ptr<ndn::Data>>(jsonQuery, ack));
+ m_activeQueryToFirstResponse.insert(*ack);
+
m_face->put(*ack);
} // !!! END CRITICAL SECTION !!!
m_mutex.unlock();
@@ -1038,19 +1067,19 @@
bool lastComponent)
{
_LOG_DEBUG(">> QueryAdapter::prepareSegments");
-#ifndef NDEBUG
+
_LOG_DEBUG(sqlString);
-#endif
+
std::string errMsg;
bool success;
// 4) Run the Query
std::shared_ptr<MYSQL_RES> results
= atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
if (!success)
- _LOG_DEBUG(errMsg);
+ _LOG_ERROR(errMsg);
if (!results) {
- _LOG_DEBUG("NULL MYSQL_RES for" << sqlString);
+ _LOG_ERROR("NULL MYSQL_RES for" << sqlString);
// @todo: throw runtime error or log the error message?
return;
@@ -1058,9 +1087,7 @@
uint64_t resultCount = mysql_num_rows(results.get());
-#ifndef NDEBUG
_LOG_DEBUG("Query resuls contain " << resultCount << "rows");
-#endif
MYSQL_ROW row;
uint64_t segmentNo = 0;
@@ -1119,11 +1146,9 @@
if (lastComponent)
entry["lastComponent"] = Json::Value(true);
-#ifndef NDEBUG
_LOG_DEBUG("resultCount " << resultCount << ";"
<< "viewStart " << viewStart << ";"
<< "viewEnd " << viewEnd);
-#endif
if (isAutocomplete) {
entry["next"] = value;
@@ -1143,13 +1168,14 @@
if (isFinalBlock) {
data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
}
-#ifndef NDEBUG
+
_LOG_DEBUG(segmentName);
-#endif
+
signData(*data);
return data;
}
+
} // namespace query
} // namespace atmos
#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP
diff --git a/catalog/src/util/catalog-adapter.hpp b/catalog/src/util/catalog-adapter.hpp
index 7c04c83..04b101d 100644
--- a/catalog/src/util/catalog-adapter.hpp
+++ b/catalog/src/util/catalog-adapter.hpp
@@ -56,8 +56,8 @@
/**
* Constructor
- * @param face: Face that will be used for NDN communications
- * @param keyChain: KeyChain that will be used for data signing
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain that will be used for data signing
*/
CatalogAdapter(const std::shared_ptr<ndn::Face>& face,
const std::shared_ptr<ndn::KeyChain>& keyChain);
diff --git a/catalog/src/util/logger.hpp b/catalog/src/util/logger.hpp
index 3e2d8dd..f7c66f0 100644
--- a/catalog/src/util/logger.hpp
+++ b/catalog/src/util/logger.hpp
@@ -34,6 +34,9 @@
#define INIT_LOGGER(name) \
static log4cxx::LoggerPtr staticModuleLogger = log4cxx::Logger::getLogger(name)
+#define _LOG_INFO(x) \
+ LOG4CXX_INFO(staticModuleLogger, x)
+
#define _LOG_DEBUG(x) \
LOG4CXX_DEBUG(staticModuleLogger, x)
@@ -49,6 +52,9 @@
#define _LOG_ERROR(x) \
LOG4CXX_ERROR(staticModuleLogger, x)
+#define _LOG_FATAL(x) \
+ LOG4CXX_FATAL(staticModuleLogger, x)
+
#else // HAVE_LOG4CXX
#define INIT_LOGGER(name)
@@ -56,7 +62,8 @@
#define _LOG_FUNCTION_NOARGS
#define _LOG_TRACE(x)
#define INIT_LOGGERS(x)
-#define _LOG_ERROR(x)
+#define _LOG_ERROR(x) \
+ std::cerr << x << std::endl
#ifdef _DEBUG
@@ -65,13 +72,18 @@
#include <ndn-cxx/util/time.hpp>
// to print out messages, must be in debug mode
+
#define _LOG_DEBUG(x) \
- std::clog << ndn::time::system_clock::now() << " " << std::this_thread::get_id() << \
- " " << x << std::endl
+ std::clog << ndn::time::system_clock::now() << " " << x << std::endl
+
+#define _LOG_FATAL(x) \
+ std::clog << ndn::time::system_clock::now() << " " << x << std::endl
+
#else // _DEBUG
#define _LOG_DEBUG(x)
+#define _LOG_FATAL(x)
#endif // _DEBUG
diff --git a/catalog/tests/unit-tests/publish/test-publish-adapter.cpp b/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
index 8ff0486..99d7e90 100644
--- a/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
+++ b/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
@@ -35,8 +35,9 @@
{
public:
PublishAdapterTest(const std::shared_ptr<ndn::util::DummyClientFace>& face,
- const std::shared_ptr<ndn::KeyChain>& keyChain)
- : publish::PublishAdapter<std::string>(face, keyChain)
+ const std::shared_ptr<ndn::KeyChain>& keyChain,
+ std::shared_ptr<chronosync::Socket>& syncSocket)
+ : publish::PublishAdapter<std::string>(face, keyChain, syncSocket)
{
}
@@ -111,8 +112,8 @@
: face(makeDummyClientFace(io))
, keyChain(new ndn::KeyChain())
, databaseTable("cmip5")
- , publishAdapterTest1(face, keyChain)
- , publishAdapterTest2(face, keyChain)
+ , publishAdapterTest1(face, keyChain, syncSocket)
+ , publishAdapterTest2(face, keyChain, syncSocket)
{
std::string cx("sha256"), c0("name"), c1("activity"), c2("product"), c3("organization");
std::string c4("model"), c5("experiment"), c6("frequency"), c7("modeling_realm");
@@ -197,6 +198,7 @@
protected:
std::shared_ptr<DummyClientFace> face;
std::shared_ptr<ndn::KeyChain> keyChain;
+ std::shared_ptr<chronosync::Socket> syncSocket;
std::vector<std::string> tableFields;
std::string databaseTable;
PublishAdapterTest publishAdapterTest1;
diff --git a/catalog/tests/unit-tests/query/test-query-adapter.cpp b/catalog/tests/unit-tests/query/test-query-adapter.cpp
index 8b210b7..c9252d3 100644
--- a/catalog/tests/unit-tests/query/test-query-adapter.cpp
+++ b/catalog/tests/unit-tests/query/test-query-adapter.cpp
@@ -35,8 +35,9 @@
{
public:
QueryAdapterTest(const std::shared_ptr<ndn::util::DummyClientFace>& face,
- const std::shared_ptr<ndn::KeyChain>& keyChain)
- : query::QueryAdapter<std::string>(face, keyChain)
+ const std::shared_ptr<ndn::KeyChain>& keyChain,
+ const std::shared_ptr<chronosync::Socket>& syncSocket)
+ : query::QueryAdapter<std::string>(face, keyChain, syncSocket)
{
}
@@ -131,21 +132,6 @@
}
std::shared_ptr<const ndn::Data>
- getDataFromActiveQuery(const std::string& jsonQuery)
- {
- m_mutex.lock();
- if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
- auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
- if (iter != m_activeQueryToFirstResponse.end()) {
- m_mutex.unlock();
- return iter->second;
- }
- }
- m_mutex.unlock();
- return std::shared_ptr<const ndn::Data>();
- }
-
- std::shared_ptr<const ndn::Data>
getDataFromCache(const ndn::Interest& interest)
{
return m_cache.find(interest);
@@ -181,8 +167,8 @@
: face(makeDummyClientFace(io))
, keyChain(new ndn::KeyChain())
, databaseTable("cmip5")
- , queryAdapterTest1(face, keyChain)
- , queryAdapterTest2(face, keyChain)
+ , queryAdapterTest1(face, keyChain, syncSocket)
+ , queryAdapterTest2(face, keyChain, syncSocket)
{
std::string c1("activity"), c2("product"), c3("organization"), c4("model");
std::string c5("experiment"), c6("frequency"), c7("modeling_realm"), c8("variable_name");
@@ -261,6 +247,7 @@
protected:
std::shared_ptr<DummyClientFace> face;
std::shared_ptr<ndn::KeyChain> keyChain;
+ std::shared_ptr<chronosync::Socket> syncSocket;
std::string databaseTable;
std::vector<std::string> nameFields;
QueryAdapterTest queryAdapterTest1;
diff --git a/log4cxx.properties b/log4cxx.properties
index 3144df3..cdf2386 100644
--- a/log4cxx.properties
+++ b/log4cxx.properties
@@ -1,22 +1,21 @@
-# Set root logger level to DEBUG and its only appender to R.
-log4j.rootLogger=ERROR, R
+# Please refer to https://logging.apache.org/log4cxx/usage.html for option details
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.File=example.log
-log4j.appender.R.MaxFileSize=100KB
+log4j.rootLogger=INFO, rootConsoleAppender, rootFileAppender
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%d{hh:mm:ss,SSS} %-14t %-14c %m%n
+# rootConsoleAppender prints out messages to console
+log4j.appender.rootConsoleAppender=org.apache.log4j.ConsoleAppender
+log4j.appender.rootConsoleAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.rootConsoleAppender.layout.ConversionPattern=%d %-5p [%c] - %m%n
-# R is set to be a ConsoleAppender.
-#log4j.appender.R=org.apache.log4j.ConsoleAppender
+# rootFileAppender dumps messages to a file
+#log4j.appender.rootFileAppender=org.apache.log4j.FileAppender
+log4j.appender.rootFileAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.rootFileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.rootFileAppender.layout.ConversionPattern=%d %-5p [%c] - %m%n
+log4j.appender.rootFileAppender.File=catalog.log
+log4j.appender.rootFileAppender.MaxFileSize=100KB
-# R uses PatternLayout.
-#log4j.appender.R.layout=org.apache.log4j.PatternLayout
-#log4j.appender.R.target=System.err
-#log4j.appender.R.layout.ConversionPattern=%d{dd-MMM HH:MM:SS,SSS} %p %c %m%n
-#log4j.appender.R.layout.ConversionPattern=%d{hh:mm:ss,SSS} %-14t %-14c %m%n
-#log4j.appender.R.layout.ConversionPattern=%d{ss,SSS} %-12c %m%n
+# Change the log level (INFO, DEBUG, etc.) here
+log4j.logger.QueryAdapter=INFO
+log4j.logger.PublishAdapter=INFO
-log4j.logger.QueryAdapter = DEBUG
-log4j.logger.PublishAdapter = DEBUG