catalog: add connection pool for the database

Change-Id: Ie014a5b11081908ea1cfdf30451ef44757d37c7e
diff --git a/.jenkins.d/00-deps.sh b/.jenkins.d/00-deps.sh
index 24b497d..9cd88f4 100755
--- a/.jenkins.d/00-deps.sh
+++ b/.jenkins.d/00-deps.sh
@@ -8,19 +8,34 @@
     set -x
     brew update
     brew upgrade
-    brew install boost sqlite3 pkg-config mysql jsoncpp
+    brew install boost sqlite3 pkg-config mysql jsoncpp libzdb
     brew cleanup
 fi
 
 if has Ubuntu $NODE_LABELS; then
     BOOST_PKG=libboost-all-dev
+    ZDB_PKG=libzdb-dev
     if has Ubuntu-12.04 $NODE_LABELS; then
         BOOST_PKG=libboost1.48-all-dev
+        unset ZDB_PKG
+        sudo apt-get update -qq -y
+        sudo apt-get -qq -y install autoconf libtool re2c
+        pushd /tmp >/dev/null
+        sudo rm -Rf libzdb
+        git clone https://bitbucket.org/tildeslash/libzdb.git
+        pushd libzdb >/dev/null
+        git checkout -b 3-1 release-3-1
+        ./bootstrap
+        ./configure
+        make
+        sudo make install
+        popd >/dev/null
+        popd >/dev/null
     fi
 
     set -x
     sudo apt-get update -qq -y
     sudo apt-get -qq -y install build-essential pkg-config $BOOST_PKG libssl-dev \
                                 libcrypto++-dev libsqlite3-dev libmysqlclient-dev \
-                                libjsoncpp-dev protobuf-compiler libprotobuf-dev
+                                libjsoncpp-dev protobuf-compiler libprotobuf-dev $ZDB_PKG
 fi
diff --git a/README.md b/README.md
index 5758a8d..2c87492 100644
--- a/README.md
+++ b/README.md
@@ -21,6 +21,7 @@
  * mysql 5.6.23 (http://www.mysql.com/)
  * ndn-cxx (https://github.com/named-data/ndn-cxx.git)
  * ChronoSync (https://github.com/named-data/ChronoSync.git)
+ * libzdb (http://www.tildeslash.com/libzdb/)
 
 **Dependency for tools and translator library**
 
@@ -33,7 +34,7 @@
  *  For OSX, the prerequisites can be installed using Homebrew:
 
 <pre>
-    brew install boost sqlite3 mysql jsoncpp hdf5 openssl cryptopp protobuf
+    brew install boost sqlite3 mysql jsoncpp hdf5 openssl cryptopp protobuf libzdb
     pip3 install mysql-connector-python --allow-all-external
     pip3 install netCDF4
 
@@ -46,7 +47,7 @@
                         libsqlite3-dev libmysqlclient-dev libjsoncpp-dev \
                         protobuf-compiler libprotobuf-dev netcdf4-python \
                         python3-mysql.connector python3-pip libhdf5-dev \
-                        libnetcdf-dev python3-numpy
+                        libnetcdf-dev python3-numpy libzdb-dev
 
     sudo pip3 install netCDF4
 </pre>
@@ -56,7 +57,7 @@
 <pre>
     sudo yum install boost-devel openssl-devel cryptopp-devel sqlite3x-devel \
                     mysql-devel jsoncpp-devel protobuf-compiler protobuf-devel \
-                    netcdf4-python3 mysql-connector-python3
+                    netcdf4-python3 mysql-connector-python3 libzdb-devel
 </pre>
 
 
diff --git a/catalog/src/main.cpp b/catalog/src/main.cpp
index 4bf1142..5525f24 100644
--- a/catalog/src/main.cpp
+++ b/catalog/src/main.cpp
@@ -78,9 +78,9 @@
   std::shared_ptr<chronosync::Socket> syncSocket;
 
   std::unique_ptr<atmos::util::CatalogAdapter>
-    queryAdapter(new atmos::query::QueryAdapter<MYSQL>(face, keyChain, syncSocket));
+    queryAdapter(new atmos::query::QueryAdapter<ConnectionPool_T>(face, keyChain, syncSocket));
   std::unique_ptr<atmos::util::CatalogAdapter>
-    publishAdapter(new atmos::publish::PublishAdapter<MYSQL>(face, keyChain, syncSocket));
+    publishAdapter(new atmos::publish::PublishAdapter<ConnectionPool_T>(face, keyChain, syncSocket));
 
   atmos::catalog::Catalog catalogInstance(face, keyChain, configFile);
   catalogInstance.addAdapter(publishAdapter);
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index b7b8355..a839645 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -119,6 +119,9 @@
   void
   initializeDatabase(const util::ConnectionDetails&  databaseId);
 
+  void
+  closeDatabaseHandler();
+
   /**
    * Helper function that sets filters to make the adapter work
    */
@@ -138,7 +141,6 @@
   bool
   validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data);
 
-
   /**
    * Helper function that processes the sync update
    *
@@ -263,7 +265,7 @@
 
 template <>
 void
-PublishAdapter<MYSQL>::setCatalogId()
+PublishAdapter<ConnectionPool_T>::setCatalogId()
 {
   // use public key digest as the catalog ID
   ndn::Name keyId;
@@ -302,12 +304,27 @@
 }
 
 template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::closeDatabaseHandler()
+{
+}
+
+template <>
+void
+PublishAdapter<ConnectionPool_T>::closeDatabaseHandler()
+{
+  ConnectionPool_stop(*m_databaseHandler);
+}
+
+template <typename DatabaseHandler>
 PublishAdapter<DatabaseHandler>::~PublishAdapter()
 {
   for (const auto& itr : m_registeredPrefixList) {
     if (static_cast<bool>(itr.second))
       m_face->unsetInterestFilter(itr.second);
   }
+
+  closeDatabaseHandler();
 }
 
 template <typename DatabaseHandler>
@@ -437,15 +454,14 @@
 
 template <>
 void
-PublishAdapter<MYSQL>::initializeDatabase(const util::ConnectionDetails& databaseId)
+PublishAdapter<ConnectionPool_T>::initializeDatabase(const util::ConnectionDetails& databaseId)
 {
-  std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
+  m_databaseHandler = zdbConnectionSetup(databaseId);
 
-  m_databaseHandler = conn;
+  Connection_T conn = ConnectionPool_getConnection(*m_databaseHandler);
 
-  if (m_databaseHandler != nullptr) {
-    std::string errMsg;
-    bool success = false;
+  if (conn != NULL) {
+
     // Ignore errors (when database already exists, errors are expected)
     std::string createSyncTable =
       "CREATE TABLE `chronosync_update_info` (\
@@ -456,12 +472,15 @@
        UNIQUE KEY `id_UNIQUE` (`id`)          \
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
 
-    MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE,
-                      success, errMsg);
-#ifndef NDEBUG
-    if (!success)
-      _LOG_DEBUG(errMsg);
-#endif
+    // must use libzdb's try-catch style
+    TRY {
+      Connection_execute(conn,
+                         reinterpret_cast<const char*>(createSyncTable.c_str()), createSyncTable.size());
+    }
+    CATCH(SQLException) {
+      _LOG_ERROR(Connection_getLastError(conn));
+    }
+    END_TRY;
 
     // create SQL string for table creation, id, sha256, and name are columns that we need
     std::stringstream ss;
@@ -475,13 +494,17 @@
     ss << "PRIMARY KEY (`id`), UNIQUE KEY `sha256` (`sha256`)\
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
 
-    success = false;
-    MySQLPerformQuery(m_databaseHandler, ss.str(), util::CREATE, success, errMsg);
+    // must use libzdb's try-catch style
+    TRY {
+      Connection_execute(conn,
+                         reinterpret_cast<const char*>(ss.str().c_str()), ss.str().size());
+    }
+    CATCH(SQLException) {
+      _LOG_ERROR(Connection_getLastError(conn));
+    }
+    END_TRY;
 
-#ifndef NDEBUG
-    if (!success)
-      _LOG_DEBUG(errMsg);
-#endif
+    Connection_close(conn);
   }
   else {
     throw Error("cannot connect to the Database");
@@ -655,32 +678,36 @@
 
 template <>
 chronosync::SeqNo
-PublishAdapter<MYSQL>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
+PublishAdapter<ConnectionPool_T>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
 {
   _LOG_DEBUG(">> PublishAdapter::getLatestSeqNo");
 
-  std::string sql = "SELECT seq_num FROM chronosync_update_info WHERE session_name = '"
-    + update.session.toUri() + "';";
+  Connection_T conn = ConnectionPool_getConnection(*m_databaseHandler);
 
-  std::string errMsg;
-  bool success;
-  std::shared_ptr<MYSQL_RES> results
-    = atmos::util::MySQLPerformQuery(m_databaseHandler, sql, util::QUERY, success, errMsg);
-  if (!success) {
-    _LOG_DEBUG(errMsg);
-    return 0; //database connection error?
+  if (!conn) {
+    _LOG_DEBUG("No available database connections");
+    return 0;
   }
-  else if (results != nullptr){
-    MYSQL_ROW row;
-    if (mysql_num_rows(results.get()) == 0)
-      return 0;
 
-    while ((row = mysql_fetch_row(results.get())))
-    {
-      chronosync::SeqNo seqNo = std::stoull(row[0]);
-      return seqNo;
-    }
+  PreparedStatement_T ps4SeqNum =
+    Connection_prepareStatement(conn,
+                                "SELECT seq_num FROM chronosync_update_info WHERE session_name = ?");
+  PreparedStatement_setString(ps4SeqNum, 1, update.session.toUri().c_str());
+  ResultSet_T res4SeqNum;
+  TRY {
+     res4SeqNum = PreparedStatement_executeQuery(ps4SeqNum);
   }
+  CATCH(SQLException) {
+    _LOG_ERROR(Connection_getLastError(conn));
+  }
+  END_TRY;
+
+  while (ResultSet_next(res4SeqNum)) {
+    return ResultSet_getInt(res4SeqNum, 1);
+  }
+
+  Connection_close(conn);
+
   return 0;
 }
 
@@ -693,19 +720,31 @@
 
 template <>
 void
-PublishAdapter<MYSQL>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
+PublishAdapter<ConnectionPool_T>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
 {
-  std::string sql = "UPDATE chronosync_update_info SET seq_num = "
-    + boost::lexical_cast<std::string>(update.high)
-    + " WHERE session_name = '" + update.session.toUri() + "';";
+  Connection_T conn = ConnectionPool_getConnection(*m_databaseHandler);
 
-  std::string errMsg;
-  bool success = false;
-  m_mutex.lock();
-  util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
-  m_mutex.unlock();
-  if (!success)
-    _LOG_ERROR(errMsg);
+  if (!conn) {
+    _LOG_DEBUG("No available database connections");
+    return;
+  }
+
+  PreparedStatement_T ps4UpdateSeqNum =
+    Connection_prepareStatement(conn,
+                                "UPDATE chronosync_update_info SET seq_num = ? WHERE session_name = ?");
+  PreparedStatement_setLLong(ps4UpdateSeqNum, 1, update.high);
+  PreparedStatement_setString(ps4UpdateSeqNum, 1, update.session.toUri().c_str());
+
+  TRY {
+     PreparedStatement_execute(ps4UpdateSeqNum);
+  }
+  CATCH(SQLException) {
+    _LOG_ERROR(Connection_getLastError(conn));
+  }
+  END_TRY;
+
+  Connection_close(conn);
+
 }
 
 template <typename DatabaseHandler>
@@ -717,19 +756,31 @@
 
 template <>
 void
-PublishAdapter<MYSQL>::addUpdateInformation(const chronosync::MissingDataInfo& update)
+PublishAdapter<ConnectionPool_T>::addUpdateInformation(const chronosync::MissingDataInfo& update)
 {
-  std::string sql = "INSERT INTO chronosync_update_info (session_name, seq_num) VALUES ('"
-    + update.session.toUri() + "', " + boost::lexical_cast<std::string>(update.high)
-    + ");";
+  Connection_T conn = ConnectionPool_getConnection(*m_databaseHandler);
 
-  std::string errMsg;
-  bool success = false;
-  m_mutex.lock();
-  util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
-  m_mutex.unlock();
-  if (!success)
-    _LOG_ERROR(errMsg);
+  if (!conn) {
+    _LOG_DEBUG("No available database connections");
+    return;
+  }
+
+  PreparedStatement_T ps4UpdateChronosync =
+    Connection_prepareStatement(conn, "INSERT INTO chronosync_update_info (session_name, seq_num) VALUES (?, ?)");
+
+  PreparedStatement_setString(ps4UpdateChronosync, 1, update.session.toUri().c_str());
+  PreparedStatement_setLLong(ps4UpdateChronosync, 1, update.high);
+
+  TRY {
+     PreparedStatement_execute(ps4UpdateChronosync);
+  }
+  CATCH(SQLException) {
+    _LOG_ERROR(Connection_getLastError(conn));
+  }
+  END_TRY;
+
+  Connection_close(conn);
+
 }
 
 template <typename DatabaseHandler>
@@ -794,15 +845,24 @@
 
 template <>
 void
-PublishAdapter<MYSQL>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
+PublishAdapter<ConnectionPool_T>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
 {
-  std::string errMsg;
-  bool success = false;
-  m_mutex.lock();
-  atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
-  m_mutex.unlock();
-  if (!success)
-    _LOG_ERROR(errMsg);
+  Connection_T conn = ConnectionPool_getConnection(*m_databaseHandler);
+
+  if (!conn) {
+    _LOG_DEBUG("No available database connections");
+    return;
+  }
+
+  TRY {
+    Connection_execute(conn, reinterpret_cast<const char*>(sql.c_str()), sql.size());
+  }
+  CATCH(SQLException) {
+    _LOG_ERROR(Connection_getLastError(conn));
+  }
+  END_TRY;
+
+  Connection_close(conn);
 }
 
 template<typename DatabaseHandler>
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index ee60c34..ce67f4d 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -185,15 +185,6 @@
   sendNack(const ndn::Name& dataPrefix);
 
   /**
-   * Helper function that generates the sqlQuery string for component-based query
-   * @param sqlQuery:     stringstream to save the sqlQuery string
-   * @param jsonValue:    Json value that contains the query information
-   */
-  bool
-  json2Sql(std::stringstream& sqlQuery,
-           Json::Value& jsonValue);
-
-  /**
    * Helper function that signs the data
    */
   void
@@ -203,10 +194,21 @@
    * Helper function that publishes query-results data segments
    */
   virtual void
-  prepareSegments(const ndn::Name& segmentPrefix,
-                  const std::string& sqlString,
-                  bool autocomplete,
-                  bool lastComponent);
+  prepareSegmentsBySqlString(const ndn::Name& segmentPrefix,
+                             const std::string& sqlString,
+                             bool lastComponent,
+                             const std::string& nameField);
+
+  virtual void
+  prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams,
+                          const ndn::Name& segmentPrefix);
+
+  void
+  generateSegments(ResultSet_T& res,
+                   const ndn::Name& segmentPrefix,
+                   int resultCount,
+                   bool autocomplete,
+                   bool lastComponent);
 
   /**
    * Helper function to set the DatabaseHandler
@@ -214,6 +216,9 @@
   void
   setDatabaseHandler(const util::ConnectionDetails&  databaseId);
 
+  void
+  closeDatabaseHandler();
+
   /**
    * Helper function that set filters to make the adapter work
    */
@@ -228,15 +233,21 @@
    * @param sqlQuery:      stringstream to save the sqlQuery string
    * @param jsonValue:     Json value that contains the query information
    * @param lastComponent: Flag to mark the last component query
+   * @param nameField:     stringstream to save the nameField string
    */
   bool
   json2AutocompletionSql(std::stringstream& sqlQuery,
                          Json::Value& jsonValue,
-                         bool& lastComponent);
+                         bool& lastComponent,
+                         std::stringstream& nameField);
 
   bool
-  json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
-                            Json::Value& jsonValue);
+  doPrefixBasedSearch(Json::Value& jsonValue,
+                      std::vector<std::pair<std::string, std::string>>& typedComponents);
+
+  bool
+  doFilterBasedSearch(Json::Value& jsonValue,
+                      std::vector<std::pair<std::string, std::string>>& typedComponents);
 
   ndn::Name
   getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
@@ -248,14 +259,13 @@
 protected:
   typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
   // Handle to the Catalog's database
-  std::shared_ptr<DatabaseHandler> m_databaseHandler;
+  std::shared_ptr<DatabaseHandler> m_dbConnPool;
   const std::shared_ptr<chronosync::Socket>& m_socket;
 
   // mutex to control critical sections
   std::mutex m_mutex;
   // @{ needs m_mutex protection
   // The Queries we are currently writing to
-  //std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
   ndn::util::InMemoryStorageLru m_activeQueryToFirstResponse;
   ndn::util::InMemoryStorageLru m_cache;
   std::string m_chronosyncDigest;
@@ -396,7 +406,7 @@
 
 template <>
 void
-QueryAdapter<MYSQL>::setCatalogId()
+QueryAdapter<ConnectionPool_T>::setCatalogId()
 {
   // use public key digest as the catalog ID
   ndn::Name keyId;
@@ -421,20 +431,34 @@
 
 template <>
 void
-QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+QueryAdapter<ConnectionPool_T>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
 {
-  std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
-
-  m_databaseHandler = conn;
+  m_dbConnPool = zdbConnectionSetup(databaseId);
 }
 
 template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::closeDatabaseHandler()
+{
+}
+
+template <>
+void
+QueryAdapter<ConnectionPool_T>::closeDatabaseHandler()
+{
+  ConnectionPool_stop(*m_dbConnPool);
+}
+
+
+template <typename DatabaseHandler>
 QueryAdapter<DatabaseHandler>::~QueryAdapter()
 {
   for (const auto& itr : m_registeredPrefixList) {
     if (static_cast<bool>(itr.second))
       m_face->unsetInterestFilter(itr.second);
   }
+
+  closeDatabaseHandler();
 }
 
 template <typename DatabaseHandler>
@@ -446,7 +470,8 @@
 
   // Interest must carry component "initialization" or "query"
   if (interest.getName().size() < filter.getPrefix().size()) {
-    // @todo: return a nack
+    // must NACK incorrect interest
+    sendNack(interest.getName());
     return;
   }
 
@@ -457,7 +482,7 @@
     std::thread queryThread(&QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
                             this,
                             interestPtr);
-    queryThread.join();
+    queryThread.detach();
   }
   else if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("query")) {
 
@@ -484,7 +509,7 @@
     std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
                             this,
                             interestPtr);
-    queryThread.join();
+    queryThread.detach();
   }
 
   // ignore other Interests
@@ -597,31 +622,35 @@
 // get distinct value of each column
 template <>
 void
-QueryAdapter<MYSQL>::getFiltersMenu(Json::Value& value)
+QueryAdapter<ConnectionPool_T>::getFiltersMenu(Json::Value& value)
 {
   _LOG_DEBUG(">> QueryAdapter::getFiltersMenu");
   Json::Value tmp;
 
+  Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool);
+  if (!conn) {
+    _LOG_DEBUG("No available database connections");
+    return;
+  }
+
   for (size_t i = 0; i < m_filterCategoryNames.size(); i++) {
     std::string columnName = m_filterCategoryNames[i];
     std::string getFilterSql("SELECT DISTINCT " + columnName +
                              " FROM " + m_databaseTable + ";");
-    std::string errMsg;
-    bool success;
 
-    std::shared_ptr<MYSQL_RES> results
-      = atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
-                                       util::QUERY, success, errMsg);
-    if (!success) {
-      _LOG_ERROR(errMsg);
-      value.clear();
-      return;
+    ResultSet_T res4ColumnName;
+    TRY {
+      res4ColumnName = Connection_executeQuery(conn, reinterpret_cast<const char*>(getFilterSql.c_str()), getFilterSql.size());
+    }
+    CATCH(SQLException) {
+      _LOG_ERROR(Connection_getLastError(conn));
+    }
+    END_TRY;
+
+    while (ResultSet_next(res4ColumnName)) {
+      tmp[columnName].append(ResultSet_getString(res4ColumnName, 1));
     }
 
-    while (MYSQL_ROW row = mysql_fetch_row(results.get()))
-    {
-      tmp[columnName].append(row[0]);
-    }
     value.append(tmp);
     tmp.clear();
   }
@@ -689,67 +718,16 @@
 
   m_mutex.lock();
   m_cache.insert(*nack);
+  m_face->put(*nack);
   m_mutex.unlock();
 }
 
-
-template <typename DatabaseHandler>
-bool
-QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
-                                        Json::Value& jsonValue)
-{
-  _LOG_DEBUG(">> QueryAdapter::json2Sql");
-
-  _LOG_DEBUG(jsonValue.toStyledString());
-
-  if (jsonValue.type() != Json::objectValue) {
-    return false;
-  }
-
-  sqlQuery << "SELECT name FROM " << m_databaseTable;
-  bool input = false;
-  for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
-  {
-    Json::Value key = iter.key();
-    Json::Value value = (*iter);
-
-    if (key == Json::nullValue || value == Json::nullValue) {
-      _LOG_ERROR("Null key or value in JsonValue");
-      return false;
-    }
-
-    // cannot convert to string
-    if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
-      _LOG_ERROR("Malformed JsonQuery string");
-      return false;
-    }
-
-    if (key.asString().compare("?") == 0) {
-      continue;
-    }
-
-    if (input) {
-      sqlQuery << " AND";
-    } else {
-      sqlQuery << " WHERE";
-    }
-
-    sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
-    input = true;
-  }
-
-  if (!input) { // Force it to be the empty set
-     return false;
-  }
-  sqlQuery << ";";
-  return true;
-}
-
 template <typename DatabaseHandler>
 bool
 QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
                                                       Json::Value& jsonValue,
-                                                      bool& lastComponent)
+                                                      bool& lastComponent,
+                                                      std::stringstream& fieldName)
 {
   _LOG_DEBUG(">> QueryAdapter::json2AutocompletionSql");
 
@@ -812,7 +790,8 @@
     lastComponent = true; // indicate this query is to query the last component
 
   bool more = false;
-  sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM " << m_databaseTable;
+
+  fieldName << m_nameFields[count];
   for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
        it != typedComponents.end(); ++it) {
     if (more)
@@ -828,14 +807,12 @@
   return true;
 }
 
-template <typename DatabaseHandler>
+template <typename databasehandler>
 bool
-QueryAdapter<DatabaseHandler>::json2PrefixBasedSearchSql(std::stringstream& sqlQuery,
-                                                         Json::Value& jsonValue)
+QueryAdapter<databasehandler>::doPrefixBasedSearch(Json::Value& jsonValue,
+                                                   std::vector<std::pair<std::string, std::string>>& typedComponents)
 {
-  _LOG_DEBUG(">> QueryAdapter::json2CompleteSearchSql");
-
-  _LOG_DEBUG(jsonValue.toStyledString());
+  _LOG_DEBUG(">> QueryAdapter::doPrefixBasedSearch");
 
   if (jsonValue.type() != Json::objectValue) {
     return false;
@@ -849,13 +826,13 @@
     Json::Value value = (*iter);
 
     if (key == Json::nullValue || value == Json::nullValue) {
-      _LOG_ERROR("Null key or value in JsonValue");
+      _LOG_ERROR("null key or value in jsonValue");
       return false;
     }
 
     // cannot convert to string
     if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
-      _LOG_ERROR("Malformed JsonQuery string");
+      _LOG_ERROR("malformed jsonquery string");
       return false;
     }
 
@@ -871,10 +848,10 @@
   size_t pos = 0;
   size_t start = 1; // start from the 1st char which is not '/'
   size_t count = 0; // also the name to query for
-  size_t typedStringLen = typedString.length();
+  size_t typedStringlen = typedString.length();
   std::string token;
   std::string delimiter = "/";
-  std::vector<std::pair<std::string, std::string>> typedComponents;
+
   while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
     token = typedString.substr(start, pos - start);
     if (count >= m_nameFields.size()) {
@@ -889,30 +866,54 @@
   }
 
   // we may have a component after the last "/"
-  if (start < typedStringLen) {
+  if (start < typedStringlen) {
     typedComponents.push_back(std::make_pair(m_nameFields[count],
-                                             typedString.substr(start, typedStringLen - start)));
+                                             typedString.substr(start, typedStringlen - start)));
   }
 
-  // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
-  // return true
-  bool more = false;
-  sqlQuery << "SELECT name FROM " << m_databaseTable;
-  for (std::vector<std::pair<std::string, std::string>>::iterator it = typedComponents.begin();
-       it != typedComponents.end(); ++it) {
-    if (more)
-      sqlQuery << " AND";
-    else
-      sqlQuery << " WHERE";
-
-    sqlQuery << " " << it->first << "='" << it->second << "'";
-
-    more = true;
-  }
-  sqlQuery << ";";
   return true;
 }
 
+
+template <typename databasehandler>
+bool
+QueryAdapter<databasehandler>::doFilterBasedSearch(Json::Value& jsonValue,
+                                                   std::vector<std::pair<std::string, std::string>>& typedComponents)
+{
+  _LOG_DEBUG(">> QueryAdapter::doFilterBasedSearch");
+
+  if (jsonValue.type() != Json::objectValue) {
+    return false;
+  }
+
+  for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
+  {
+    Json::Value key = iter.key();
+    Json::Value value = (*iter);
+
+    if (key == Json::nullValue || value == Json::nullValue) {
+      _LOG_ERROR("null key or value in jsonValue");
+      return false;
+    }
+
+    // cannot convert to string
+    if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
+      _LOG_ERROR("malformed jsonQuery string");
+      return false;
+    }
+
+    if (key.asString().compare("?") == 0 || key.asString().compare("??") == 0) {
+      continue;
+    }
+
+    _LOG_DEBUG(key.asString() << " " << value.asString());
+    typedComponents.push_back(std::make_pair(key.asString(), value.asString()));
+  }
+
+  return true;
+}
+
+
 template <typename DatabaseHandler>
 void
 QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
@@ -927,7 +928,8 @@
   const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
 
   if (jsonQuery.length() <= 0) {
-    // no JSON query, send Nack?
+    // no JSON query, send Nack
+    sendNack(interest->getName());
     return;
   }
 
@@ -959,119 +961,274 @@
   Json::Value parsedFromString;
   Json::Reader reader;
   if (!reader.parse(jsonQuery, parsedFromString)) {
-    // @todo: send NACK?
+    // json object is broken
+    sendNack(interest->getName());
     _LOG_ERROR("Cannot parse the JsonQuery");
     return;
   }
 
   // 3) Convert the JSON Query into a MySQL one
-  bool autocomplete = false, lastComponent = false;
-  std::stringstream sqlQuery;
-
   ndn::Name segmentPrefix(getQueryResultsName(interest, version));
   _LOG_DEBUG("segmentPrefix :" << segmentPrefix);
 
   Json::Value tmp;
+  std::vector<std::pair<std::string, std::string>> typedComponents;
+
   // expect the autocomplete and the component-based query are separate
-  // if JSON::Value contains ? as key, is autocompletion
+  // if Json::Value contains ? as key, is autocompletion
   if (parsedFromString.get("?", tmp) != tmp) {
-    autocomplete = true;
-    if (!json2AutocompletionSql(sqlQuery, parsedFromString, lastComponent)) {
+    bool lastComponent = false;
+    std::stringstream sqlQuery, fieldName;
+
+    // must generate the sql string for autocomple, the selected column is changing
+    if (!json2AutocompletionSql(sqlQuery, parsedFromString, lastComponent, fieldName)) {
       sendNack(segmentPrefix);
       return;
     }
+    prepareSegmentsBySqlString(segmentPrefix, sqlQuery.str(), lastComponent, fieldName.str());
   }
   else if (parsedFromString.get("??", tmp) != tmp) {
-    if (!json2PrefixBasedSearchSql(sqlQuery, parsedFromString)) {
+    if (!doPrefixBasedSearch(parsedFromString, typedComponents)) {
       sendNack(segmentPrefix);
       return;
     }
+    prepareSegmentsByParams(typedComponents, segmentPrefix);
   }
   else {
-    if (!json2Sql(sqlQuery, parsedFromString)) {
+    if (!doFilterBasedSearch(parsedFromString, typedComponents)) {
       sendNack(segmentPrefix);
       return;
     }
+    prepareSegmentsByParams(typedComponents, segmentPrefix);
+  }
+
+}
+
+template <typename databasehandler>
+void
+QueryAdapter<databasehandler>::
+prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams,
+                        const ndn::Name& segmentprefix)
+{
+}
+
+template <>
+void
+QueryAdapter<ConnectionPool_T>::
+prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams,
+                        const ndn::Name& segmentPrefix)
+{
+  _LOG_DEBUG(">> QueryAdapter::prepareSegmentsByParams");
+
+  // the prepared_statement cannot improve the performance, but can simplify the code
+  Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool);
+  if (!conn) {
+    // do not answer for this request due to lack of connections, request will come back later
+    _LOG_DEBUG("No available database connections");
+    return;
+  }
+  std::string getRecordNumSqlStr("SELECT count(name) FROM ");
+  getRecordNumSqlStr += m_databaseTable;
+  getRecordNumSqlStr += " WHERE ";
+  for (size_t i = 0; i < m_nameFields.size(); i++) {
+    getRecordNumSqlStr += m_nameFields[i];
+    getRecordNumSqlStr += " LIKE ?";
+    if (i != m_nameFields.size() - 1) {
+      getRecordNumSqlStr += " AND ";
+    }
   }
 
-  // 4) Run the Query
-  prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete, lastComponent);
+  PreparedStatement_T ps4RecordNum =
+    Connection_prepareStatement(conn, reinterpret_cast<const char*>(getRecordNumSqlStr.c_str()), getRecordNumSqlStr.size());
+
+  // before query, initialize all params for statement
+  for (size_t i = 0; i < m_nameFields.size(); i++) {
+    PreparedStatement_setString(ps4RecordNum, i + 1, "%");
+  }
+
+  // reset params based on the query
+  for (std::vector<std::pair<std::string, std::string>>::iterator it = queryParams.begin();
+       it != queryParams.end(); ++it) {
+    // dictionary is faster
+    for (size_t i = 0; i < m_nameFields.size(); i++) {
+      if (it->first == m_nameFields[i]) {
+        PreparedStatement_setString(ps4RecordNum, i + 1, it->second.c_str());
+      }
+    }
+  }
+
+  ResultSet_T res4RecordNum;
+  TRY {
+    res4RecordNum = PreparedStatement_executeQuery(ps4RecordNum);
+  }
+  CATCH(SQLException) {
+    _LOG_ERROR(Connection_getLastError(conn));
+  }
+  END_TRY;
+
+  uint64_t resultCount = 0; // use count sql to get
+
+  // result for record number
+  while (ResultSet_next(res4RecordNum)) {
+    resultCount = ResultSet_getInt(res4RecordNum, 1);
+  }
+
+  // get name list statement
+  std::string getNameListSqlStr("SELECT name FROM ");
+  getNameListSqlStr += m_databaseTable;
+  getNameListSqlStr += " WHERE ";
+  for (size_t i = 0; i < m_nameFields.size(); i++) {
+    getNameListSqlStr += m_nameFields[i];
+    getNameListSqlStr += " LIKE ?";
+    if (i != m_nameFields.size() - 1) {
+      getNameListSqlStr += " AND ";
+    }
+  }
+
+  PreparedStatement_T ps4Name =
+    Connection_prepareStatement(conn, reinterpret_cast<const char*>(getNameListSqlStr.c_str()), getNameListSqlStr.size());
+
+  // before query, initialize all params for statement
+  for (size_t i = 0; i < m_nameFields.size(); i++) {
+    PreparedStatement_setString(ps4Name, i + 1, "%");
+  }
+
+  // reset params based on the query
+  for (std::vector<std::pair<std::string, std::string>>::iterator it = queryParams.begin();
+       it != queryParams.end(); ++it) {
+    // dictionary is faster
+    for (size_t i = 0; i < m_nameFields.size(); i++) {
+      if (it->first == m_nameFields[i]) {
+        PreparedStatement_setString(ps4Name, i + 1, it->second.c_str());
+      }
+    }
+  }
+
+  ResultSet_T res4Name;
+  TRY {
+    res4Name = PreparedStatement_executeQuery(ps4Name);
+  }
+  CATCH(SQLException) {
+    _LOG_ERROR(Connection_getLastError(conn));
+  }
+  END_TRY;
+
+  generateSegments(res4Name, segmentPrefix, resultCount, false, false);
+
+  Connection_close(conn);
 }
 
 template <typename DatabaseHandler>
 void
-QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
-                                               const std::string& sqlString,
-                                               bool autocomplete,
-                                               bool lastComponent)
+QueryAdapter<DatabaseHandler>::generateSegments(ResultSet_T& res,
+                                                const ndn::Name& segmentPrefix,
+                                                int resultCount,
+                                                bool autocomplete,
+                                                bool lastComponent)
+{
+  uint64_t segmentno = 0;
+  Json::Value tmp;
+  Json::Value resultjson;
+  Json::FastWriter fastWriter;
+
+  uint64_t viewstart = 0, viewend = 0;
+  while (ResultSet_next(res)) {
+    const char *name = ResultSet_getString(res, 1);
+    tmp.append(name);
+    const std::string tmpString = fastWriter.write(tmp);
+    if (tmpString.length() > PAYLOAD_LIMIT) {
+      std::shared_ptr<ndn::Data> data
+        = makeReplyData(segmentPrefix, resultjson, segmentno, false,
+                        autocomplete, resultCount, viewstart, viewend, lastComponent);
+      m_mutex.lock();
+      m_cache.insert(*data);
+      m_face->put(*data);
+      m_mutex.unlock();
+      tmp.clear();
+      resultjson.clear();
+      segmentno++;
+      viewstart = viewend + 1;
+    }
+    resultjson.append(name);
+    viewend++;
+  }
+  std::shared_ptr<ndn::Data> data
+    = makeReplyData(segmentPrefix, resultjson, segmentno, true,
+                    autocomplete, resultCount, viewstart, viewend, lastComponent);
+  m_mutex.lock();
+  m_cache.insert(*data);
+  m_face->put(*data);
+  m_mutex.unlock();
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::prepareSegmentsBySqlString(const ndn::Name& segmentPrefix,
+                                                          const std::string& sqlString,
+                                                          bool lastComponent,
+                                                          const std::string& nameField)
 {
   // empty
 }
 
-// prepareSegments specilization function
-template<>
+
+template <>
 void
-QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
-                                     const std::string& sqlString,
-                                     bool autocomplete,
-                                     bool lastComponent)
+QueryAdapter<ConnectionPool_T>::prepareSegmentsBySqlString(const ndn::Name& segmentPrefix,
+                                                          const std::string& sqlString,
+                                                          bool lastComponent,
+                                                          const std::string& nameField)
 {
-  _LOG_DEBUG(">> QueryAdapter::prepareSegments");
+  _LOG_DEBUG(">> QueryAdapter::prepareSegmentsBySqlString");
 
   _LOG_DEBUG(sqlString);
 
-  std::string errMsg;
-  bool success;
-  // 4) Run the Query
-  std::shared_ptr<MYSQL_RES> results
-    = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
-  if (!success)
-    _LOG_ERROR(errMsg);
-
-  if (!results) {
-    _LOG_ERROR("NULL MYSQL_RES for" << sqlString);
-
-    // @todo: throw runtime error or log the error message?
+  Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool);
+  if (!conn) {
+    _LOG_DEBUG("No available database connections");
     return;
   }
 
-  uint64_t resultCount = mysql_num_rows(results.get());
+  //// just for get the rwo count ...
+  std::string getRecordNumSqlStr("SELECT COUNT( DISTINCT ");
+  getRecordNumSqlStr += nameField;
+  getRecordNumSqlStr += ") FROM ";
+  getRecordNumSqlStr += m_databaseTable;
+  getRecordNumSqlStr += sqlString;
 
-  _LOG_DEBUG("Query resuls contain " << resultCount << "rows");
-
-  MYSQL_ROW row;
-  uint64_t segmentNo = 0;
-  Json::Value tmp;
-  Json::Value resultJson;
-  Json::FastWriter fastWriter;
-
-  uint64_t viewStart = 0, viewEnd = 0;
-  while ((row = mysql_fetch_row(results.get())))
-  {
-    tmp.append(row[0]);
-    const std::string tmpString = fastWriter.write(tmp);
-    if (tmpString.length() > PAYLOAD_LIMIT) {
-      std::shared_ptr<ndn::Data> data
-        = makeReplyData(segmentPrefix, resultJson, segmentNo, false,
-                        autocomplete, resultCount, viewStart, viewEnd, lastComponent);
-      m_mutex.lock();
-      m_cache.insert(*data);
-      m_mutex.unlock();
-      tmp.clear();
-      resultJson.clear();
-      segmentNo++;
-      viewStart = viewEnd + 1;
-    }
-    resultJson.append(row[0]);
-    viewEnd++;
+  ResultSet_T res4RecordNum;
+  TRY {
+    res4RecordNum = Connection_executeQuery(conn, reinterpret_cast<const char*>(getRecordNumSqlStr.c_str()), getRecordNumSqlStr.size());
   }
+  CATCH(SQLException) {
+    _LOG_ERROR(Connection_getLastError(conn));
+  }
+  END_TRY;
 
-  std::shared_ptr<ndn::Data> data
-    = makeReplyData(segmentPrefix, resultJson, segmentNo, true,
-                    autocomplete, resultCount, viewStart, viewEnd, lastComponent);
-  m_mutex.lock();
-  m_cache.insert(*data);
-  m_mutex.unlock();
+  uint64_t resultCount = 0;
+  while (ResultSet_next(res4RecordNum)) {
+    resultCount = ResultSet_getInt(res4RecordNum, 1);
+  }
+  ////
+
+  std::string getNextFieldsSqlStr("SELECT DISTINCT ");
+  getNextFieldsSqlStr += nameField;
+  getNextFieldsSqlStr += " FROM ";
+  getNextFieldsSqlStr += m_databaseTable;
+  getNextFieldsSqlStr += sqlString;
+
+  ResultSet_T res4NextFields;
+  TRY {
+    res4NextFields = Connection_executeQuery(conn, reinterpret_cast<const char*>(getNextFieldsSqlStr.c_str()), getNextFieldsSqlStr.size());
+  }
+  CATCH(SQLException) {
+    _LOG_ERROR(Connection_getLastError(conn));
+  }
+  END_TRY;
+
+  generateSegments(res4NextFields, segmentPrefix, resultCount, true, lastComponent);
+
+  Connection_close(conn);
 }
 
 template <typename DatabaseHandler>
@@ -1096,8 +1253,8 @@
   if (lastComponent)
     entry["lastComponent"] = Json::Value(true);
 
-  _LOG_DEBUG("resultCount " << resultCount << ";"
-             << "viewStart " << viewStart << ";"
+  _LOG_DEBUG("resultCount " << resultCount << "; "
+             << "viewStart " << viewStart << "; "
              << "viewEnd " << viewEnd);
 
   if (isAutocomplete) {
diff --git a/catalog/src/util/mysql-util.cpp b/catalog/src/util/mysql-util.cpp
index 9df8b8e..38c0f3a 100644
--- a/catalog/src/util/mysql-util.cpp
+++ b/catalog/src/util/mysql-util.cpp
@@ -31,55 +31,26 @@
   // empty
 }
 
-
-std::shared_ptr<MYSQL>
-MySQLConnectionSetup(const ConnectionDetails& details)
+std::shared_ptr<ConnectionPool_T>
+zdbConnectionSetup(const ConnectionDetails& details)
 {
-  MYSQL* conn = mysql_init(NULL);
-  my_bool reconnect = 1;
-  mysql_options(conn, MYSQL_OPT_RECONNECT, &reconnect);
-  if(!mysql_real_connect(conn, details.server.c_str(), details.user.c_str(),
-                         details.password.c_str(), details.database.c_str(), 0, NULL, 0)) {
-    throw std::runtime_error(mysql_error(conn));
-  }
-  std::shared_ptr<MYSQL> connection(conn, &mysql_close);
-  return connection;
-}
+  std::string dbConnStr("mysql://");
+  dbConnStr += details.user;
+  dbConnStr += ":";
+  dbConnStr += details.password;
+  dbConnStr += "@";
+  dbConnStr += details.server;
+  dbConnStr += ":3306/";
+  dbConnStr += details.database;
 
-std::shared_ptr<MYSQL_RES>
-MySQLPerformQuery(std::shared_ptr<MYSQL> connection,
-                  const std::string& sql_query,
-                  DatabaseOperation op,
-                  bool& success,
-                  std::string& errMsg)
-{
-  switch (mysql_query(connection.get(), sql_query.c_str()))
-  {
-  case 0:
-  {
-    success = true;
-    if (op == QUERY) {
-      MYSQL_RES* resultPtr = mysql_store_result(connection.get());
-      if (resultPtr != NULL)
-      {
-        return std::shared_ptr<MYSQL_RES>(resultPtr, &mysql_free_result);
-      }
-    }
-    //for add, remove, we don't need the results, may be log the events
-    break;
-  }
-  // Various error cases
-  case CR_COMMANDS_OUT_OF_SYNC:
-  case CR_SERVER_GONE_ERROR:
-  case CR_SERVER_LOST:
-  case CR_UNKNOWN_ERROR:
-  default:
-  {
-    errMsg.assign(mysql_error(connection.get()));
-    break;
-  }
-  }
-  return nullptr;
+  URL_T url = URL_new(dbConnStr.c_str());
+
+  ConnectionPool_T dbConnPool = ConnectionPool_new(url);
+  ConnectionPool_setMaxConnections(dbConnPool, MAX_DB_CONNECTIONS);
+  ConnectionPool_setReaper(dbConnPool, 1);
+  ConnectionPool_start(dbConnPool);
+  auto sharedPool = std::make_shared<ConnectionPool_T>(dbConnPool);
+  return sharedPool;
 }
 
 } // namespace util
diff --git a/catalog/src/util/mysql-util.hpp b/catalog/src/util/mysql-util.hpp
index 75e5997..13078a9 100644
--- a/catalog/src/util/mysql-util.hpp
+++ b/catalog/src/util/mysql-util.hpp
@@ -20,12 +20,15 @@
 #define ATMOS_UTIL_CONNECTION_DETAILS_HPP
 
 #include "mysql/mysql.h"
-
 #include <memory>
 #include <string>
+#include <zdb/zdb.h>
 
 namespace atmos {
 namespace util {
+
+#define MAX_DB_CONNECTIONS 100
+
 enum DatabaseOperation {CREATE, UPDATE, ADD, REMOVE, QUERY};
 struct ConnectionDetails {
 public:
@@ -38,15 +41,8 @@
                     const std::string& passwordInput, const std::string& databaseInput);
 };
 
-std::shared_ptr<MYSQL>
-MySQLConnectionSetup(const ConnectionDetails& details);
-
-std::shared_ptr<MYSQL_RES>
-MySQLPerformQuery(std::shared_ptr<MYSQL> connection,
-                  const std::string& sql_query,
-                  DatabaseOperation op,
-                  bool& success,
-                  std::string& errMsg);
+std::shared_ptr<ConnectionPool_T>
+zdbConnectionSetup(const ConnectionDetails& details);
 
 } // namespace util
 } // namespace atmos
diff --git a/catalog/tests/unit-tests/query/test-query-adapter.cpp b/catalog/tests/unit-tests/query/test-query-adapter.cpp
index 88c8ea2..b5c0161 100644
--- a/catalog/tests/unit-tests/query/test-query-adapter.cpp
+++ b/catalog/tests/unit-tests/query/test-query-adapter.cpp
@@ -84,13 +84,6 @@
       return makeAckData(interest, version);
     }
 
-    bool
-    json2SqlTest(std::stringstream& ss,
-                 Json::Value& parsedFromString)
-    {
-      return json2Sql(ss, parsedFromString);
-    }
-
     std::shared_ptr<ndn::Data>
     getReplyData(const ndn::Name& segmentPrefix,
                  const Json::Value& value,
@@ -112,12 +105,14 @@
     }
 
     void
-    prepareSegments(const ndn::Name& segmentPrefix,
-                    const std::string& sqlString,
-                    bool autocomplete,
-                    bool lastComponent)
+    prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams,
+                            const ndn::Name& segmentPrefix)
     {
-      BOOST_CHECK_EQUAL(sqlString, "SELECT name FROM cmip5 WHERE name=\'test\';");
+      //BOOST_CHECK_EQUAL(sqlString, "SELECT name FROM cmip5 WHERE name=\'test\';");
+      for (auto it = queryParams.begin() ; it != queryParams.end(); ++it) {
+        std::cout << it->first << " " << it->second << std::endl;
+      }
+
       Json::Value fileList;
       fileList.append("/ndn/test1");
       fileList.append("/ndn/test2");
@@ -125,7 +120,7 @@
 
       std::shared_ptr<ndn::Data> data = makeReplyData(segmentPrefix,
                                                       fileList, 0, true, false,
-                                                      3, 0, 2, lastComponent);
+                                                      3, 0, 2, true);
       m_mutex.lock();
       m_cache.insert(*data);
       m_mutex.unlock();
@@ -147,17 +142,27 @@
     bool
     json2AutocompletionSqlTest(std::stringstream& sqlQuery,
                                Json::Value& jsonValue,
-                               bool& lastComponent)
+                               bool& lastComponent,
+                               std::stringstream& nameField)
     {
-      return json2AutocompletionSql(sqlQuery, jsonValue, lastComponent);
+      return json2AutocompletionSql(sqlQuery, jsonValue, lastComponent, nameField);
     }
 
     bool
-    json2PrefixBasedSearchSqlTest(std::stringstream& sqlQuery,
-                                  Json::Value& jsonValue)
+    testDoPrefixBasedSearch(Json::Value& jsonValue,
+                            std::vector<std::pair<std::string, std::string>>& typedComponents)
     {
-      return json2PrefixBasedSearchSql(sqlQuery, jsonValue);
+      return doPrefixBasedSearch(jsonValue, typedComponents);
     }
+
+    bool
+    testDoFilterBasedSearch(Json::Value& jsonValue,
+                            std::vector<std::pair<std::string, std::string>>& typedComponents)
+    {
+      return doFilterBasedSearch(jsonValue, typedComponents);
+    }
+
+
   };
 
   class QueryAdapterFixture : public UnitTestTimeFixture
@@ -269,121 +274,6 @@
     BOOST_CHECK(queryAdapterTest1.getSigningId() == ndn::Name("/test/signingId"));
   }
 
-  BOOST_AUTO_TEST_CASE(QueryAdapterJsonParseNormalTest)
-  {
-    Json::Value testJson;
-    testJson["name"] = "test";
-    testJson["activity"] = "testActivity";
-    testJson["product"] = "testProduct";
-
-    std::stringstream ss;
-    BOOST_CHECK_EQUAL(true, queryAdapterTest1.json2SqlTest(ss, testJson));
-    BOOST_CHECK_EQUAL(ss.str(), "SELECT name FROM cmip5 WHERE\
- activity=\'testActivity\' AND name='test\' AND product=\'testProduct\';");
-  }
-
-  BOOST_AUTO_TEST_CASE(QueryAdapterJsonParseEmptyTest)
-  {
-    Json::Value testJson;
-
-    std::stringstream ss;
-    BOOST_CHECK_EQUAL(false, queryAdapterTest1.json2SqlTest(ss, testJson));
-  }
-
-  BOOST_AUTO_TEST_CASE(QueryAdapterJsonParseAllItemsTest)
-  {
-    Json::Value testJson;
-    testJson["name"] = "test";
-    testJson["activity"] = "testActivity";
-    testJson["product"] = "testProduct";
-    testJson["origanization"] = "testOrg";
-    testJson["model"] = "testModel";
-    testJson["experiment"] = "testExperiment";
-    testJson["frequency"] = "testFrenquency";
-    testJson["modeling realm"] = "testModeling";
-    testJson["variable name"] = "testVarName";
-    testJson["ensemble member"] = "testEnsembleMember";
-    testJson["ensemble"] = "testEnsemble";
-    testJson["sample granularity"] = "testSampleGranularity";
-    testJson["start time"] = "testStartTime";
-    testJson["field campaign"] = "testFieldCampaign";
-    testJson["optical properties for radiation"] = "testOptProperties";
-    testJson["grid resolution"] = "testGridResolution";
-    testJson["output type"] = "testOutputType";
-    testJson["timestamp"] = "testTimestamp";
-
-    std::stringstream ss;
-    BOOST_CHECK_EQUAL(true, queryAdapterTest1.json2SqlTest(ss, testJson));
-    BOOST_CHECK_EQUAL(ss.str(), "SELECT name FROM cmip5 WHERE activity=\'testActivity\' AND \
-ensemble=\'testEnsemble\' AND ensemble member=\'testEnsembleMember\' AND \
-experiment=\'testExperiment\' AND field campaign=\'testFieldCampaign\' AND \
-frequency=\'testFrenquency\' AND grid resolution=\'testGridResolution\' AND \
-model=\'testModel\' AND modeling realm=\'testModeling\' AND name=\'test\' AND \
-optical properties for radiation=\'testOptProperties\' AND origanization=\'testOrg\' AND \
-output type=\'testOutputType\' AND product=\'testProduct\' AND sample \
-granularity=\'testSampleGranularity\' AND start time=\'testStartTime\' AND \
-timestamp=\'testTimestamp\' AND variable name=\'testVarName\';");
-  }
-
-  BOOST_AUTO_TEST_CASE(QueryAdapterJsonParseSearchTest)
-  {
-    // incorrect autocompletion is ok for sql conversion
-    Json::Value testJson;
-    testJson["name"] = "test";
-    testJson["?"] = "serchTest";
-
-    std::stringstream ss;
-    BOOST_CHECK_EQUAL(true, queryAdapterTest1.json2SqlTest(ss, testJson));
-    BOOST_CHECK_EQUAL(ss.str(),
-      "SELECT name FROM cmip5 WHERE name=\'test\';");
-  }
-
-   BOOST_AUTO_TEST_CASE(QueryAdapterJsonParseFailTest1)
-   {
-     Json::Value testJson;
-    testJson["name"] = Json::nullValue;
-
-    std::stringstream ss;
-    BOOST_CHECK_EQUAL(false, queryAdapterTest1.json2SqlTest(ss, testJson));
-   }
-
-  BOOST_AUTO_TEST_CASE(QueryAdapterJsonParseFailTest2)
-  {
-    Json::Value testJson;
-
-    std::stringstream ss;
-    BOOST_CHECK_EQUAL(false, queryAdapterTest1.json2SqlTest(ss, testJson));
-  }
-
-  BOOST_AUTO_TEST_CASE(QueryAdapterJsonParseFailTest3)
-  {
-    Json::Value testJson;
-    testJson = Json::Value(Json::arrayValue);
-
-    std::stringstream ss;
-    BOOST_CHECK_EQUAL(false, queryAdapterTest1.json2SqlTest(ss, testJson));
-  }
-
-  BOOST_AUTO_TEST_CASE(QueryAdapterJsonParseFailTest4)
-  {
-    Json::Value testJson;
-    testJson[0] = "test";
-
-    std::stringstream ss;
-    BOOST_CHECK_EQUAL(false, queryAdapterTest1.json2SqlTest(ss, testJson));
-  }
-
-  BOOST_AUTO_TEST_CASE(QueryAdapterJsonParseFailTest5)
-  {
-    Json::Value testJson;
-    Json::Value param;
-    param[0] = "test";
-    testJson["name"] = param;
-
-    std::stringstream ss;
-    BOOST_CHECK_EQUAL(false, queryAdapterTest1.json2SqlTest(ss, testJson));
-  }
-
   // use real data instead of ack data
   BOOST_AUTO_TEST_CASE(QueryAdapterMakeAckDataTest)
   {
@@ -491,155 +381,231 @@
   {
     initializeQueryAdapterTest2();
 
-    std::stringstream ss;
+    std::stringstream ss, nameField;
     Json::Value testJson;
     bool lastComponent = false;
     testJson["?"] = "/";
     BOOST_CHECK_EQUAL(true,
-                      queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson, lastComponent));
+                      queryAdapterTest2.json2AutocompletionSqlTest(ss,
+                                                                   testJson,
+                                                                   lastComponent,
+                                                                   nameField));
     BOOST_CHECK_EQUAL(lastComponent, false);
-    BOOST_CHECK_EQUAL("SELECT DISTINCT activity FROM cmip5;", ss.str());
+    BOOST_CHECK_EQUAL(";", ss.str());
+    BOOST_CHECK_EQUAL("activity", nameField.str());
 
     ss.str("");
     ss.clear();
+    nameField.str("");
+    nameField.clear();
     testJson.clear();
     testJson["?"] = "/Activity/";
     BOOST_CHECK_EQUAL(true,
-                      queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson, lastComponent));
+                      queryAdapterTest2.json2AutocompletionSqlTest(ss,
+                                                                   testJson,
+                                                                   lastComponent,
+                                                                   nameField));
     BOOST_CHECK_EQUAL(lastComponent, false);
-    BOOST_CHECK_EQUAL("SELECT DISTINCT product FROM cmip5 WHERE activity='Activity';", ss.str());
+    BOOST_CHECK_EQUAL(" WHERE activity='Activity';", ss.str());
+    BOOST_CHECK_EQUAL("product", nameField.str());
 
     ss.str("");
     ss.clear();
+    nameField.str("");
+    nameField.clear();
     testJson.clear();
     testJson["?"] = "/Activity/Product/Organization/Model/Experiment/";
     BOOST_CHECK_EQUAL(true,
-                      queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson, lastComponent));
+                      queryAdapterTest2.json2AutocompletionSqlTest(ss,
+                                                                   testJson,
+                                                                   lastComponent,
+                                                                   nameField));
     BOOST_CHECK_EQUAL(lastComponent, false);
-    BOOST_CHECK_EQUAL("SELECT DISTINCT frequency FROM cmip5 WHERE activity='Activity' AND \
+    BOOST_CHECK_EQUAL(" WHERE activity='Activity' AND \
 experiment='Experiment' AND model='Model' AND organization='Organization' AND product='Product';",
      ss.str());
+    BOOST_CHECK_EQUAL("frequency", nameField.str());
 
     ss.str("");
     ss.clear();
+    nameField.str("");
+    nameField.clear();
     testJson.clear();
     testJson["?"] = "/Activity/Product/Organization/Model/Experiment/Frequency/Modeling/\
 Variable/Ensemble/";
     BOOST_CHECK_EQUAL(true,
-                      queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson, lastComponent));
+                      queryAdapterTest2.json2AutocompletionSqlTest(ss,
+                                                                   testJson,
+                                                                   lastComponent,
+                                                                   nameField));
     BOOST_CHECK_EQUAL(lastComponent, true);
-    BOOST_CHECK_EQUAL("SELECT DISTINCT time FROM cmip5 WHERE activity='Activity' AND ensemble=\
+    BOOST_CHECK_EQUAL(" WHERE activity='Activity' AND ensemble=\
 'Ensemble' AND experiment='Experiment' AND frequency='Frequency' AND model='Model' AND \
 modeling_realm='Modeling' AND organization='Organization' AND product='Product' AND variable_name=\
 'Variable';",ss.str());
+    BOOST_CHECK_EQUAL("time", nameField.str());
   }
 
   BOOST_AUTO_TEST_CASE(QueryAdapterAutocompletionSqlFailTest)
   {
     initializeQueryAdapterTest2();
 
-    std::stringstream ss;
+    std::stringstream ss, nameField;
     Json::Value testJson;
     bool lastComponent = false;
     testJson["?"] = "serchTest";
     BOOST_CHECK_EQUAL(false,
-                      queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson, lastComponent));
+                      queryAdapterTest2.json2AutocompletionSqlTest(ss,
+                                                                   testJson,
+                                                                   lastComponent,
+                                                                   nameField));
 
     ss.str("");
     ss.clear();
+    nameField.str("");
+    nameField.clear();
     testJson.clear();
     testJson["?"] = "/cmip5";
     BOOST_CHECK_EQUAL(false,
-                      queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson, lastComponent));
+                      queryAdapterTest2.json2AutocompletionSqlTest(ss,
+                                                                   testJson,
+                                                                   lastComponent,
+                                                                   nameField));
 
     ss.str("");
     ss.clear();
+    nameField.str("");
+    nameField.clear();
     Json::Value testJson2; //simply clear does not work
     testJson2[0] = "test";
     BOOST_CHECK_EQUAL(false,
-                      queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson2, lastComponent));
+                      queryAdapterTest2.json2AutocompletionSqlTest(ss,
+                                                                   testJson,
+                                                                   lastComponent,
+                                                                   nameField));
 
     ss.str("");
     ss.clear();
+    nameField.str("");
+    nameField.clear();
     Json::Value testJson3;
     testJson3 = Json::Value(Json::arrayValue);
     BOOST_CHECK_EQUAL(false,
-                      queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson3, lastComponent));
+                      queryAdapterTest2.json2AutocompletionSqlTest(ss,
+                                                                   testJson,
+                                                                   lastComponent,
+                                                                   nameField));
 
     ss.str("");
     ss.clear();
+    nameField.str("");
+    nameField.clear();
     Json::Value testJson4;
     Json::Value param;
     param[0] = "test";
     testJson4["name"] = param;
     BOOST_CHECK_EQUAL(false,
-                      queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson4, lastComponent));
-}
-
-  BOOST_AUTO_TEST_CASE(QueryAdapterPrefixBasedSearchSuccessTest)
-  {
-    initializeQueryAdapterTest2();
-
-    std::stringstream ss;
-    Json::Value testJson;
-    testJson["??"] = "/";
-    BOOST_CHECK_EQUAL(true, queryAdapterTest2.json2PrefixBasedSearchSqlTest(ss, testJson));
-    BOOST_CHECK_EQUAL("SELECT name FROM cmip5;", ss.str());
-
-    ss.str("");
-    ss.clear();
-    testJson.clear();
-    testJson["??"] = "/Activity/Product";
-    BOOST_CHECK_EQUAL(true, queryAdapterTest2.json2PrefixBasedSearchSqlTest(ss, testJson));
-    BOOST_CHECK_EQUAL("SELECT name FROM cmip5 WHERE activity='Activity' AND product='Product';",
-                      ss.str());
-
-    ss.str("");
-    ss.clear();
-    testJson.clear();
-    testJson["??"] = "/Activity/Product/Organization/Model/Experiment/Frequency/Modeling/\
-Variable/Ensemble/Time/";
-
-    BOOST_CHECK_EQUAL(true, queryAdapterTest2.json2PrefixBasedSearchSqlTest(ss, testJson));
-
-    BOOST_CHECK_EQUAL("SELECT name FROM cmip5 WHERE activity='Activity' AND product='Product' \
-AND organization='Organization' AND model='Model' AND experiment='Experiment' AND frequency=\
-'Frequency' AND modeling_realm='Modeling' AND variable_name='Variable' AND ensemble='Ensemble'\
- AND time='Time';", ss.str());
+                      queryAdapterTest2.json2AutocompletionSqlTest(ss,
+                                                                   testJson,
+                                                                   lastComponent,
+                                                                   nameField));
   }
 
-  BOOST_AUTO_TEST_CASE(QueryAdapterPrefixBasedSearchFailureTest)
+  BOOST_AUTO_TEST_CASE(QueryAdapterDoFilterBasedSearchTest)
   {
-    initializeQueryAdapterTest2();
-
-    std::stringstream ss;
     Json::Value testJson;
+    testJson["activity"] = "testActivity";
+    testJson["product"] = "testProduct";
 
-    ss.str("");
-    ss.clear();
+    std::vector<std::pair<std::string, std::string>> resultComponents;
+
+    BOOST_CHECK_EQUAL(true, queryAdapterTest1.testDoFilterBasedSearch(testJson, resultComponents));
+    BOOST_CHECK_EQUAL(2, resultComponents.size());
+    for (auto it=resultComponents.begin(); it != resultComponents.end(); it++) {
+      if (it->first == "activity")
+        BOOST_CHECK_EQUAL(it->second, "testActivity");
+      if (it->first == "product")
+        BOOST_CHECK_EQUAL(it->second, "testProduct");
+    }
+
     testJson.clear();
-    testJson["??"] = "";
-    BOOST_CHECK_EQUAL(false, queryAdapterTest2.json2PrefixBasedSearchSqlTest(ss, testJson));
+    resultComponents.clear();
 
-    ss.str("");
-    ss.clear();
-    Json::Value testJson2; //simply clear does not work
-    testJson2[0] = "test"; // incorrect json object
-    BOOST_CHECK_EQUAL(false, queryAdapterTest2.json2PrefixBasedSearchSqlTest(ss, testJson2));
+    BOOST_CHECK_EQUAL(true, queryAdapterTest1.testDoFilterBasedSearch(testJson, resultComponents));
+    BOOST_CHECK_EQUAL(0, resultComponents.size());
 
-    ss.str("");
-    ss.clear();
-    Json::Value testJson3;
-    testJson3 = Json::Value(Json::arrayValue);  // incorrect json object
-    BOOST_CHECK_EQUAL(false, queryAdapterTest2.json2PrefixBasedSearchSqlTest(ss, testJson3));
+    testJson.clear();
+    resultComponents.clear();
 
-    ss.str("");
-    ss.clear();
-    Json::Value testJson4;
+    testJson["name"] = Json::nullValue;
+    BOOST_CHECK_EQUAL(false, queryAdapterTest1.testDoFilterBasedSearch(testJson, resultComponents));
+
+    testJson.clear();
+    resultComponents.clear();
     Json::Value param;
     param[0] = "test";
-    testJson4["name"] = param;  // incorrect json object
-    BOOST_CHECK_EQUAL(false, queryAdapterTest2.json2PrefixBasedSearchSqlTest(ss, testJson4));
+    testJson["name"] = param;
+
+    BOOST_CHECK_EQUAL(false, queryAdapterTest1.testDoFilterBasedSearch(testJson, resultComponents));
+
+    testJson.clear();
+    resultComponents.clear();
+    Json::Value tmp;
+    tmp[0] = "test";
+
+    BOOST_CHECK_EQUAL(false, queryAdapterTest1.testDoFilterBasedSearch(tmp, resultComponents));
+  }
+
+  BOOST_AUTO_TEST_CASE(QueryAdapterDoPrefixBasedSearchTest)
+  {
+    Json::Value testJson;
+    testJson["??"] = "/";
+
+    std::vector<std::pair<std::string, std::string>> resultComponents;
+
+    BOOST_CHECK_EQUAL(true, queryAdapterTest2.testDoPrefixBasedSearch(testJson, resultComponents));
+    BOOST_CHECK_EQUAL(0, resultComponents.size());
+
+    testJson.clear();
+    resultComponents.clear();
+
+    testJson["??"] = "/Activity/Product";
+    BOOST_CHECK_EQUAL(true, queryAdapterTest2.testDoPrefixBasedSearch(testJson, resultComponents));
+    BOOST_CHECK_EQUAL(2, resultComponents.size());
+
+    for (auto it=resultComponents.begin(); it != resultComponents.end(); it++) {
+      if (it->first == "activity")
+        BOOST_CHECK_EQUAL(it->second, "Activity");
+      if (it->first == "product")
+        BOOST_CHECK_EQUAL(it->second, "Product");
+    }
+
+    testJson.clear();
+    resultComponents.clear();
+
+    testJson["??"] = "";
+    BOOST_CHECK_EQUAL(false, queryAdapterTest2.testDoPrefixBasedSearch(testJson, resultComponents));
+
+    testJson.clear();
+    resultComponents.clear();
+
+    testJson = Json::Value(Json::arrayValue);
+    BOOST_CHECK_EQUAL(false, queryAdapterTest2.testDoPrefixBasedSearch(testJson, resultComponents));
+
+    testJson.clear();
+    resultComponents.clear();
+
+    Json::Value tmp;
+    tmp[0] = "test";
+    BOOST_CHECK_EQUAL(false, queryAdapterTest2.testDoPrefixBasedSearch(tmp, resultComponents));
+
+    testJson.clear();
+    resultComponents.clear();
+
+    Json::Value param, testJson2;
+    param[0] = "test";
+    testJson2["activity"] = param;
+    BOOST_CHECK_EQUAL(false, queryAdapterTest2.testDoPrefixBasedSearch(testJson2, resultComponents));
   }
 
   BOOST_AUTO_TEST_SUITE_END()
diff --git a/wscript b/wscript
index 6b3c33a..0682baa 100644
--- a/wscript
+++ b/wscript
@@ -59,6 +59,9 @@
     conf.check_cfg(package='libndn-cxx', args=['--cflags', '--libs'],
                    uselib_store='NDN_CXX', mandatory=True)
 
+    conf.check_cfg(package='zdb', args=['--cflags', '--libs'],
+                   uselib_store='ZDB', mandatory=True)
+
     conf.check_cfg(package='ChronoSync', args=['ChronoSync >= 0.1', '--cflags', '--libs'],
                    uselib_store='SYNC', mandatory=True)
 
@@ -101,7 +104,7 @@
         features='cxx',
         source=bld.path.ant_glob(['catalog/src/**/*.cpp'],
                                  excl=['catalog/src/main.cpp']),
-        use='NDN_CXX BOOST JSON MYSQL SYNC LOG4CXX',
+        use='NDN_CXX BOOST JSON MYSQL SYNC LOG4CXX ZDB',
         includes='catalog/src .',
         export_includes='catalog/src .'
     )