base catalog version on ChronoSync

* remove the redundant TableColumns in publishAdapter
* share chonosync::Socket in adapters to allow queryAdapter to use digest as version
* add various levels of log messages

refs: #2770

Change-Id: Id5aca0f84d9b7963fc63bf0329fbc6394f2b5777
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index aa61845..b7b8355 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -51,11 +51,6 @@
 #endif
 
 #define RETRY_WHEN_TIMEOUT 2
-// TODO: need to use the configured nameFields
-std::array<std::string, 12> atmosTableColumns = {{"sha256", "name", "activity", "product",
-                                                  "organization", "model", "experiment",
-                                                  "frequency", "modeling_realm",
-                                                  "variable_name", "ensemble", "time"}};
 
 /**
  * PublishAdapter handles the Publish usecases for the catalog
@@ -66,11 +61,13 @@
   /**
    * Constructor
    *
-   * @param face:      Face that will be used for NDN communications
-   * @param keyChain:  KeyChain that will be used for data signing
+   * @param face:       Face that will be used for NDN communications
+   * @param keyChain:   KeyChain that will be used for data signing
+   * @param syncSocket: ChronoSync socket
    */
   PublishAdapter(const std::shared_ptr<ndn::Face>& face,
-                 const std::shared_ptr<ndn::KeyChain>& keyChain);
+                 const std::shared_ptr<ndn::KeyChain>& keyChain,
+                 std::shared_ptr<chronosync::Socket>& syncSocket);
 
   virtual
   ~PublishAdapter();
@@ -234,10 +231,10 @@
   std::shared_ptr<DatabaseHandler> m_databaseHandler;
   std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
   RegisteredPrefixList m_registeredPrefixList;
-  std::unique_ptr<chronosync::Socket> m_socket; // SyncSocket
+  std::shared_ptr<chronosync::Socket>& m_socket; // SyncSocket
+  std::vector<std::string> m_tableColumns;
   // mutex to control critical sections
   std::mutex m_mutex;
-  std::vector<std::string> m_tableColumns;
   // TODO: create thread for each request, and the variables below should be within the thread
   bool m_mustBeFresh;
   bool m_isFinished;
@@ -247,8 +244,10 @@
 
 template <typename DatabaseHandler>
 PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
-                                                const std::shared_ptr<ndn::KeyChain>& keyChain)
+                                                const std::shared_ptr<ndn::KeyChain>& keyChain,
+                                                std::shared_ptr<chronosync::Socket>& syncSocket)
   : util::CatalogAdapter(face, keyChain)
+  , m_socket(syncSocket)
   , m_mustBeFresh(true)
   , m_isFinished(false)
   , m_catalogId("catalogIdPlaceHolder")
@@ -319,6 +318,13 @@
                                                const std::string& databaseTable)
 {
   m_nameFields = nameFields;
+
+  //initialize m_tableColumns
+  m_tableColumns = nameFields;
+  auto it = m_tableColumns.begin();
+  it = m_tableColumns.insert(it, std::string("name"));
+  it = m_tableColumns.insert(it, std::string("sha256"));
+
   m_databaseTable = databaseTable;
   config.addSectionHandler("publishAdapter",
                            bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
@@ -348,7 +354,7 @@
       signingId = item->second.get_value<std::string>();
       if (signingId.empty()) {
         throw Error("Invalid value for \"signingId\""
-                                " in \"publish\" section");
+                    " in \"publish\" section");
       }
     }
     else if (item->first == "security") {
@@ -364,33 +370,35 @@
            ++subItem) {
         if (subItem->first == "dbServer") {
           dbServer = subItem->second.get_value<std::string>();
-          if (dbServer.empty()){
-            throw Error("Invalid value for \"dbServer\""
-                                    " in \"publish\" section");
-          }
         }
         if (subItem->first == "dbName") {
           dbName = subItem->second.get_value<std::string>();
-          if (dbName.empty()){
-            throw Error("Invalid value for \"dbName\""
-                                    " in \"publish\" section");
-          }
         }
         if (subItem->first == "dbUser") {
           dbUser = subItem->second.get_value<std::string>();
-          if (dbUser.empty()){
-            throw Error("Invalid value for \"dbUser\""
-                                    " in \"publish\" section");
-          }
         }
         if (subItem->first == "dbPasswd") {
           dbPasswd = subItem->second.get_value<std::string>();
-          if (dbPasswd.empty()){
-            throw Error("Invalid value for \"dbPasswd\""
-                                    " in \"publish\" section");
-          }
         }
       }
+
+      // Items below must not be empty
+      if (dbServer.empty()){
+        throw Error("Invalid value for \"dbServer\""
+                    " in \"publish\" section");
+      }
+      if (dbName.empty()){
+        throw Error("Invalid value for \"dbName\""
+                    " in \"publish\" section");
+      }
+      if (dbUser.empty()){
+        throw Error("Invalid value for \"dbUser\""
+                    " in \"publish\" section");
+      }
+      if (dbPasswd.empty()){
+        throw Error("Invalid value for \"dbPasswd\""
+                    " in \"publish\" section");
+      }
     }
     else if (item->first == "sync") {
       const util::ConfigSection& synSection = item->second;
@@ -398,11 +406,10 @@
            subItem != synSection.end();
            ++subItem) {
         if (subItem->first == "prefix") {
-          syncPrefix.clear();
-          syncPrefix = subItem->second.get_value<std::string>();
+           syncPrefix = subItem->second.get_value<std::string>();
           if (syncPrefix.empty()){
             throw Error("Invalid value for \"prefix\""
-                                    " in \"publish\\sync\" section");
+                        " in \"publish\\sync\" section");
           }
         }
         // todo: parse the sync_security section
@@ -451,8 +458,10 @@
 
     MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE,
                       success, errMsg);
+#ifndef NDEBUG
     if (!success)
       _LOG_DEBUG(errMsg);
+#endif
 
     // create SQL string for table creation, id, sha256, and name are columns that we need
     std::stringstream ss;
@@ -468,8 +477,11 @@
 
     success = false;
     MySQLPerformQuery(m_databaseHandler, ss.str(), util::CREATE, success, errMsg);
+
+#ifndef NDEBUG
     if (!success)
       _LOG_DEBUG(errMsg);
+#endif
   }
   else {
     throw Error("cannot connect to the Database");
@@ -494,7 +506,7 @@
   m_keyChain->sign(*data);
   m_face->put(*data);
 
-  _LOG_DEBUG("ACK interest : " << interest.getName().toUri());
+  _LOG_DEBUG("Ack interest : " << interest.getName().toUri());
 
 
   //TODO: if already in catalog, what do we do?
@@ -518,7 +530,7 @@
 void
 PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
 {
-  _LOG_DEBUG(interest.getName() << "timed out");
+  _LOG_ERROR(interest.getName() << "timed out");
 }
 
 template <typename DatabaseHandler>
@@ -526,7 +538,7 @@
 PublishAdapter<DatabaseHandler>::onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
                                                     const std::string& failureInfo)
 {
-  _LOG_DEBUG(data->getName() << " validation failed: " << failureInfo);
+  _LOG_ERROR(data->getName() << " validation failed: " << failureInfo);
 }
 
 template <typename DatabaseHandler>
@@ -535,7 +547,7 @@
                                                  const ndn::Data& data)
 {
   _LOG_DEBUG(">> PublishAdapter::onPublishedData");
-  _LOG_DEBUG("Data name: " << data.getName());
+  _LOG_DEBUG("Recv data : " << data.getName());
   if (data.getContent().empty()) {
     return;
   }
@@ -558,12 +570,10 @@
 
   // validate published data payload, if failed, return
   if (!validatePublicationChanges(data)) {
-    _LOG_DEBUG("Data validation failed : " << data->getName());
-#ifndef NDEBUG
+    _LOG_ERROR("Data validation failed : " << data->getName());
     const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
                               data->getContent().value_size());
     _LOG_DEBUG(payload);
-#endif
     return;
   }
 
@@ -695,7 +705,7 @@
   util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
   m_mutex.unlock();
   if (!success)
-    _LOG_DEBUG(errMsg);
+    _LOG_ERROR(errMsg);
 }
 
 template <typename DatabaseHandler>
@@ -719,7 +729,7 @@
   util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
   m_mutex.unlock();
   if (!success)
-    _LOG_DEBUG(errMsg);
+    _LOG_ERROR(errMsg);
 }
 
 template <typename DatabaseHandler>
@@ -727,7 +737,7 @@
 PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
 {
   // todo: record event, and use recovery Interest to fetch the whole table
-  _LOG_DEBUG("UpdateData retrieval timed out: " << interest.getName());
+  _LOG_ERROR("UpdateData retrieval timed out: " << interest.getName());
 }
 
 template <typename DatabaseHandler>
@@ -792,7 +802,7 @@
   atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
   m_mutex.unlock();
   if (!success)
-    _LOG_DEBUG(errMsg);
+    _LOG_ERROR(errMsg);
 }
 
 template<typename DatabaseHandler>
@@ -810,10 +820,10 @@
       return false;
 
     sqlString << "INSERT INTO " << m_databaseTable << " (";
-    for (size_t i = 0; i < atmosTableColumns.size(); ++i) {
+    for (size_t i = 0; i < m_tableColumns.size(); ++i) {
       if (i != 0)
         sqlString << ", ";
-      sqlString << atmosTableColumns[i];
+      sqlString << m_tableColumns[i];
     }
     sqlString << ") VALUES";
 
@@ -823,7 +833,7 @@
       // cast might be overflowed
       Json::Value item = jsonValue["add"][static_cast<int>(i)];
       if (!item.isConvertibleTo(Json::stringValue)) {
-        _LOG_DEBUG("malformed JsonQuery string");
+        _LOG_ERROR("Malformed JsonQuery string");
         return false;
       }
       std::string fileName(item.asString());
@@ -853,7 +863,7 @@
       // cast might be overflowed
       Json::Value item = jsonValue["remove"][static_cast<int>(i)];
       if (!item.isConvertibleTo(Json::stringValue)) {
-        _LOG_DEBUG("Malformed JsonQuery");
+        _LOG_ERROR("Malformed JsonQuery");
         return false;
       }
       std::string fileName(item.asString());
@@ -890,15 +900,16 @@
   while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
     count++;
     token = fileName.substr(start, pos - start);
-    if (count >= atmosTableColumns.size() - 2) { // exclude the sha256 and name
-      return false; //fileName contains more than 10 fields
+    // exclude the sha256 and name (already processed)
+    if (count >= m_tableColumns.size() - 2) {
+      return false;
     }
     sqlString << ",'" << token << "'";
     start = pos + 1;
   }
 
-  // must be 10 fields in total (add the tail one)
-  if (count != atmosTableColumns.size() - 3  || std::string::npos == start)
+  // sha256 and name have been processed, and the last token will be processed later
+  if (count != m_tableColumns.size() - 3  || std::string::npos == start)
     return false;
   token = fileName.substr(start, std::string::npos - start);
   sqlString << ",'" << token << "'";