base the catalog version number on the ChronoSync root digest

* remove the redundant atmosTableColumns array from the publishAdapter
* share chronosync::Socket between the adapters so that the queryAdapter can use the ChronoSync root digest as its version (see the sketch below)
* add log messages at various severity levels
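
For reference, a minimal sketch of the versioning scheme (versionFromDigest
and its cachedDigest parameter are illustrative names, not code in this
change): when the shared socket exists, the query version component is
derived from the ChronoSync root digest; otherwise the last cached digest
("0" initially) is reused.

    #include <ChronoSync/socket.hpp>
    #include <ndn-cxx/name-component.hpp>
    #include <ndn-cxx/util/string-helper.hpp>

    #include <memory>
    #include <string>

    // Sketch only: mirrors how QueryAdapter builds the version component.
    static ndn::name::Component
    versionFromDigest(const std::shared_ptr<chronosync::Socket>& socket,
                      const std::string& cachedDigest = "0")
    {
      if (socket == nullptr) {
        // no sync socket yet: fall back to the last known digest
        return ndn::name::Component::fromEscapedString(cachedDigest);
      }
      ndn::ConstBufferPtr digest = socket->getRootDigest();
      // hex-encode the root digest and use it as the version component
      std::string digestHex = ndn::toHex(digest->buf(), digest->size());
      return ndn::name::Component::fromEscapedString(digestHex);
    }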

refs: #2770

Change-Id: Id5aca0f84d9b7963fc63bf0329fbc6394f2b5777
diff --git a/catalog/src/catalog/catalog.cpp b/catalog/src/catalog/catalog.cpp
index 1d94dfc..d1d6118 100644
--- a/catalog/src/catalog/catalog.cpp
+++ b/catalog/src/catalog/catalog.cpp
@@ -51,10 +51,6 @@
     if (i->first == "prefix") {
       m_prefix.clear();
       m_prefix.append(i->second.get_value<std::string>());
-      if (m_prefix.empty()) {
-        throw Error("Empty value for \"prefix\""
-                                 " in \"general\" section");
-      }
     }
     if (i->first == "nameFields") {
       std::istringstream ss(i->second.get_value<std::string>());
@@ -67,10 +63,17 @@
       m_databaseTable = i->second.get_value<std::string>();
     }
   }
-  if (m_nameFields.size() == 0) { // nameFields must not be empty
-    throw Error("Empty value for \"nameFields\""
-                             " in \"general\" section");
+
+  if (m_prefix.empty()) { // Catalog prefix must not be empty
+    throw Error("Empty value for \"prefix\""
+                " in \"general\" section");
   }
+
+  if (m_nameFields.empty()) { // nameFields must not be empty
+    throw Error("Empty value for \"nameFields\""
+                " in \"general\" section");
+  }
+
   if (m_databaseTable.empty()) {
     throw Error("Empty value for \"databaseTable\""
                 " in \"general\" section");
diff --git a/catalog/src/main.cpp b/catalog/src/main.cpp
index 7655315..4bf1142 100644
--- a/catalog/src/main.cpp
+++ b/catalog/src/main.cpp
@@ -24,6 +24,7 @@
 #include <memory>
 #include <getopt.h>
 #include <ndn-cxx/face.hpp>
+#include <ChronoSync/socket.hpp>
 
 #ifdef HAVE_LOG4CXX
   INIT_LOGGER("atmos-catalog::Main");
@@ -71,20 +72,26 @@
   std::shared_ptr<ndn::Face> face(new ndn::Face());
   std::shared_ptr<ndn::KeyChain> keyChain(new ndn::KeyChain());
 
+  // For now, share chronosync::Socket in both queryAdapter and publishAdapter
+  // to allow queryAdapter to get the digest.
+  // We may have to save the digest in the database later
+  std::shared_ptr<chronosync::Socket> syncSocket;
+
   std::unique_ptr<atmos::util::CatalogAdapter>
-    queryAdapter(new atmos::query::QueryAdapter<MYSQL>(face, keyChain));
+    queryAdapter(new atmos::query::QueryAdapter<MYSQL>(face, keyChain, syncSocket));
   std::unique_ptr<atmos::util::CatalogAdapter>
-    publishAdapter(new atmos::publish::PublishAdapter<MYSQL>(face, keyChain));
+    publishAdapter(new atmos::publish::PublishAdapter<MYSQL>(face, keyChain, syncSocket));
 
   atmos::catalog::Catalog catalogInstance(face, keyChain, configFile);
-  catalogInstance.addAdapter(queryAdapter);
   catalogInstance.addAdapter(publishAdapter);
+  catalogInstance.addAdapter(queryAdapter);
 
   try {
     catalogInstance.initialize();
   }
   catch (std::exception& e) {
-    std::cout << e.what() << std::endl;
+    _LOG_ERROR(e.what());
+    return 1;
   }
 
 #ifndef NDEBUG
@@ -94,7 +101,8 @@
 #ifndef NDEBUG
   }
   catch (std::exception& e) {
-    _LOG_DEBUG(e.what());
+    _LOG_ERROR(e.what());
+    return 1;
   }
 #endif
 
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index aa61845..b7b8355 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -51,11 +51,6 @@
 #endif
 
 #define RETRY_WHEN_TIMEOUT 2
-// TODO: need to use the configured nameFields
-std::array<std::string, 12> atmosTableColumns = {{"sha256", "name", "activity", "product",
-                                                  "organization", "model", "experiment",
-                                                  "frequency", "modeling_realm",
-                                                  "variable_name", "ensemble", "time"}};
 
 /**
  * PublishAdapter handles the Publish usecases for the catalog
@@ -66,11 +61,13 @@
   /**
    * Constructor
    *
-   * @param face:      Face that will be used for NDN communications
-   * @param keyChain:  KeyChain that will be used for data signing
+   * @param face:       Face that will be used for NDN communications
+   * @param keyChain:   KeyChain that will be used for data signing
+   * @param syncSocket: ChronoSync socket
    */
   PublishAdapter(const std::shared_ptr<ndn::Face>& face,
-                 const std::shared_ptr<ndn::KeyChain>& keyChain);
+                 const std::shared_ptr<ndn::KeyChain>& keyChain,
+                 std::shared_ptr<chronosync::Socket>& syncSocket);
 
   virtual
   ~PublishAdapter();
@@ -234,10 +231,10 @@
   std::shared_ptr<DatabaseHandler> m_databaseHandler;
   std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
   RegisteredPrefixList m_registeredPrefixList;
-  std::unique_ptr<chronosync::Socket> m_socket; // SyncSocket
+  std::shared_ptr<chronosync::Socket>& m_socket; // SyncSocket
+  std::vector<std::string> m_tableColumns;
   // mutex to control critical sections
   std::mutex m_mutex;
-  std::vector<std::string> m_tableColumns;
   // TODO: create thread for each request, and the variables below should be within the thread
   bool m_mustBeFresh;
   bool m_isFinished;
@@ -247,8 +244,10 @@
 
 template <typename DatabaseHandler>
 PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
-                                                const std::shared_ptr<ndn::KeyChain>& keyChain)
+                                                const std::shared_ptr<ndn::KeyChain>& keyChain,
+                                                std::shared_ptr<chronosync::Socket>& syncSocket)
   : util::CatalogAdapter(face, keyChain)
+  , m_socket(syncSocket)
   , m_mustBeFresh(true)
   , m_isFinished(false)
   , m_catalogId("catalogIdPlaceHolder")
@@ -319,6 +318,13 @@
                                                const std::string& databaseTable)
 {
   m_nameFields = nameFields;
+
+  // initialize m_tableColumns: "sha256" and "name" precede the configured nameFields
+  m_tableColumns = nameFields;
+  auto it = m_tableColumns.begin();
+  it = m_tableColumns.insert(it, std::string("name"));
+  it = m_tableColumns.insert(it, std::string("sha256"));
+
   m_databaseTable = databaseTable;
   config.addSectionHandler("publishAdapter",
                            bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
@@ -348,7 +354,7 @@
       signingId = item->second.get_value<std::string>();
       if (signingId.empty()) {
         throw Error("Invalid value for \"signingId\""
-                                " in \"publish\" section");
+                    " in \"publish\" section");
       }
     }
     else if (item->first == "security") {
@@ -364,33 +370,35 @@
            ++subItem) {
         if (subItem->first == "dbServer") {
           dbServer = subItem->second.get_value<std::string>();
-          if (dbServer.empty()){
-            throw Error("Invalid value for \"dbServer\""
-                                    " in \"publish\" section");
-          }
         }
         if (subItem->first == "dbName") {
           dbName = subItem->second.get_value<std::string>();
-          if (dbName.empty()){
-            throw Error("Invalid value for \"dbName\""
-                                    " in \"publish\" section");
-          }
         }
         if (subItem->first == "dbUser") {
           dbUser = subItem->second.get_value<std::string>();
-          if (dbUser.empty()){
-            throw Error("Invalid value for \"dbUser\""
-                                    " in \"publish\" section");
-          }
         }
         if (subItem->first == "dbPasswd") {
           dbPasswd = subItem->second.get_value<std::string>();
-          if (dbPasswd.empty()){
-            throw Error("Invalid value for \"dbPasswd\""
-                                    " in \"publish\" section");
-          }
         }
       }
+
+      // Items below must not be empty
+      if (dbServer.empty()){
+        throw Error("Invalid value for \"dbServer\""
+                    " in \"publish\" section");
+      }
+      if (dbName.empty()){
+        throw Error("Invalid value for \"dbName\""
+                    " in \"publish\" section");
+      }
+      if (dbUser.empty()){
+        throw Error("Invalid value for \"dbUser\""
+                    " in \"publish\" section");
+      }
+      if (dbPasswd.empty()){
+        throw Error("Invalid value for \"dbPasswd\""
+                    " in \"publish\" section");
+      }
     }
     else if (item->first == "sync") {
       const util::ConfigSection& synSection = item->second;
@@ -398,11 +406,10 @@
            subItem != synSection.end();
            ++subItem) {
         if (subItem->first == "prefix") {
-          syncPrefix.clear();
-          syncPrefix = subItem->second.get_value<std::string>();
+          syncPrefix = subItem->second.get_value<std::string>();
           if (syncPrefix.empty()){
             throw Error("Invalid value for \"prefix\""
-                                    " in \"publish\\sync\" section");
+                        " in \"publish\\sync\" section");
           }
         }
         // todo: parse the sync_security section
@@ -451,8 +458,10 @@
 
     MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE,
                       success, errMsg);
+#ifndef NDEBUG
     if (!success)
       _LOG_DEBUG(errMsg);
+#endif
 
     // create SQL string for table creation, id, sha256, and name are columns that we need
     std::stringstream ss;
@@ -468,8 +477,11 @@
 
     success = false;
     MySQLPerformQuery(m_databaseHandler, ss.str(), util::CREATE, success, errMsg);
+
+#ifndef NDEBUG
     if (!success)
       _LOG_DEBUG(errMsg);
+#endif
   }
   else {
     throw Error("cannot connect to the Database");
@@ -494,7 +506,7 @@
   m_keyChain->sign(*data);
   m_face->put(*data);
 
-  _LOG_DEBUG("ACK interest : " << interest.getName().toUri());
+  _LOG_DEBUG("Ack interest : " << interest.getName().toUri());
 
 
   //TODO: if already in catalog, what do we do?
@@ -518,7 +530,7 @@
 void
 PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
 {
-  _LOG_DEBUG(interest.getName() << "timed out");
+  _LOG_ERROR(interest.getName() << " timed out");
 }
 
 template <typename DatabaseHandler>
@@ -526,7 +538,7 @@
 PublishAdapter<DatabaseHandler>::onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
                                                     const std::string& failureInfo)
 {
-  _LOG_DEBUG(data->getName() << " validation failed: " << failureInfo);
+  _LOG_ERROR(data->getName() << " validation failed: " << failureInfo);
 }
 
 template <typename DatabaseHandler>
@@ -535,7 +547,7 @@
                                                  const ndn::Data& data)
 {
   _LOG_DEBUG(">> PublishAdapter::onPublishedData");
-  _LOG_DEBUG("Data name: " << data.getName());
+  _LOG_DEBUG("Recv data : " << data.getName());
   if (data.getContent().empty()) {
     return;
   }
@@ -558,12 +570,10 @@
 
   // validate published data payload, if failed, return
   if (!validatePublicationChanges(data)) {
-    _LOG_DEBUG("Data validation failed : " << data->getName());
-#ifndef NDEBUG
+    _LOG_ERROR("Data validation failed : " << data->getName());
     const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
                               data->getContent().value_size());
     _LOG_DEBUG(payload);
-#endif
     return;
   }
 
@@ -695,7 +705,7 @@
   util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
   m_mutex.unlock();
   if (!success)
-    _LOG_DEBUG(errMsg);
+    _LOG_ERROR(errMsg);
 }
 
 template <typename DatabaseHandler>
@@ -719,7 +729,7 @@
   util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
   m_mutex.unlock();
   if (!success)
-    _LOG_DEBUG(errMsg);
+    _LOG_ERROR(errMsg);
 }
 
 template <typename DatabaseHandler>
@@ -727,7 +737,7 @@
 PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
 {
   // todo: record event, and use recovery Interest to fetch the whole table
-  _LOG_DEBUG("UpdateData retrieval timed out: " << interest.getName());
+  _LOG_ERROR("UpdateData retrieval timed out: " << interest.getName());
 }
 
 template <typename DatabaseHandler>
@@ -792,7 +802,7 @@
   atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
   m_mutex.unlock();
   if (!success)
-    _LOG_DEBUG(errMsg);
+    _LOG_ERROR(errMsg);
 }
 
 template<typename DatabaseHandler>
@@ -810,10 +820,10 @@
       return false;
 
     sqlString << "INSERT INTO " << m_databaseTable << " (";
-    for (size_t i = 0; i < atmosTableColumns.size(); ++i) {
+    for (size_t i = 0; i < m_tableColumns.size(); ++i) {
       if (i != 0)
         sqlString << ", ";
-      sqlString << atmosTableColumns[i];
+      sqlString << m_tableColumns[i];
     }
     sqlString << ") VALUES";
 
@@ -823,7 +833,7 @@
       // cast might be overflowed
       Json::Value item = jsonValue["add"][static_cast<int>(i)];
       if (!item.isConvertibleTo(Json::stringValue)) {
-        _LOG_DEBUG("malformed JsonQuery string");
+        _LOG_ERROR("Malformed JsonQuery string");
         return false;
       }
       std::string fileName(item.asString());
@@ -853,7 +863,7 @@
       // cast might be overflowed
       Json::Value item = jsonValue["remove"][static_cast<int>(i)];
       if (!item.isConvertibleTo(Json::stringValue)) {
-        _LOG_DEBUG("Malformed JsonQuery");
+        _LOG_ERROR("Malformed JsonQuery");
         return false;
       }
       std::string fileName(item.asString());
@@ -890,15 +900,16 @@
   while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
     count++;
     token = fileName.substr(start, pos - start);
-    if (count >= atmosTableColumns.size() - 2) { // exclude the sha256 and name
-      return false; //fileName contains more than 10 fields
+    // exclude the sha256 and name (already processed)
+    if (count >= m_tableColumns.size() - 2) {
+      return false;
     }
     sqlString << ",'" << token << "'";
     start = pos + 1;
   }
 
-  // must be 10 fields in total (add the tail one)
-  if (count != atmosTableColumns.size() - 3  || std::string::npos == start)
+  // sha256 and name have been processed, and the last token will be processed later
+  if (count != m_tableColumns.size() - 3  || std::string::npos == start)
     return false;
   token = fileName.substr(start, std::string::npos - start);
   sqlString << ",'" << token << "'";
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index 435ad60..c0f292c 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -39,6 +39,7 @@
 #include <ndn-cxx/encoding/encoding-buffer.hpp>
 #include <ndn-cxx/util/in-memory-storage-lru.hpp>
 #include <ndn-cxx/util/string-helper.hpp>
+#include <ChronoSync/socket.hpp>
 
 #include "mysql/mysql.h"
 
@@ -72,11 +73,13 @@
   /**
    * Constructor
    *
-   * @param face:      Face that will be used for NDN communications
-   * @param keyChain:  KeyChain that will be used for data signing
+   * @param face:       Face that will be used for NDN communications
+   * @param keyChain:   KeyChain that will be used for data signing
+   * @param syncSocket: ChronoSync socket
    */
   QueryAdapter(const std::shared_ptr<ndn::Face>& face,
-               const std::shared_ptr<ndn::KeyChain>& keyChain);
+               const std::shared_ptr<ndn::KeyChain>& keyChain,
+               const std::shared_ptr<chronosync::Socket>& syncSocket);
 
   virtual
   ~QueryAdapter();
@@ -249,30 +252,38 @@
   getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
                       const ndn::Name::Component& version);
 
+  std::string
+  getChronoSyncDigest();
+
 protected:
   typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
   // Handle to the Catalog's database
   std::shared_ptr<DatabaseHandler> m_databaseHandler;
+  const std::shared_ptr<chronosync::Socket>& m_socket;
 
   // mutex to control critical sections
   std::mutex m_mutex;
   // @{ needs m_mutex protection
   // The Queries we are currently writing to
-  std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
-
+  //std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
+  ndn::util::InMemoryStorageLru m_activeQueryToFirstResponse;
   ndn::util::InMemoryStorageLru m_cache;
+  std::string m_chronosyncDigest;
   // @}
   RegisteredPrefixList m_registeredPrefixList;
-  //std::vector<std::string> m_atmosColumns;
   ndn::Name m_catalogId; // should be replaced with the PK digest
   std::vector<std::string> m_filterCategoryNames;
 };
 
 template <typename DatabaseHandler>
 QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
-                                            const std::shared_ptr<ndn::KeyChain>& keyChain)
+                                            const std::shared_ptr<ndn::KeyChain>& keyChain,
+                                            const std::shared_ptr<chronosync::Socket>& syncSocket)
   : util::CatalogAdapter(face, keyChain)
+  , m_socket(syncSocket)
+  , m_activeQueryToFirstResponse(100000)
   , m_cache(250000)
+  , m_chronosyncDigest("0")
   , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
 {
 }
@@ -345,7 +356,7 @@
       signingId = item->second.get_value<std::string>();
       if (signingId.empty()) {
         throw Error("Empty value for \"signingId\""
-                                " in \"query\" section");
+                    " in \"query\" section");
       }
     }
     if (item->first == "filterCategoryNames") {
@@ -363,37 +374,38 @@
       {
         if (subItem->first == "dbServer") {
           dbServer = subItem->second.get_value<std::string>();
-          if (dbServer.empty()){
-            throw Error("Invalid value for \"dbServer\""
-                                    " in \"query\" section");
-          }
         }
         if (subItem->first == "dbName") {
           dbName = subItem->second.get_value<std::string>();
-          if (dbName.empty()){
-            throw Error("Invalid value for \"dbName\""
-                                    " in \"query\" section");
-          }
         }
         if (subItem->first == "dbUser") {
           dbUser = subItem->second.get_value<std::string>();
-          if (dbUser.empty()){
-            throw Error("Invalid value for \"dbUser\""
-                                    " in \"query\" section");
-          }
         }
         if (subItem->first == "dbPasswd") {
           dbPasswd = subItem->second.get_value<std::string>();
-          if (dbPasswd.empty()){
-            throw Error("Invalid value for \"dbPasswd\""
-                                    " in \"query\" section");
-          }
         }
       }
+
+      if (dbServer.empty()){
+        throw Error("Invalid value for \"dbServer\""
+                    " in \"query\" section");
+      }
+      if (dbName.empty()){
+        throw Error("Invalid value for \"dbName\""
+                    " in \"query\" section");
+      }
+      if (dbUser.empty()){
+        throw Error("Invalid value for \"dbUser\""
+                    " in \"query\" section");
+      }
+      if (dbPasswd.empty()){
+        throw Error("Invalid value for \"dbPasswd\""
+                    " in \"query\" section");
+      }
     }
   }
 
-  if (m_filterCategoryNames.size() == 0) {
+  if (m_filterCategoryNames.empty()) {
     throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
   }
 
@@ -468,6 +480,7 @@
     // @todo: return a nack
     return;
   }
+
   std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
 
   _LOG_DEBUG("Interest : " << interestPtr->getName());
@@ -502,7 +515,7 @@
 template <typename DatabaseHandler>
 void
 QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
-                                                           const ndn::Interest& interest)
+                                                               const ndn::Interest& interest)
 {
   _LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest");
   std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
@@ -560,9 +573,9 @@
       m_cache.insert(*filterData);
       try {
         m_face->put(*filterData);
-      }// catch exceptions and log
+      }
       catch (std::exception& e) {
-        _LOG_DEBUG(e.what());
+        _LOG_ERROR(e.what());
       }
       m_mutex.unlock();
 
@@ -612,7 +625,7 @@
       = atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
                                        util::QUERY, success, errMsg);
     if (!success) {
-      _LOG_DEBUG(errMsg);
+      _LOG_ERROR(errMsg);
       value.clear();
       return;
     }
@@ -670,9 +683,9 @@
   ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
 
   signData(*ack);
-#ifndef NDEBUG
-  _LOG_DEBUG(queryResultNameStr);
-#endif
+
+  _LOG_DEBUG("Make ACK : " << queryResultNameStr);
+
   return ack;
 }
 
@@ -688,9 +701,9 @@
   nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
 
   signData(*nack);
-#ifndef NDEBUG
-  _LOG_DEBUG(ndn::Name(dataPrefix).appendSegment(segmentNo));
-#endif
+
+  _LOG_DEBUG("Send Nack: " << ndn::Name(dataPrefix).appendSegment(segmentNo));
+
   m_mutex.lock();
   m_cache.insert(*nack);
   m_mutex.unlock();
@@ -703,9 +716,9 @@
                                         Json::Value& jsonValue)
 {
   _LOG_DEBUG(">> QueryAdapter::json2Sql");
-#ifndef NDEBUG
+
   _LOG_DEBUG(jsonValue.toStyledString());
-#endif
+
   if (jsonValue.type() != Json::objectValue) {
     return false;
   }
@@ -718,13 +731,13 @@
     Json::Value value = (*iter);
 
     if (key == Json::nullValue || value == Json::nullValue) {
-      _LOG_DEBUG("null key or value in JsonValue");
+      _LOG_ERROR("Null key or value in JsonValue");
       return false;
     }
 
     // cannot convert to string
     if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
-      _LOG_DEBUG("malformed JsonQuery string");
+      _LOG_ERROR("Malformed JsonQuery string");
       return false;
     }
 
@@ -756,9 +769,9 @@
                                                       bool& lastComponent)
 {
   _LOG_DEBUG(">> QueryAdapter::json2AutocompletionSql");
-#ifndef NDEBUG
+
   _LOG_DEBUG(jsonValue.toStyledString());
-#endif
+
   if (jsonValue.type() != Json::objectValue) {
     return false;
   }
@@ -771,13 +784,13 @@
     Json::Value value = (*iter);
 
     if (key == Json::nullValue || value == Json::nullValue) {
-      _LOG_DEBUG("null key or value in JsonValue");
+      _LOG_ERROR("Null key or value in JsonValue");
       return false;
     }
 
     // cannot convert to string
     if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
-      _LOG_DEBUG("malformed JsonQuery string");
+      _LOG_ERROR("Malformed JsonQuery string");
       return false;
     }
 
@@ -838,9 +851,9 @@
                                                          Json::Value& jsonValue)
 {
   _LOG_DEBUG(">> QueryAdapter::json2CompleteSearchSql");
-#ifndef NDEBUG
+
   _LOG_DEBUG(jsonValue.toStyledString());
-#endif
+
   if (jsonValue.type() != Json::objectValue) {
     return false;
   }
@@ -853,13 +866,13 @@
     Json::Value value = (*iter);
 
     if (key == Json::nullValue || value == Json::nullValue) {
-      _LOG_DEBUG("null key or value in JsonValue");
+      _LOG_ERROR("Null key or value in JsonValue");
       return false;
     }
 
     // cannot convert to string
     if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
-      _LOG_DEBUG("malformed JsonQuery string");
+      _LOG_ERROR("Malformed JsonQuery string");
       return false;
     }
 
@@ -934,54 +947,70 @@
     // no JSON query, send Nack?
     return;
   }
-  // check if the ACK is cached, if yes, respond with ACK
-  // ?? what if the results for now it NULL, but latter exist?
-  // For efficiency, do a double check. Once without the lock, then with it.
-  if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
-    m_mutex.lock();
-    { // !!! BEGIN CRITICAL SECTION !!!
-      // If this fails upon locking, we removed it during our search.
-      // An unusual race-condition case, which requires things like PIT aggregation to be off.
-      auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
-      if (iter != m_activeQueryToFirstResponse.end()) {
-        m_face->put(*(iter->second));
-        m_mutex.unlock(); //escape lock
-        return;
-      }
-    } // !!!  END  CRITICAL SECTION !!!
-    m_mutex.unlock();
+
+  // the version should be replaced with ChronoSync state digest
+  ndn::name::Component version;
+
+  if (m_socket != nullptr) {
+    const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
+    std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size());
+    _LOG_DEBUG("Original digest" << m_chronosyncDigest);
+    _LOG_DEBUG("New digest : " << digestStr);
+    // if m_chronosyncDigest and the current root digest differ
+    if (digestStr != m_chronosyncDigest) {
+      // (1) update chronosyncDigest
+      // (2) clear all stale ACK data
+      m_mutex.lock();
+      m_chronosyncDigest = digestStr;
+      m_activeQueryToFirstResponse.erase(ndn::Name("/"));
+      m_mutex.unlock();
+      _LOG_DEBUG("Change digest to " << m_chronosyncDigest);
+    }
+    version = ndn::name::Component::fromEscapedString(digestStr);
   }
+  else {
+    version = ndn::name::Component::fromEscapedString(m_chronosyncDigest);
+  }
+
+  // try to respond with the inMemoryStorage
+  m_mutex.lock();
+  { // !!! BEGIN CRITICAL SECTION !!!
+    auto data = m_activeQueryToFirstResponse.find(interest->getName());
+    if (data) {
+      _LOG_DEBUG("Answer with Data in IMS : " << data->getName());
+      m_face->put(*data);
+      m_mutex.unlock();
+      return;
+    }
+  } // !!! END CRITICAL SECTION !!!
+  m_mutex.unlock();
 
   // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
   Json::Value parsedFromString;
   Json::Reader reader;
   if (!reader.parse(jsonQuery, parsedFromString)) {
     // @todo: send NACK?
-    _LOG_DEBUG("cannot parse the JsonQuery");
+    _LOG_ERROR("Cannot parse the JsonQuery");
     return;
   }
 
-  // the version should be replaced with ChronoSync state digest
-  const ndn::name::Component version
-    = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(
-                                          ndn::time::system_clock::now()).count());
-
   std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
 
   m_mutex.lock();
   { // !!! BEGIN CRITICAL SECTION !!!
     // An unusual race-condition case, which requires things like PIT aggregation to be off.
-    auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
-    if (iter != m_activeQueryToFirstResponse.end()) {
-      m_face->put(*(iter->second));
-      m_mutex.unlock(); // escape lock
+    auto data = m_activeQueryToFirstResponse.find(interest->getName());
+    if (data) {
+      m_face->put(*data);
+      m_mutex.unlock();
       return;
     }
+
     // This is where things are expensive so we save them for the lock
     // note that we ack the query with the cached ACK messages, but we should remove the ACKs
     // that conatin the old version when ChronoSync is updated
-    //m_activeQueryToFirstResponse.insert(std::pair<std::string,
-    //                                    std::shared_ptr<ndn::Data>>(jsonQuery, ack));
+    m_activeQueryToFirstResponse.insert(*ack);
+
     m_face->put(*ack);
   } // !!!  END  CRITICAL SECTION !!!
   m_mutex.unlock();
@@ -1038,19 +1067,19 @@
                                      bool lastComponent)
 {
   _LOG_DEBUG(">> QueryAdapter::prepareSegments");
-#ifndef NDEBUG
+
   _LOG_DEBUG(sqlString);
-#endif
+
   std::string errMsg;
   bool success;
   // 4) Run the Query
   std::shared_ptr<MYSQL_RES> results
     = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
   if (!success)
-    _LOG_DEBUG(errMsg);
+    _LOG_ERROR(errMsg);
 
   if (!results) {
-    _LOG_DEBUG("NULL MYSQL_RES for" << sqlString);
+    _LOG_ERROR("NULL MYSQL_RES for" << sqlString);
 
     // @todo: throw runtime error or log the error message?
     return;
@@ -1058,9 +1087,7 @@
 
   uint64_t resultCount = mysql_num_rows(results.get());
 
-#ifndef NDEBUG
   _LOG_DEBUG("Query resuls contain " << resultCount << "rows");
-#endif
 
   MYSQL_ROW row;
   uint64_t segmentNo = 0;
@@ -1119,11 +1146,9 @@
   if (lastComponent)
     entry["lastComponent"] = Json::Value(true);
 
-#ifndef NDEBUG
   _LOG_DEBUG("resultCount " << resultCount << ";"
              << "viewStart " << viewStart << ";"
              << "viewEnd " << viewEnd);
-#endif
 
   if (isAutocomplete) {
     entry["next"] = value;
@@ -1143,13 +1168,14 @@
   if (isFinalBlock) {
     data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
   }
-#ifndef NDEBUG
+
   _LOG_DEBUG(segmentName);
-#endif
+
   signData(*data);
   return data;
 }
 
+
 } // namespace query
 } // namespace atmos
 #endif //ATMOS_QUERY_QUERY_ADAPTER_HPP
diff --git a/catalog/src/util/catalog-adapter.hpp b/catalog/src/util/catalog-adapter.hpp
index 7c04c83..04b101d 100644
--- a/catalog/src/util/catalog-adapter.hpp
+++ b/catalog/src/util/catalog-adapter.hpp
@@ -56,8 +56,8 @@
 
   /**
    * Constructor
-   * @param face:      Face that will be used for NDN communications
-   * @param keyChain:  KeyChain that will be used for data signing
+   * @param face:       Face that will be used for NDN communications
+   * @param keyChain:   KeyChain that will be used for data signing
    */
   CatalogAdapter(const std::shared_ptr<ndn::Face>& face,
                  const std::shared_ptr<ndn::KeyChain>& keyChain);
diff --git a/catalog/src/util/logger.hpp b/catalog/src/util/logger.hpp
index 3e2d8dd..f7c66f0 100644
--- a/catalog/src/util/logger.hpp
+++ b/catalog/src/util/logger.hpp
@@ -34,6 +34,9 @@
 #define INIT_LOGGER(name) \
   static log4cxx::LoggerPtr staticModuleLogger = log4cxx::Logger::getLogger(name)
 
+#define _LOG_INFO(x) \
+  LOG4CXX_INFO(staticModuleLogger, x)
+
 #define _LOG_DEBUG(x) \
   LOG4CXX_DEBUG(staticModuleLogger, x)
 
@@ -49,6 +52,9 @@
 #define _LOG_ERROR(x) \
   LOG4CXX_ERROR(staticModuleLogger, x)
 
+#define _LOG_FATAL(x) \
+  LOG4CXX_FATAL(staticModuleLogger, x)
+
 #else // HAVE_LOG4CXX
 
 #define INIT_LOGGER(name)
@@ -56,7 +62,8 @@
 #define _LOG_FUNCTION_NOARGS
 #define _LOG_TRACE(x)
 #define INIT_LOGGERS(x)
-#define _LOG_ERROR(x)
+#define _LOG_ERROR(x) \
+  std::cerr << x << std::endl
 
 #ifdef _DEBUG
 
@@ -65,13 +72,18 @@
 #include <ndn-cxx/util/time.hpp>
 
 // to print out messages, must be in debug mode
+
 #define _LOG_DEBUG(x) \
-  std::clog << ndn::time::system_clock::now() << " " << std::this_thread::get_id() << \
-               " " << x << std::endl
+  std::clog << ndn::time::system_clock::now() << " " << x << std::endl
+
+#define _LOG_FATAL(x) \
+  std::clog << ndn::time::system_clock::now() << " " << x << std::endl
+
 
 #else // _DEBUG
 
 #define _LOG_DEBUG(x)
+#define _LOG_FATAL(x)
 
 #endif // _DEBUG
 
diff --git a/catalog/tests/unit-tests/publish/test-publish-adapter.cpp b/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
index 8ff0486..99d7e90 100644
--- a/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
+++ b/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
@@ -35,8 +35,9 @@
   {
   public:
     PublishAdapterTest(const std::shared_ptr<ndn::util::DummyClientFace>& face,
-                       const std::shared_ptr<ndn::KeyChain>& keyChain)
-      : publish::PublishAdapter<std::string>(face, keyChain)
+                       const std::shared_ptr<ndn::KeyChain>& keyChain,
+                       std::shared_ptr<chronosync::Socket>& syncSocket)
+      : publish::PublishAdapter<std::string>(face, keyChain, syncSocket)
     {
     }
 
@@ -111,8 +112,8 @@
       : face(makeDummyClientFace(io))
       , keyChain(new ndn::KeyChain())
       , databaseTable("cmip5")
-      , publishAdapterTest1(face, keyChain)
-      , publishAdapterTest2(face, keyChain)
+      , publishAdapterTest1(face, keyChain, syncSocket)
+      , publishAdapterTest2(face, keyChain, syncSocket)
     {
       std::string cx("sha256"), c0("name"), c1("activity"), c2("product"), c3("organization");
       std::string c4("model"), c5("experiment"), c6("frequency"), c7("modeling_realm");
@@ -197,6 +198,7 @@
   protected:
     std::shared_ptr<DummyClientFace> face;
     std::shared_ptr<ndn::KeyChain> keyChain;
+    std::shared_ptr<chronosync::Socket> syncSocket;
     std::vector<std::string> tableFields;
     std::string databaseTable;
     PublishAdapterTest publishAdapterTest1;
diff --git a/catalog/tests/unit-tests/query/test-query-adapter.cpp b/catalog/tests/unit-tests/query/test-query-adapter.cpp
index 8b210b7..c9252d3 100644
--- a/catalog/tests/unit-tests/query/test-query-adapter.cpp
+++ b/catalog/tests/unit-tests/query/test-query-adapter.cpp
@@ -35,8 +35,9 @@
   {
   public:
     QueryAdapterTest(const std::shared_ptr<ndn::util::DummyClientFace>& face,
-                     const std::shared_ptr<ndn::KeyChain>& keyChain)
-      : query::QueryAdapter<std::string>(face, keyChain)
+                     const std::shared_ptr<ndn::KeyChain>& keyChain,
+                     const std::shared_ptr<chronosync::Socket>& syncSocket)
+      : query::QueryAdapter<std::string>(face, keyChain, syncSocket)
     {
     }
 
@@ -131,21 +132,6 @@
     }
 
     std::shared_ptr<const ndn::Data>
-    getDataFromActiveQuery(const std::string& jsonQuery)
-    {
-      m_mutex.lock();
-      if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
-        auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
-        if (iter != m_activeQueryToFirstResponse.end()) {
-          m_mutex.unlock();
-          return iter->second;
-        }
-      }
-      m_mutex.unlock();
-      return std::shared_ptr<const ndn::Data>();
-    }
-
-    std::shared_ptr<const ndn::Data>
     getDataFromCache(const ndn::Interest& interest)
     {
       return m_cache.find(interest);
@@ -181,8 +167,8 @@
       : face(makeDummyClientFace(io))
       , keyChain(new ndn::KeyChain())
       , databaseTable("cmip5")
-      , queryAdapterTest1(face, keyChain)
-      , queryAdapterTest2(face, keyChain)
+      , queryAdapterTest1(face, keyChain, syncSocket)
+      , queryAdapterTest2(face, keyChain, syncSocket)
     {
       std::string c1("activity"), c2("product"), c3("organization"), c4("model");
       std::string c5("experiment"), c6("frequency"), c7("modeling_realm"), c8("variable_name");
@@ -261,6 +247,7 @@
   protected:
     std::shared_ptr<DummyClientFace> face;
     std::shared_ptr<ndn::KeyChain> keyChain;
+    std::shared_ptr<chronosync::Socket> syncSocket;
     std::string databaseTable;
     std::vector<std::string> nameFields;
     QueryAdapterTest queryAdapterTest1;
diff --git a/log4cxx.properties b/log4cxx.properties
index 3144df3..cdf2386 100644
--- a/log4cxx.properties
+++ b/log4cxx.properties
@@ -1,22 +1,21 @@
-# Set root logger level to DEBUG and its only appender to R.
-log4j.rootLogger=ERROR, R
+# Please refer to https://logging.apache.org/log4cxx/usage.html for option details
 
-log4j.appender.R=org.apache.log4j.RollingFileAppender
-log4j.appender.R.File=example.log
-log4j.appender.R.MaxFileSize=100KB
+log4j.rootLogger=INFO, rootConsoleAppender, rootFileAppender
 
-log4j.appender.R.layout=org.apache.log4j.PatternLayout
-log4j.appender.R.layout.ConversionPattern=%d{hh:mm:ss,SSS}  %-14t %-14c  %m%n
+# rootConsoleAppender prints out messages to console
+log4j.appender.rootConsoleAppender=org.apache.log4j.ConsoleAppender
+log4j.appender.rootConsoleAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.rootConsoleAppender.layout.ConversionPattern=%d %-5p [%c] - %m%n
 
-# R is set to be a ConsoleAppender.
-#log4j.appender.R=org.apache.log4j.ConsoleAppender
+# rootFileAppender dumps messages to a file
+#log4j.appender.rootFileAppender=org.apache.log4j.FileAppender
+log4j.appender.rootFileAppender=org.apache.log4j.RollingFileAppender
+log4j.appender.rootFileAppender.layout=org.apache.log4j.PatternLayout
+log4j.appender.rootFileAppender.layout.ConversionPattern=%d %-5p [%c] - %m%n
+log4j.appender.rootFileAppender.File=catalog.log
+log4j.appender.rootFileAppender.MaxFileSize=100KB
 
-# R uses PatternLayout.
-#log4j.appender.R.layout=org.apache.log4j.PatternLayout
-#log4j.appender.R.target=System.err
-#log4j.appender.R.layout.ConversionPattern=%d{dd-MMM HH:MM:SS,SSS} %p %c %m%n
-#log4j.appender.R.layout.ConversionPattern=%d{hh:mm:ss,SSS} %-14t %-14c  %m%n
-#log4j.appender.R.layout.ConversionPattern=%d{ss,SSS}  %-12c  %m%n
+# Change the log level (INFO, DEBUG, etc.) here
+log4j.logger.QueryAdapter=INFO
+log4j.logger.PublishAdapter=INFO
 
-log4j.logger.QueryAdapter = DEBUG
-log4j.logger.PublishAdapter = DEBUG