catalog: Implement Sync Update Executor

Change-Id: I88a0c6935d7b8094dfb587e83270a607e753ea31
refs: #2611
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index 9fb03db..b954bc8 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -34,14 +34,23 @@
 #include <ndn-cxx/security/key-chain.hpp>
 #include <ndn-cxx/security/validator-config.hpp>
 
+#include <ChronoSync/socket.hpp>
 #include <memory>
 #include <string>
 #include <vector>
 #include <unordered_map>
+#include <mutex>
 
 namespace atmos {
 namespace publish {
 
+#define RETRY_WHEN_TIMEOUT 2
+// TODO: need to use the configured nameFields
+// const => internal linkage, so every TU including this header gets its own copy (no ODR clash)
+const std::array<std::string, 12> atmosTableColumns = {{
+  "sha256", "name", "activity", "product", "organization", "model", "experiment",
+  "frequency", "modeling_realm", "variable_name", "ensemble", "time"}};
+
 /**
  * PublishAdapter handles the Publish usecases for the catalog
  */
@@ -88,6 +97,9 @@
   virtual void
   onPublishInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
 
+  virtual void
+  onTimeout(const ndn::Interest& interest);
+
   /**
    * Data containing the actual thing we need to publish
    *
@@ -98,10 +110,10 @@
   onPublishedData(const ndn::Interest& interest, const ndn::Data& data);
 
   /**
-   * Helper function to set the DatabaseHandler
+   * Helper function to initialize the DatabaseHandler
    */
   void
-  setDatabaseHandler(const util::ConnectionDetails&  databaseId);
+  initializeDatabase(const util::ConnectionDetails&  databaseId);
 
   /**
    * Helper function that sets filters to make the adapter work
@@ -119,6 +131,88 @@
   bool
   validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data);
 
+  /**
+   * Helper function that processes the sync update
+   *
+   * @param updates: vector that contains all the missing data information
+   */
+  void
+  processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
+
+  /**
+   * Helper function that processes the update data
+   *
+   * @param data: shared pointer for the fetched update data
+   */
+  void
+  processUpdateData(const ndn::shared_ptr<const ndn::Data>& data);
+
+  /**
+   * Helper function that adds data to or removes data from the database
+   *
+   * @param sql: sql string to do the add or remove jobs
+   * @param op:  enum value indicating the database operation, either REMOVE or ADD
+   */
+  virtual void
+  operateDatabase(const std::string& sql,
+                  util::DatabaseOperation op);
+
+  /**
+   * Helper function that converts jsonValue into an sql string; returns true on success
+   *
+   * @param sqlString: stringstream to save the sql string
+   * @param jsonValue: Json value that contains the update information
+   * @param op:        enum value indicating the database operation, either REMOVE or ADD
+   */
+  bool
+  json2Sql(std::stringstream& sqlString,
+           Json::Value& jsonValue,
+           util::DatabaseOperation op);
+
+  /**
+   * Helper function that appends the fields of fileName to sqlString; returns true on success
+   *
+   * @param sqlString: stringstream to save the sql string
+   * @param fileName:  ndn uri string for a file name
+   */
+  bool
+  name2Fields(std::stringstream& sqlString,
+              std::string& fileName);
+
+  /**
+   * Check the local database for the latest sequence number for a ChronoSync update
+   *
+   * @param update: the MissingDataInfo object
+   * @return the recorded sequence number, or 0 if none is found or the query fails
+   */
+  chronosync::SeqNo
+  getLatestSeqNo(const chronosync::MissingDataInfo& update);
+
+  /**
+   * Update the local database with the update message
+   *
+   * @param update: the MissingDataInfo object
+   */
+  void
+  renewUpdateInformation(const chronosync::MissingDataInfo& update);
+
+  /**
+   * Insert the update message into the local database
+   *
+   * @param update: the MissingDataInfo object
+   */
+  void
+  addUpdateInformation(const chronosync::MissingDataInfo& update);
+
+  /** Called when fetching a sync update Data times out */
+  void
+  onFetchUpdateDataTimeout(const ndn::Interest& interest);
+
+  /** Called when a fetched sync update Data fails validation */
+  void
+  onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
+                           const std::string& failureInfo);
+
 protected:
   typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
   // Prefix for ChronoSync
@@ -127,6 +221,13 @@
   std::shared_ptr<DatabaseHandler> m_databaseHandler;
   std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
   RegisteredPrefixList m_registeredPrefixList;
+  std::unique_ptr<chronosync::Socket> m_socket; // ChronoSync socket, created in setFilters()
+  // guards the shared database connection while MySQL queries run
+  std::mutex m_mutex;
+  std::vector<std::string> m_tableColumns; // NOTE(review): not referenced in this patch; atmosTableColumns is used
+  // TODO: create thread for each request, and the variables below should be within the thread
+  bool m_mustBeFresh; // MustBeFresh flag for Interests that fetch published data
+  bool m_isFinished;  // publication-finished flag (see onPublishInterest / onPublishedData)
 };
 
 
@@ -134,6 +235,8 @@
 PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
                                                 const std::shared_ptr<ndn::KeyChain>& keyChain)
   : util::CatalogAdapter(face, keyChain)
+  , m_mustBeFresh(true)
+  , m_isFinished(false)
 {
 }
 
@@ -142,14 +245,21 @@
 PublishAdapter<DatabaseHandler>::setFilters()
 {
   ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
-  m_registeredPrefixList[publishPrefix] =
-    m_face->setInterestFilter(publishPrefix,
-                              bind(&PublishAdapter<DatabaseHandler>::onPublishInterest,
-                                   this, _1, _2),
-                              bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
-                                   this, _1),
-                              bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
-                                   this, _1, _2));
+    m_registeredPrefixList[publishPrefix] =
+      m_face->setInterestFilter(publishPrefix,
+                                bind(&PublishAdapter<DatabaseHandler>::onPublishInterest,
+                                     this, _1, _2),
+                                bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
+                                     this, _1),
+                                bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
+                                     this, _1, _2));
+
+    ndn::Name catalogSync = ndn::Name(m_prefix).append("sync");
+    m_socket.reset(new chronosync::Socket(m_syncPrefix,
+                                          catalogSync,
+                                          *m_face,
+                                          bind(&PublishAdapter<DatabaseHandler>::processSyncUpdate,
+                                               this, _1)));
 }
 
 template <typename DatabaseHandler>
@@ -264,24 +374,69 @@
   m_syncPrefix.append(syncPrefix);
   util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
 
-  setDatabaseHandler(mysqlId);
+  initializeDatabase(mysqlId);
   setFilters();
 }
 
 template <typename DatabaseHandler>
 void
-PublishAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+PublishAdapter<DatabaseHandler>::initializeDatabase(const util::ConnectionDetails& databaseId)
 {
   //empty
 }
 
 template <>
 void
-PublishAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+PublishAdapter<MYSQL>::initializeDatabase(const util::ConnectionDetails& databaseId)
 {
   std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
 
   m_databaseHandler = conn;
+
+  if (m_databaseHandler != nullptr) {
+    std::string errMsg;
+    bool success = false;
+    // IF NOT EXISTS: restarts keep the existing tables without reporting an error
+    std::string createSyncTable =
+      "CREATE TABLE IF NOT EXISTS `chronosync_update_info` ("
+      "  `id` int(11) NOT NULL AUTO_INCREMENT,"
+      "  `session_name` varchar(1000) NOT NULL,"
+      "  `seq_num` bigint(20) unsigned NOT NULL,"
+      "  PRIMARY KEY (`id`),"
+      "  UNIQUE KEY `id_UNIQUE` (`id`)"
+      ") ENGINE=InnoDB DEFAULT CHARSET=utf8;";
+
+    util::MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE, success, errMsg);
+    if (!success)
+      std::cout << errMsg << std::endl;
+
+    std::string createCmip5Table =
+      "CREATE TABLE IF NOT EXISTS `cmip5` ("
+      "  `id` int(100) NOT NULL AUTO_INCREMENT,"
+      "  `sha256` varchar(64) NOT NULL,"
+      "  `name` varchar(1000) NOT NULL,"
+      "  `activity` varchar(100) NOT NULL,"
+      "  `product` varchar(100) NOT NULL,"
+      "  `organization` varchar(100) NOT NULL,"
+      "  `model` varchar(100) NOT NULL,"
+      "  `experiment` varchar(100) NOT NULL,"
+      "  `frequency` varchar(100) NOT NULL,"
+      "  `modeling_realm` varchar(100) NOT NULL,"
+      "  `variable_name` varchar(100) NOT NULL,"
+      "  `ensemble` varchar(100) NOT NULL,"
+      "  `time` varchar(100) NOT NULL,"
+      "  PRIMARY KEY (`id`),"
+      "  UNIQUE KEY `sha256` (`sha256`)"
+      ") ENGINE=InnoDB DEFAULT CHARSET=utf8;";
+
+    success = false;
+    util::MySQLPerformQuery(m_databaseHandler, createCmip5Table, util::CREATE, success, errMsg);
+    if (!success)
+      std::cout << errMsg << std::endl;
+  }
+  else {
+    throw Error("cannot connect to the Database");
+  }
 }
 
 template <typename DatabaseHandler>
@@ -289,7 +444,39 @@
 PublishAdapter<DatabaseHandler>::onPublishInterest(const ndn::InterestFilter& filter,
                                                    const ndn::Interest& interest)
 {
-  // @todo: Request the data for publish
+  // Example Interest : /cmip5/publish/<uri>/<nonce>
+  std::cout << "Publish interest : " << interest.getName().toUri() << std::endl;
+
+  // acknowledge the publish request right away
+  const char buf[] = "ACK";
+  std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(interest.getName());
+  data->setFreshnessPeriod(ndn::time::milliseconds(10)); // 10 msec
+  data->setContent(reinterpret_cast<const uint8_t*>(buf), strlen(buf));
+  m_keyChain->sign(*data);
+  m_face->put(*data);
+  std::cout << "Sent ACK for " << interest.getName() << std::endl;
+
+  //TODO: if already in catalog, what do we do?
+  // ask the publisher for the content, starting from the first segment
+  m_isFinished = false; // a new publication starts from scratch
+  ndn::Name dataName = interest.getName().getSubName(m_prefix.size() + 1);
+  const uint64_t firstSegment = 0;
+  std::shared_ptr<ndn::Interest> retrieveInterest =
+    std::make_shared<ndn::Interest>(dataName.appendSegment(firstSegment));
+  retrieveInterest->setInterestLifetime(ndn::time::milliseconds(4000));
+  retrieveInterest->setMustBeFresh(m_mustBeFresh);
+  m_face->expressInterest(*retrieveInterest,
+                          bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
+                               this,_1, _2),
+                          bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout, this, _1));
+  std::cout << "Expressing Interest for: " << retrieveInterest->toUri() << std::endl;
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
+{
+  std::cout << "interest " << interest.getName() << " timed out" << std::endl;
 }
 
 template <typename DatabaseHandler>
@@ -297,7 +484,363 @@
 PublishAdapter<DatabaseHandler>::onPublishedData(const ndn::Interest& interest,
                                                  const ndn::Data& data)
 {
-  // @todo handle data publication
+  std::cout << "received Data " << data.getName() << std::endl;
+  if (data.getContent().empty()) {
+    return;
+  }
+
+  std::shared_ptr<ndn::Data> dataPtr = std::make_shared<ndn::Data>(data);
+  // validate published data payload, if failed, return
+  if (!validatePublicationChanges(dataPtr)) {
+    std::cout << "data validation failed : " << dataPtr->getName() << std::endl;
+#ifndef NDEBUG
+    const std::string payload(reinterpret_cast<const char*>(dataPtr->getContent().value()),
+                              dataPtr->getContent().value_size());
+    std::cout << payload << std::endl;
+#endif
+    return;
+  }
+
+  // todo: return value to indicate if the insertion succeeds
+  processUpdateData(dataPtr);
+
+  // ideally, data should not be stale?
+  m_socket->publishData(data.getContent(), ndn::time::seconds(3600));
+
+  // stop at the final block; otherwise fetch the next segment
+  const ndn::name::Component& finalBlockId = data.getMetaInfo().getFinalBlockId();
+  // decided per Data packet rather than through shared member state, so one
+  // publication finishing cannot stop the next one after its first segment
+  const bool isFinished = (finalBlockId == data.getName()[-1]);
+
+  if (!isFinished) {
+    ndn::Name nextInterestName = data.getName().getPrefix(-1);
+    const uint64_t nextSegment = data.getName()[-1].toSegment() + 1;
+    std::cout << " Next Interest Name " << nextInterestName << " Segment " << nextSegment
+              << std::endl;
+    std::shared_ptr<ndn::Interest> nextInterest =
+      std::make_shared<ndn::Interest>(nextInterestName.appendSegment(nextSegment));
+    nextInterest->setInterestLifetime(ndn::time::milliseconds(4000));
+    nextInterest->setMustBeFresh(m_mustBeFresh);
+    m_face->expressInterest(*nextInterest,
+                            bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
+                                 this,_1, _2),
+                            bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout,
+                                 this, _1));
+  }
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::processUpdateData(const ndn::shared_ptr<const ndn::Data>& data)
+{
+  // view the Data content as a raw string
+  const ndn::Block& content = data->getContent();
+  const std::string payload(reinterpret_cast<const char*>(content.value()), content.value_size());
+  if (payload.empty())
+    return;
+
+  // the payload is a JSON object carrying "add" and "remove" lists, see
+  //    http://redmine.named-data.net/projects/ndn-atmos/wiki/Sync
+  Json::Value update;
+  Json::Reader reader;
+  const bool parsed = reader.parse(payload, update);
+  if (!parsed) {
+    // todo: logging events
+    std::cout << "fail to parse the update data" << std::endl;
+    return;
+  }
+  std::cout << "received Json format payload : "
+            << update.toStyledString() << std::endl;
+
+  std::stringstream sql;
+  // insert the added files first ...
+  if (json2Sql(sql, update, util::ADD)) {
+    std::cout << "sql string to insert data : " << sql.str() << std::endl;
+    // todo: check that the connection is not NULL before use
+    operateDatabase(sql.str(), util::ADD);
+  }
+
+  // ... then delete the removed ones, reusing the same (reset) stream
+  sql.str("");
+  sql.clear();
+  if (json2Sql(sql, update, util::REMOVE)) {
+    std::cout << "sql string to remove data: " << sql.str() << std::endl;
+    operateDatabase(sql.str(), util::REMOVE);
+  }
+}
+
+template <typename DatabaseHandler>
+chronosync::SeqNo
+PublishAdapter<DatabaseHandler>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
+{
+  // no local record without a MYSQL database: report that nothing has been seen yet
+  return 0;
+}
+
+template <>
+chronosync::SeqNo
+PublishAdapter<MYSQL>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
+{
+  std::string sql = "select seq_num from chronosync_update_info where session_name = '"
+    + update.session.toUri() + "';";
+  std::cout << "get latest seqNo : " << sql << std::endl;
+  std::string errMsg;
+  bool success = false; // never read uninitialized, even if the query fails
+  std::shared_ptr<MYSQL_RES> results;
+  {
+    // same connection guard as the other database helpers in this class
+    std::lock_guard<std::mutex> lock(m_mutex);
+    results = atmos::util::MySQLPerformQuery(m_databaseHandler, sql, util::QUERY,
+                                             success, errMsg);
+  }
+  if (!success) {
+    std::cout << errMsg << std::endl;
+    return 0; //database connection error?
+  }
+  if (results == nullptr)
+    return 0;
+  // one row per session is expected; only the first row is read
+  MYSQL_ROW row = mysql_fetch_row(results.get());
+  if (row == nullptr || row[0] == nullptr)
+    return 0;
+  return std::stoull(row[0]);
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
+{
+  // no-op: only the MYSQL specialization keeps update information
+}
+
+template <>
+void
+PublishAdapter<MYSQL>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
+{
+  std::string sql = "update chronosync_update_info set seq_num = "
+    + boost::lexical_cast<std::string>(update.high)
+    + " where session_name = '" + update.session.toUri() + "';";
+  std::cout << "renew update Info : " << sql << std::endl;
+  std::string errMsg;
+  bool success = false;
+  std::unique_lock<std::mutex> lock(m_mutex); // RAII: released even if the call throws
+  util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
+  lock.unlock();
+  if (!success)
+    std::cout << errMsg << std::endl;
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::addUpdateInformation(const chronosync::MissingDataInfo& update)
+{
+  // no-op: only the MYSQL specialization keeps update information
+}
+
+template <>
+void
+PublishAdapter<MYSQL>::addUpdateInformation(const chronosync::MissingDataInfo& update)
+{
+  std::string sql = "INSERT INTO chronosync_update_info (session_name, seq_num) VALUES ('"
+    + update.session.toUri() + "', " + boost::lexical_cast<std::string>(update.high)
+    + ");";
+
+  std::cout << "add update Info : " << sql << std::endl;
+  std::string errMsg;
+  bool success = false;
+  std::unique_lock<std::mutex> lock(m_mutex); // RAII: released even if the call throws
+  util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
+  lock.unlock();
+  if (!success)
+    std::cout << errMsg << std::endl;
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
+{
+  // todo: record event, and use recovery Interest to fetch the whole table (timeout ignored for now)
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
+                                                          const std::string& failureInfo)
+{
+  std::cout << "failed to validate Data " << data->getName() << " : " << failureInfo << std::endl;
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::processSyncUpdate(const std::vector<chronosync::MissingDataInfo>&
+                                                   updates)
+{
+  if (updates.empty())
+    return; // nothing to fetch
+  // several catalogs may have published: each entry names a session and a [low, high] range
+  for (const chronosync::MissingDataInfo& info : updates) {
+    // sequence numbers up to the one recorded in the local DB were already requested
+    // (e.g. before a restart); only the newer ones are fetched
+    const chronosync::SeqNo localSeqNo = getLatestSeqNo(info);
+    bool fetchedAny = false;
+
+    for (chronosync::SeqNo seq = info.low; seq <= info.high; ++seq) {
+      if (seq <= localSeqNo)
+        continue;
+
+      m_socket->fetchData(info.session, seq,
+                          bind(&PublishAdapter<DatabaseHandler>::processUpdateData, this, _1),
+                          bind(&PublishAdapter<DatabaseHandler>::onUpdateValidationFailed,
+                               this, _1, _2),
+                          bind(&PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout,
+                               this, _1),
+                          RETRY_WHEN_TIMEOUT);
+      std::cout << "send Interest for [" << info.session << ":" << seq << "]" << std::endl;
+      fetchedAny = true;
+    }
+
+    if (!fetchedAny)
+      continue;
+
+    // remember the highest seq number of this session so a restart does not refetch it:
+    // a session already known locally is updated, an unknown one is inserted
+    if (localSeqNo > 0)
+      renewUpdateInformation(info);
+    else
+      addUpdateInformation(info);
+  }
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
+{
+  // no-op: only the MYSQL specialization talks to a database
+}
+
+template <>
+void
+PublishAdapter<MYSQL>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
+{
+  std::string errMsg;
+  bool success = false;
+  std::unique_lock<std::mutex> lock(m_mutex); // RAII: released even if the call throws
+  atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
+  lock.unlock();
+  if (!success)
+    std::cout << errMsg << std::endl;
+}
+
+template<typename DatabaseHandler>
+bool
+PublishAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlString,
+                                          Json::Value& jsonValue,
+                                          util::DatabaseOperation op)
+{
+  if (jsonValue.type() != Json::objectValue) {
+    std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
+    return false;
+  }
+  if (op == util::ADD) {
+    // "add" must be a non-empty array of file names; anything else is rejected
+    const Json::Value& toAdd = jsonValue["add"];
+    if (!toAdd.isArray() || toAdd.empty())
+      return false;
+    sqlString << "INSERT INTO cmip5 (";
+    for (size_t i = 0; i < atmosTableColumns.size(); ++ i) {
+      if (i != 0)
+        sqlString << ", ";
+      sqlString << atmosTableColumns[i];
+    }
+    sqlString << ") VALUES";
+    for (size_t i = 0; i < toAdd.size(); ++ i) { //parse each file name
+      if (i > 0)
+        sqlString << ",";
+      // cast might be overflowed
+      const Json::Value& item = toAdd[static_cast<int>(i)];
+      if (!item.isConvertibleTo(Json::stringValue)) {
+        std::cout << "malformed JsonQuery string : " << item.toStyledString() << std::endl;
+        return false;
+      }
+      std::string fileName(item.asString());
+      // the name is spliced into the SQL text: refuse quotes/backslashes (SQL injection)
+      if (fileName.find_first_of("'\\") != std::string::npos)
+        return false;
+      // use digest sha256 for now, may be removed
+      ndn::util::Digest<CryptoPP::SHA256> digest;
+      digest.update(reinterpret_cast<const uint8_t*>(fileName.data()), fileName.length());
+      sqlString << "('" << digest.toString() << "','" << fileName << "'";
+      // parse the ndn name to get each value for each field
+      if (!name2Fields(sqlString, fileName))
+        return false;
+      sqlString << ")";
+    }
+    sqlString << ";";
+  }
+  else if (op == util::REMOVE) {
+    // remove files from db; "remove" must be a non-empty array of file names
+    const Json::Value& toRemove = jsonValue["remove"];
+    if (!toRemove.isArray() || toRemove.empty())
+      return false;
+
+    sqlString << "delete from cmip5 where name in (";
+    for (size_t i = 0; i < toRemove.size(); ++ i) {
+      if (i > 0)
+        sqlString << ",";
+      const Json::Value& item = toRemove[static_cast<int>(i)];
+      if (!item.isConvertibleTo(Json::stringValue)) {
+        std::cout << "malformed JsonQuery string : " << item.toStyledString() << std::endl;
+        return false;
+      }
+      std::string fileName(item.asString());
+      if (fileName.find_first_of("'\\") != std::string::npos)
+        return false; // would break out of the quoted SQL literal
+      sqlString << "'" << fileName << "'";
+    }
+    sqlString << ");";
+  }
+  return true;
+}
+
+template<typename DatabaseHandler>
+bool
+PublishAdapter<DatabaseHandler>::name2Fields(std::stringstream& sqlString,
+                                             std::string& fileName)
+{
+  // Appends ",'<component>'" to sqlString for every component of fileName
+  // (field limits come from the column list, minus the sha256 and name columns)
+  const size_t fieldCount = atmosTableColumns.size() - 2;
+  const std::string ndnScheme("ndn:/");
+  const std::string slash("/");
+
+  // the name must start with either "ndn:/" or "/"
+  size_t start = 0;
+  if (fileName.compare(0, ndnScheme.size(), ndnScheme) == 0)
+    start = ndnScheme.size();
+  else if (fileName.compare(0, slash.size(), slash) == 0)
+    start = slash.size();
+  else
+    return false;
+
+  // each '/' closes one component, which becomes one quoted SQL value
+  size_t separators = 0;
+  for (size_t pos = fileName.find('/', start); pos != std::string::npos;
+       pos = fileName.find('/', start)) {
+    ++separators;
+    const std::string component = fileName.substr(start, pos - start);
+    if (separators >= fieldCount)
+      return false; // more components than columns
+    sqlString << ",'" << component << "'";
+    start = pos + 1;
+  }
+
+  // the component after the last '/' completes the name: exactly fieldCount in total
+  if (separators + 1 != fieldCount || start == std::string::npos)
+    return false;
+  const std::string last = fileName.substr(start);
+  sqlString << ",'" << last << "'";
+  return true;
 }
 
 template<typename DatabaseHandler>
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index 2adb161..ccfefb6 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -198,6 +198,10 @@
   json2AutocompletionSql(std::stringstream& sqlQuery,
                          Json::Value& jsonValue);
 
+  bool
+  json2CompleteSearchSql(std::stringstream& sqlQuery,
+                         Json::Value& jsonValue);
+
 protected:
   typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
   // Handle to the Catalog's database
@@ -439,9 +443,9 @@
   nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
 
   signData(*nack);
-
+#ifndef NDEBUG
   std::cout << "make NACK : " << ndn::Name(dataPrefix).appendSegment(segmentNo) << std::endl;
-
+#endif
   m_mutex.lock();
   m_cache.insert(*nack);
   m_mutex.unlock();
@@ -563,7 +567,86 @@
   // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
   // return true
   bool more = false;
-  sqlQuery << "SELECT " << m_nameFields[count] << " FROM cmip5";
+  sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM cmip5";
+  for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
+       it != typedComponents.end(); ++it) {
+    if (more)
+      sqlQuery << " AND";
+    else
+      sqlQuery << " WHERE";
+
+    sqlQuery << " " << it->first << "='" << it->second << "'";
+
+    more = true;
+  }
+  sqlQuery << ";";
+  return true;
+}
+
+template <typename DatabaseHandler>
+bool
+QueryAdapter<DatabaseHandler>::json2CompleteSearchSql(std::stringstream& sqlQuery,
+                                                      Json::Value& jsonValue)
+{
+#ifndef NDEBUG
+  std::cout << "jsonValue in json2CompleteSearchSql: " << jsonValue.toStyledString() << std::endl;
+#endif
+  if (jsonValue.type() != Json::objectValue) {
+    std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
+    return false;
+  }
+
+  std::string typedString;
+  // get the string in the jsonValue
+  for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
+  {
+    Json::Value key = iter.key();
+    Json::Value value = (*iter);
+
+    if (key == Json::nullValue || value == Json::nullValue) {
+      std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
+      return false;
+    }
+
+    // cannot convert to string
+    if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
+      std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
+      return false;
+    }
+
+    if (key.asString().compare("??") == 0) {
+      typedString.assign(value.asString());
+      // must start and end with '/'; quotes are refused because the value is spliced into SQL
+      if (typedString.empty() || typedString[typedString.size() - 1] != '/' ||
+          typedString[0] != '/' || typedString.find_first_of("'\\") != std::string::npos)
+        return false;
+      break;
+    }
+  }
+
+  // 1. get the expected column number by parsing the typedString, so we can get the field name
+  size_t pos = 0;
+  size_t start = 1; // start from the 1st char which is not '/'
+  size_t count = 0; // also the name to query for
+  std::string token;
+  std::string delimiter = "/";
+  std::map<std::string, std::string> typedComponents;
+  while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
+    token = typedString.substr(start, pos - start);
+    if (count >= m_nameFields.size() - 1) {
+      return false;
+    }
+
+    // add column name and value (token) into map
+    typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
+    count ++;
+    start = pos + 1;
+  }
+
+  // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
+  // return true
+  bool more = false;
+  sqlQuery << "SELECT name FROM cmip5";
   for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
        it != typedComponents.end(); ++it) {
     if (more)
@@ -668,6 +751,12 @@
       return;
     }
   }
+  else if (parsedFromString.get("??", tmp) != tmp) {
+    if (!json2CompleteSearchSql(sqlQuery, parsedFromString)) {
+      sendNack(segmentPrefix);
+      return;
+    }
+  }
   else {
     if (!json2Sql(sqlQuery, parsedFromString)) {
       sendNack(segmentPrefix);
@@ -695,11 +784,16 @@
                                      const std::string& sqlString,
                                      bool autocomplete)
 {
+#ifndef NDEBUG
   std::cout << "prepareSegments() executes sql : " << sqlString << std::endl;
-
+#endif
+  std::string errMsg;
+  bool success = false; // never read uninitialized, even if the query fails
   // 4) Run the Query
   std::shared_ptr<MYSQL_RES> results
-    = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString);
+    = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
+  if (!success)
+    std::cout << errMsg << std::endl;
 
   if (!results) {
     std::cout << "null MYSQL_RES for query : " << sqlString << std::endl;
@@ -717,27 +811,29 @@
             << " rows" << std::endl;
 
   MYSQL_ROW row;
-  size_t usedBytes = 0;
   uint64_t segmentNo = 0;
-  Json::Value array;
+  Json::Value tmp; // resultJson plus the row being considered, used to measure the payload
+  Json::Value resultJson;
+  Json::FastWriter fastWriter;
   while ((row = mysql_fetch_row(results.get())))
   {
-    size_t size = strlen(row[0]) + 1;
-    if (usedBytes + size > PAYLOAD_LIMIT) {
+    tmp.append(row[0]);
+    const std::string tmpString = fastWriter.write(tmp);
+    if (tmpString.length() > PAYLOAD_LIMIT && !resultJson.empty()) {
       std::shared_ptr<ndn::Data> data
-        = makeReplyData(segmentPrefix, array, segmentNo, false, autocomplete, resultCount);
+        = makeReplyData(segmentPrefix, resultJson, segmentNo, false, autocomplete, resultCount);
       m_mutex.lock();
       m_cache.insert(*data);
       m_mutex.unlock();
-      array.clear();
-      usedBytes = 0;
-      segmentNo++;
+      tmp.clear();
+      tmp.append(row[0]); // the row that did not fit opens the next segment
+      resultJson.clear();
+      segmentNo ++;
     }
-    array.append(row[0]);
-    usedBytes += size;
+    resultJson.append(row[0]);
   }
   std::shared_ptr<ndn::Data> data
-    = makeReplyData(segmentPrefix, array, segmentNo, true, autocomplete, resultCount);
+    = makeReplyData(segmentPrefix, resultJson, segmentNo, true, autocomplete, resultCount);
   m_mutex.lock();
   m_cache.insert(*data);
   m_mutex.unlock();
diff --git a/catalog/src/util/mysql-util.cpp b/catalog/src/util/mysql-util.cpp
index fb58665..9df8b8e 100644
--- a/catalog/src/util/mysql-util.cpp
+++ b/catalog/src/util/mysql-util.cpp
@@ -19,6 +19,7 @@
 #include "util/mysql-util.hpp"
 #include <mysql/errmsg.h>
 #include <stdexcept>
+#include <iostream>
 
 namespace atmos {
 namespace util {
@@ -32,10 +33,20 @@
 
 
 std::shared_ptr<MYSQL>
-MySQLConnectionSetup(const ConnectionDetails& details) {
+MySQLConnectionSetup(const ConnectionDetails& details)
+{
   MYSQL* conn = mysql_init(NULL);
+  if (conn == NULL) {
+    throw std::runtime_error("mysql_init failed: insufficient memory");
+  }
+  my_bool reconnect = 1;
+  mysql_options(conn, MYSQL_OPT_RECONNECT, &reconnect);
   if(!mysql_real_connect(conn, details.server.c_str(), details.user.c_str(),
-                        details.password.c_str(), details.database.c_str(), 0, NULL, 0)) {
-    throw std::runtime_error(mysql_error(conn));
+                         details.password.c_str(), details.database.c_str(), 0, NULL, 0)) {
+    // Copy the message before mysql_close() frees the handle that owns it,
+    // so a failed connect no longer leaks the MYSQL object.
+    const std::string message(mysql_error(conn));
+    mysql_close(conn);
+    throw std::runtime_error(message);
   }
   std::shared_ptr<MYSQL> connection(conn, &mysql_close);
@@ -43,25 +54,42 @@
 }
 
 std::shared_ptr<MYSQL_RES>
-MySQLPerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query) {
+MySQLPerformQuery(std::shared_ptr<MYSQL> connection,
+                  const std::string& sql_query,
+                  DatabaseOperation op,
+                  bool& success,
+                  std::string& errMsg)
+{
+  // Start from "failed" so that every return path leaves success defined.
+  success = false;
   switch (mysql_query(connection.get(), sql_query.c_str()))
   {
-    case 0:
-    {
+  case 0:
+  {
+    success = true;
+    if (op == QUERY) {
       MYSQL_RES* resultPtr = mysql_store_result(connection.get());
       if (resultPtr != NULL)
       {
         return std::shared_ptr<MYSQL_RES>(resultPtr, &mysql_free_result);
       }
-      break;
+      // NULL plus a non-zero field count means the result set could not be
+      // retrieved; zero means the statement simply produces no result set.
+      if (mysql_field_count(connection.get()) != 0) {
+        success = false;
+        errMsg.assign(mysql_error(connection.get()));
+      }
     }
-    // Various error cases
-    case CR_COMMANDS_OUT_OF_SYNC:
-    case CR_SERVER_GONE_ERROR:
-    case CR_SERVER_LOST:
-    case CR_UNKNOWN_ERROR:
-    default:
-      break;
+    // Non-QUERY operations (ADD, REMOVE, ...) need no result set to return.
+    break;
+  }
+  // mysql_query() is only documented to return zero/non-zero; the CR_* code
+  // comes from mysql_errno(), so every failure is handled here.
+  default:
+  {
+    errMsg.assign(mysql_error(connection.get()));
+    break;
+  }
   }
   return nullptr;
 }
diff --git a/catalog/src/util/mysql-util.hpp b/catalog/src/util/mysql-util.hpp
index ee45f97..75e5997 100644
--- a/catalog/src/util/mysql-util.hpp
+++ b/catalog/src/util/mysql-util.hpp
@@ -26,6 +26,7 @@
 
 namespace atmos {
 namespace util {
+enum DatabaseOperation {CREATE, UPDATE, ADD, REMOVE, QUERY}; // only QUERY makes MySQLPerformQuery fetch a result set
 struct ConnectionDetails {
 public:
   std::string server;
@@ -41,7 +42,11 @@
 MySQLConnectionSetup(const ConnectionDetails& details);
 
 std::shared_ptr<MYSQL_RES>
-MySQLPerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query);
+MySQLPerformQuery(std::shared_ptr<MYSQL> connection, // non-null only for op == QUERY with a stored result set
+                  const std::string& sql_query,
+                  DatabaseOperation op,
+                  bool& success,        // set to true when mysql_query() returns 0
+                  std::string& errMsg); // receives mysql_error() text when the query fails
 
 } // namespace util
 } // namespace atmos
diff --git a/catalog/tests/unit-tests/publish/test-publish-adapter.cpp b/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
index 7d6a894..90c6f3f 100644
--- a/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
+++ b/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
@@ -45,6 +45,11 @@
     {
     }
 
+    void setTableFields(const std::vector<std::string>& tableFields)
+    {
+      m_tableColumns = tableFields;
+    }
+
     const ndn::Name
     getPrefix()
     {
@@ -71,6 +76,21 @@
     }
 
     bool
+    testJson2Sql(std::stringstream& sqlString,
+                 Json::Value& jsonValue,
+                 util::DatabaseOperation operation)
+    {
+      return json2Sql(sqlString, jsonValue, operation);
+    }
+
+    bool
+    testName2Fields(std::stringstream& sqlString,
+                    std::string& fileName)
+    {
+      return name2Fields(sqlString, fileName);
+    }
+
+    bool
     testValidatePublicationChanges(const std::shared_ptr<const ndn::Data>& data)
     {
       return validatePublicationChanges(data);
@@ -86,6 +106,10 @@
       , publishAdapterTest1(face, keyChain)
       , publishAdapterTest2(face, keyChain)
     {
+      // same order as the INSERT column list expected in PublishAdapterSqlStringNormalTest
+      tableFields = {"sha256", "name", "activity", "product", "organization", "model",
+                     "experiment", "frequency", "modeling_realm", "variable_name",
+                     "ensemble", "time"};
     }
 
     virtual
@@ -116,6 +140,7 @@
       catch (boost::property_tree::info_parser_error &e) {
         std::cout << "Failed to read config file " << e.what() << std::endl;
       }
+      publishAdapterTest1.setTableFields(tableFields);
       publishAdapterTest1.configAdapter(section, ndn::Name("/test"));
     }
 
@@ -142,6 +167,7 @@
       catch (boost::property_tree::info_parser_error &e) {
         std::cout << "Failed to read config file " << e.what() << std::endl;;
       }
+      publishAdapterTest2.setTableFields(tableFields);
       publishAdapterTest2.configAdapter(section, ndn::Name("/test"));
     }
 
@@ -150,6 +176,7 @@
     std::shared_ptr<ndn::KeyChain> keyChain;
     PublishAdapterTest publishAdapterTest1;
     PublishAdapterTest publishAdapterTest2;
+    std::vector<std::string> tableFields;
   };
 
   BOOST_FIXTURE_TEST_SUITE(PublishAdapterTestSuite, PublishAdapterFixture)
@@ -175,6 +202,76 @@
                 ndn::Name("ndn:/ndn-atmos/broadcast/chronosync"));
   }
 
+  BOOST_AUTO_TEST_CASE(PublishAdapterName2FieldsNormalTest)
+  {
+    std::string testFileName1 = "/1/2/3/4/5/6/7/8/9/10";
+    std::stringstream ss;
+    std::string expectString1 = ",'1','2','3','4','5','6','7','8','9','10'";
+    BOOST_CHECK_EQUAL(publishAdapterTest1.testName2Fields(ss, testFileName1), true);
+    BOOST_CHECK_EQUAL(ss.str(), expectString1);
+
+    ss.str("");
+    ss.clear();
+    std::string testFileName2 = "ndn:/1/2/3/4/5/6/777/8/99999/10";
+    std::string expectString2 = ",'1','2','3','4','5','6','777','8','99999','10'";
+    BOOST_CHECK_EQUAL(publishAdapterTest1.testName2Fields(ss, testFileName2), true);
+    BOOST_CHECK_EQUAL(ss.str(), expectString2);
+  }
+
+  BOOST_AUTO_TEST_CASE(PublishAdapterName2FieldsFailureTest)
+  {
+    std::string testFileName1 = "/1/2/3/4/5/6/7/8/9/10/11"; // too many components
+    std::stringstream ss;
+    BOOST_CHECK_EQUAL(publishAdapterTest1.testName2Fields(ss, testFileName1), false);
+
+    ss.str("");
+    ss.clear();
+    std::string testFileName2 = "1234567890";
+    BOOST_CHECK_EQUAL(publishAdapterTest1.testName2Fields(ss, testFileName2), false);
+
+    ss.str("");
+    ss.clear();
+    std::string testFileName3 = "ndn:/1/2/3/4/5"; // too few components
+    BOOST_CHECK_EQUAL(publishAdapterTest1.testName2Fields(ss, testFileName3), false);
+  }
+
+  BOOST_AUTO_TEST_CASE(PublishAdapterSqlStringNormalTest)
+  {
+    Json::Value testJson;
+    testJson["add"][0] = "/1/2/3/4/5/6/7/8/9/10";
+    testJson["add"][1] = "ndn:/a/b/c/d/eee/f/gg/h/iiii/j";
+    testJson["remove"][0] = "ndn:/1/2/3/4/5/6/7/8/9/10";
+    testJson["remove"][1] = "/a/b/c/d";
+    testJson["remove"][2] = "/test/for/remove";
+
+    std::stringstream ss;
+    std::string expectRes1 = "INSERT INTO cmip5 (sha256, name, activity, product, organization, \
+model, experiment, frequency, modeling_realm, variable_name, ensemble, time) VALUES(\
+'3738C9C0E0297DE7FE0EE538030597442DEEFF0F2C88778404D7B6E4BAD589F6','/1/2/3/4/5/6/7/8/9/10',\
+'1','2','3','4','5','6','7','8','9','10'),\
+('F93128EE9B7769105C6BDF6AA0FAA8CB4ED429395DDBC2CDDBFBA05F35B320FB','ndn:/a/b/c/d/eee/f/gg/h/iiii/j'\
+,'a','b','c','d','eee','f','gg','h','iiii','j');";
+    BOOST_CHECK_EQUAL(publishAdapterTest1.testJson2Sql(ss, testJson, util::ADD), true);
+    BOOST_CHECK_EQUAL(ss.str(), expectRes1);
+
+    ss.str("");
+    ss.clear();
+    std::string expectRes2 = "delete from cmip5 where name in ('ndn:/1/2/3/4/5/6/7/8/9/10',\
+'/a/b/c/d','/test/for/remove');";
+    BOOST_CHECK_EQUAL(publishAdapterTest1.testJson2Sql(ss, testJson, util::REMOVE), true);
+    BOOST_CHECK_EQUAL(ss.str(), expectRes2);
+  }
+
+  BOOST_AUTO_TEST_CASE(PublishAdapterSqlStringFailureTest)
+  {
+    Json::Value testJson;
+    testJson["add"][0] = "/1/2/3/4/5/6/7/8/9/10";
+    testJson["add"][1] = "/a/b/c/d/eee/f/gg/h/iiii/j/kkk"; // too many components
+    std::stringstream ss;
+    // Only ADD splits names into fields, so ADD must reject the 11-component name.
+    BOOST_CHECK_EQUAL(publishAdapterTest1.testJson2Sql(ss, testJson, util::ADD), false);
+  }
+
   BOOST_AUTO_TEST_CASE(PublishAdapterValidateDataTestSuccess)
   {
     ndn::Name dataName("/test/publisher/12345"); // data name must be prefix+nonce
diff --git a/catalog/tests/unit-tests/query/test-query-adapter.cpp b/catalog/tests/unit-tests/query/test-query-adapter.cpp
index 3be4544..fdf5140 100644
--- a/catalog/tests/unit-tests/query/test-query-adapter.cpp
+++ b/catalog/tests/unit-tests/query/test-query-adapter.cpp
@@ -397,7 +397,8 @@
                                                                      1, false, false, 2);
     BOOST_CHECK_EQUAL(data->getName().toUri(), "/atmos/test/prefix/%00%01");
     BOOST_CHECK_EQUAL(data->getFinalBlockId(), ndn::Name::Component(""));
-    const std::string jsonRes(reinterpret_cast<const char*>(data->getContent().value()));
+    const std::string jsonRes(reinterpret_cast<const char*>(data->getContent().value()),
+                              data->getContent().value_size()); // copy exactly value_size() bytes instead of scanning for a NUL
     Json::Value parsedFromString;
     Json::Reader reader;
     BOOST_CHECK_EQUAL(reader.parse(jsonRes, parsedFromString), true);
@@ -418,7 +419,8 @@
 
     BOOST_CHECK_EQUAL(data->getName().toUri(), "/atmos/test/prefix/%00%02");
     BOOST_CHECK_EQUAL(data->getFinalBlockId(), ndn::Name::Component::fromSegment(2));
-    const std::string jsonRes(reinterpret_cast<const char*>(data->getContent().value()));
+    const std::string jsonRes(reinterpret_cast<const char*>(data->getContent().value()),
+                              data->getContent().value_size()); // copy exactly value_size() bytes instead of scanning for a NUL
     Json::Value parsedFromString;
     Json::Reader reader;
     BOOST_CHECK_EQUAL(reader.parse(jsonRes, parsedFromString), true);
@@ -456,7 +458,8 @@
     BOOST_CHECK(replyData);
     if (replyData){
       BOOST_CHECK_EQUAL(replyData->getName().getPrefix(2), ndn::Name("/test/query-results"));
-      const std::string jsonRes(reinterpret_cast<const char*>(replyData->getContent().value()));
+      const std::string jsonRes(reinterpret_cast<const char*>(replyData->getContent().value()),
+                                replyData->getContent().value_size()); // copy exactly value_size() bytes instead of scanning for a NUL
       Json::Value parsedFromString;
       Json::Reader reader;
       BOOST_CHECK_EQUAL(reader.parse(jsonRes, parsedFromString), true);
@@ -476,22 +479,23 @@
     Json::Value testJson;
     testJson["?"] = "/";
     BOOST_CHECK_EQUAL(true, queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson));
-    BOOST_CHECK_EQUAL("SELECT activity FROM cmip5;", ss.str());
+    BOOST_CHECK_EQUAL("SELECT DISTINCT activity FROM cmip5;", ss.str()); // DISTINCT: each suggestion appears once
 
     ss.str("");
     ss.clear();
     testJson.clear();
     testJson["?"] = "/Activity/";
     BOOST_CHECK_EQUAL(true, queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson));
-    BOOST_CHECK_EQUAL("SELECT product FROM cmip5 WHERE activity='Activity';", ss.str());
+    BOOST_CHECK_EQUAL("SELECT DISTINCT product FROM cmip5 WHERE activity='Activity';", ss.str());
 
     ss.str("");
     ss.clear();
     testJson.clear();
     testJson["?"] = "/Activity/Product/Organization/Model/Experiment/";
     BOOST_CHECK_EQUAL(true, queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson));
-    BOOST_CHECK_EQUAL("SELECT frequency FROM cmip5 WHERE activity='Activity' AND experiment=\
-'Experiment' AND model='Model' AND organization='Organization' AND product='Product';", ss.str());
+    BOOST_CHECK_EQUAL("SELECT DISTINCT frequency FROM cmip5 WHERE activity='Activity' AND \
+experiment='Experiment' AND model='Model' AND organization='Organization' AND product='Product';",
+     ss.str());
 
     ss.str("");
     ss.clear();
@@ -499,9 +503,10 @@
     testJson["?"] = "/Activity/Product/Organization/Model/Experiment/Frequency/Modeling/\
 Variable/Ensemble/";
     BOOST_CHECK_EQUAL(true, queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson));
-    BOOST_CHECK_EQUAL("SELECT time FROM cmip5 WHERE activity='Activity' AND ensemble='Ensemble' AND\
- experiment='Experiment' AND frequency='Frequency' AND model='Model' AND modeling_realm='Modeling' \
-AND organization='Organization' AND product='Product' AND variable_name='Variable';",ss.str());
+    BOOST_CHECK_EQUAL("SELECT DISTINCT time FROM cmip5 WHERE activity='Activity' AND ensemble=\
+'Ensemble' AND experiment='Experiment' AND frequency='Frequency' AND model='Model' AND \
+modeling_realm='Modeling' AND organization='Organization' AND product='Product' AND variable_name=\
+'Variable';",ss.str());
   }
 
   BOOST_AUTO_TEST_CASE(QueryAdapterAutocompletionSqlFailTest)