catalog: implement catalog driver and facade

This commit also refactors the QueryAdapter code and adds the corresponding
unit tests. CatalogAdapter and Catalog do not use templates, so their
definitions are moved to the corresponding cpp files.

refs: #2599, #2600

Change-Id: I2be492ec3c2538e865bfa7c09ac8cd49e2a9527d
diff --git a/catalog/src/catalog/catalog.cpp b/catalog/src/catalog/catalog.cpp
new file mode 100644
index 0000000..ebec2f9
--- /dev/null
+++ b/catalog/src/catalog/catalog.cpp
@@ -0,0 +1,102 @@
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ *  for atmospheric science data
+ *  Copyright (C) 2015 Colorado State University
+ *
+ *  NDN-Atmos is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  NDN-Atmos is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with NDN-Atmos.  If not, see <http://www.gnu.org/licenses/>.
+**/
+
+#include "catalog.hpp"
+
+namespace atmos {
+namespace catalog {
+
+Catalog::Catalog(const std::shared_ptr<ndn::Face>& face,
+                 const std::shared_ptr<ndn::KeyChain>& keyChain,
+                 const std::string& configFileName)
+  : m_face{face}
+  , m_keyChain{keyChain}
+  , m_configFile{configFileName}
+{
+  // Only stores its arguments; the configuration file is parsed by initialize()
+}
+
+// The members (shared_ptr, string, Name and the vector of unique_ptr
+// adapters) clean up after themselves, so the compiler-generated
+// destructor body is sufficient.
+Catalog::~Catalog() = default;
+
+// Handler for the "general" section of the configuration file. Dry-run passes
+// change nothing; a real pass loads the "prefix" entry into m_prefix.
+void
+Catalog::onConfig(const util::ConfigSection& configSection,
+                  bool isDryRun,
+                  const std::string& fileName)
+{
+  if (isDryRun) {
+    return;
+  }
+  for (const auto& entry : configSection) {
+    if (entry.first != "prefix") {
+      continue; // other keys of the section are not interpreted here
+    }
+    m_prefix.clear();
+    m_prefix.append(entry.second.get_value<std::string>());
+    if (m_prefix.empty()) {
+      throw Error("Empty value for \"prefix\""
+                                 " in \"general\" section");
+    }
+  }
+}
+
+void
+Catalog::addAdapter(std::unique_ptr<util::CatalogAdapter>& adapter)
+{
+  m_adapters.emplace_back(std::move(adapter)); // ownership moves in; the caller's pointer is left null
+}
+
+void
+Catalog::initializeCatalog()
+{
+  // The parser is constructed with ConfigFile::ignoreUnknownSection; only "general" is handled here.
+  util::ConfigFile config(&util::ConfigFile::ignoreUnknownSection);
+  auto generalHandler = bind(&Catalog::onConfig, this, _1, _2, _3);
+  config.addSectionHandler("general", generalHandler);
+  config.parse(m_configFile, true);  // first pass; presumably the dry run (onConfig returns early)
+  config.parse(m_configFile, false); // second pass; onConfig applies the settings
+}
+
+void
+Catalog::initializeAdapters()
+{
+  util::ConfigFile config(&util::ConfigFile::ignoreUnknownSection);
+
+  // Let every registered adapter hook itself into the parser, handing it
+  // the catalog prefix stored in m_prefix.
+  for (const auto& adapter : m_adapters) {
+    adapter->setConfigFile(config, m_prefix);
+  }
+
+  config.parse(m_configFile, true);
+  config.parse(m_configFile, false);
+}
+
+void
+Catalog::initialize()
+{
+  initializeCatalog();  // runs first: its "general" handler fills in m_prefix
+  initializeAdapters(); // hands m_prefix to every adapter, then parses their sections
+}
+
+} // namespace catalog
+} // namespace atmos
diff --git a/catalog/src/catalog/catalog.hpp b/catalog/src/catalog/catalog.hpp
index fe680d9..b58129e 100644
--- a/catalog/src/catalog/catalog.hpp
+++ b/catalog/src/catalog/catalog.hpp
@@ -19,12 +19,10 @@
 #ifndef ATMOS_CATALOG_CATALOG_HPP
 #define ATMOS_CATALOG_CATALOG_HPP
 
-#include "query/query-adapter.hpp"
-#include "publish/publish-adapter.hpp"
+#include "util/catalog-adapter.hpp"
+#include "util/config-file.hpp"
 
-#include <ndn-cxx/data.hpp>
 #include <ndn-cxx/face.hpp>
-#include <ndn-cxx/interest.hpp>
 #include <ndn-cxx/name.hpp>
 #include <ndn-cxx/security/key-chain.hpp>
 
@@ -36,55 +34,82 @@
 
 /**
  * The Catalog acts as a façade around the Database.
- * It is Templated on a DatabaseHandler: the connection into the database that it will use to
- * communicate with the actual system
  */
-template <typename DatabaseHandler>
 class Catalog {
 public:
+  class Error : public std::runtime_error
+  {
+  public:
+    explicit
+    Error(const std::string& what)
+      : std::runtime_error(what)
+    {
+    }
+  };
+
   /**
    * Constructor
    *
-   * @param aFace:            Face that will be used for NDN communications
-   * @param aKeyChain:        KeyChain to sign query responses and evaluate the incoming publish
-   *                          and ChronoSync requests against
-   * @param aDatabaseHandler: <typename DatabaseHandler> to the database that stores our catalog
-   * @oaram aPrefix:          Name that will define the prefix to all queries and publish requests
-   *                          that will be routed to this specific Catalog Instance
+   * @param face:             Face that will be used for NDN communications
+   * @param keyChain:         KeyChain that will be used for data signing
+   * @param configFileName:   Configuration file that specifies the catalog configuration details
    */
-  Catalog(std::shared_ptr<ndn::Face> aFace, std::shared_ptr<ndn::KeyChain> aKeyChain,
-          std::shared_ptr<DatabaseHandler> aDatabaseHandler, const ndn::Name& aPrefix);
+  Catalog(const std::shared_ptr<ndn::Face>& face,
+          const std::shared_ptr<ndn::KeyChain>& keyChain,
+          const std::string& configFileName);
 
-  /**
-   * Destructor
-   */
   virtual
   ~Catalog();
 
+  /**
+   * Function that performs the initialization of catalog instance and the adapters added in the
+   * catalog. After initialization, face can be started by processEvents()
+   */
+  void
+  initialize();
+
+  /**
+   * Helper function that adds adapters in catalog so that all adapters can be initialized when
+   * the initialize() is called
+   *
+   * @param adapter: Adapter that will be added. Any adapter instances must be declared as the
+   *                 base Class "util::CatalogAdapter"
+   */
+  void
+  addAdapter(std::unique_ptr<util::CatalogAdapter>& adapter);
+
 protected:
-  // Templated Adapter to handle Query requests
-  atmos::query::QueryAdapter<DatabaseHandler> m_queryAdapter;
-  // Templated Adapter to handle Publisher requests
-  atmos::publish::PublishAdapter<DatabaseHandler> m_publishAdapter;
+
+  /**
+   * Helper function that configures the catalog according to the general section
+   */
+  void
+  onConfig(const util::ConfigSection& configSection,
+           bool isDryRun,
+           const std::string& fileName);
+
+  /**
+   * Helper function that subscribes to the general section for the config file
+   */
+  void
+  initializeCatalog();
+
+  /**
+   * Helper function that launches the adapters configuration processing functions
+   */
+  void
+  initializeAdapters();
+
+private:
+  const std::shared_ptr<ndn::Face> m_face;
+  const std::shared_ptr<ndn::KeyChain> m_keyChain;
+  const std::string m_configFile;
+  ndn::Name m_prefix;
+
+  // Adapters added by users
+  std::vector<std::unique_ptr<util::CatalogAdapter>> m_adapters;
 }; // class Catalog
 
-template <typename DatabaseHandler>
-Catalog<DatabaseHandler>::Catalog(std::shared_ptr<ndn::Face> aFace,
-                                  std::shared_ptr<ndn::KeyChain> aKeyChain,
-                                  std::shared_ptr<DatabaseHandler> aDatabaseHandler,
-                                  const ndn::Name& aPrefix)
-
-  : m_queryAdapter(aFace, aKeyChain, aDatabaseHandler, aPrefix)
-  , m_publishAdapter(aFace, aKeyChain, aDatabaseHandler, aPrefix)
-{
-  // empty
-}
-
-template <typename DatabaseHandler>
-Catalog<DatabaseHandler>::~Catalog()
-{
-  // empty
-}
 
 } // namespace catalog
 } // namespace atmos
diff --git a/catalog/src/main.cpp b/catalog/src/main.cpp
index 3f199ee..4c85d8c 100644
--- a/catalog/src/main.cpp
+++ b/catalog/src/main.cpp
@@ -14,37 +14,66 @@
  *
  *  You should have received a copy of the GNU General Public License
  *  along with NDN-Atmos.  If not, see <http://www.gnu.org/licenses/>.
-**/
+ **/
 
+#include "config.hpp"
 #include "catalog/catalog.hpp"
-#include "util/mysql-util.hpp"
-
-#include <ChronoSync/socket.hpp>
-
-#include <ndn-cxx/data.hpp>
-#include <ndn-cxx/face.hpp>
-#include <ndn-cxx/interest.hpp>
-#include <ndn-cxx/name.hpp>
-#include <ndn-cxx/security/key-chain.hpp>
-
-#include "mysql/mysql.h"
+#include "query/query-adapter.hpp"
+#include "publish/publish-adapter.hpp"
 
 #include <memory>
+#include <getopt.h>
+#include <ndn-cxx/face.hpp>
 
-int main()
+
+void
+usage()
 {
-  std::shared_ptr<chronosync::Socket> socket; // use ChronoSync
+  // Print the command-line help: a synopsis line followed by one line per option.
+  std::cout << "\n Usage:\n atmos-catalog "
+    "[-h] [-f config file]\n"
+    "   [-f config file]    - set the configuration file\n"
+    "   [-h]                - print help and exit\n\n";
+}
+
+int
+main(int argc, char** argv)
+{
+  int option;
+  std::string configFile(DEFAULT_CONFIG_FILE);
+
+  while ((option = getopt(argc, argv, "f:h")) != -1) {
+    switch (option) {
+      case 'f':
+        configFile.assign(optarg);
+        break;
+      case 'h':
+      default:
+        usage();
+        return 0;
+    }
+  }
+
+  argc -= optind;
+  argv += optind;
+  if (argc != 0) {
+    usage();
+    return 1;
+  }
+
   std::shared_ptr<ndn::Face> face(new ndn::Face());
   std::shared_ptr<ndn::KeyChain> keyChain(new ndn::KeyChain());
 
-  // This should be unique to each instance
-  ndn::Name aName("/catalog/myUniqueName");
+  std::unique_ptr<atmos::util::CatalogAdapter>
+    queryAdapter(new atmos::query::QueryAdapter<MYSQL>(face, keyChain));
+  std::unique_ptr<atmos::util::CatalogAdapter>
+    publishAdapter(new atmos::publish::PublishAdapter<MYSQL>(face, keyChain));
 
-  atmos::util::ConnectionDetails mysqlID("atmos-den.es.net", "testuser", "test623", "testdb");
-  std::shared_ptr<MYSQL> conn;
-  conn = atmos::util::MySQLConnectionSetup(mysqlID);
+  atmos::catalog::Catalog catalogInstance(face, keyChain, configFile);
+  catalogInstance.addAdapter(queryAdapter);
+  catalogInstance.addAdapter(publishAdapter);
 
-  atmos::catalog::Catalog<MYSQL> catalog(face, keyChain, conn, aName);
+  catalogInstance.initialize();
   face->processEvents();
 
   return 0;
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index c6b322b..9d29434 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -31,50 +31,60 @@
 #include <ndn-cxx/interest-filter.hpp>
 #include <ndn-cxx/name.hpp>
 #include <ndn-cxx/security/key-chain.hpp>
-
+#include <ndn-cxx/security/validator.hpp>
 #include "mysql/mysql.h"
 
 #include <memory>
 #include <string>
+#include <vector>
+#include <unordered_map>
 
 namespace atmos {
 namespace publish {
-
 /**
  * PublishAdapter handles the Publish usecases for the catalog
  */
 template <typename DatabaseHandler>
-class PublishAdapter : public atmos::util::CatalogAdapter<DatabaseHandler> {
+class PublishAdapter : public atmos::util::CatalogAdapter {
 public:
   /**
    * Constructor
    *
-   * @param face:            Face that will be used for NDN communications
-   * @param keyChain:        KeyChain to sign query responses and evaluate the incoming publish
-   *                          and ChronoSync requests against
-   * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
-   * @oaram prefix:          Name that will define the prefix to all publish requests
-   *                          that will be routed to this specific Catalog Instance
+   * @param face:      Face that will be used for NDN communications
+   * @param keyChain:  KeyChain that will be used for data signing
    */
-  PublishAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
-                 std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
+  PublishAdapter(const std::shared_ptr<ndn::Face>& face,
+                 const std::shared_ptr<ndn::KeyChain>& keyChain);
 
-
-  /**
-   * Destructor
-   */
   virtual
   ~PublishAdapter();
 
+  /**
+   * Helper function that subscribes to the publish section of the config file
+   */
+  void
+  setConfigFile(util::ConfigFile& config,
+                const ndn::Name& prefix);
+
 protected:
   /**
+   * Helper function that configures the PublishAdapter instance according to the publish section
+   * in config file
+   */
+  void
+  onConfig(const util::ConfigSection& section,
+           bool isDryDun,
+           const std::string& fileName,
+           const ndn::Name& prefix);
+
+   /**
    * Initial "please publish this" Interests
    *
    * @param filter:   InterestFilter that caused this Interest to be routed
    * @param interest: Interest that needs to be handled
    */
   virtual void
-  onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+  onPublishInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
 
   /**
    * Data containing the actual thing we need to publish
@@ -83,53 +93,191 @@
    * @param data:     Data that needs to be handled
    */
   virtual void
-  onData(const ndn::Interest& interest, const ndn::Data& data);
+  onPublishedData(const ndn::Interest& interest, const ndn::Data& data);
 
-  // @todo: Should we do anything special with the timeouts for the publish requests?
-  //        If so, overwrite onTimeout()
+  /**
+   * Helper function to set the DatabaseHandler
+   */
+  void
+  setDatabaseHandler(const util::ConnectionDetails&  databaseId);
 
+  /**
+   * Helper function that sets filters to make the adapter work
+   */
+  void
+  setFilters();
+
+protected:
+  typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
+  // Prefix for ChronoSync
+  ndn::Name m_syncPrefix;
+  // Handle to the Catalog's database
+  std::shared_ptr<DatabaseHandler> m_databaseHandler;
+  std::shared_ptr<ndn::Validator> m_validaor;
+  RegisteredPrefixList m_registeredPrefixList;
 };
 
-template <typename DatabaseHandler>
-PublishAdapter<DatabaseHandler>::PublishAdapter(std::shared_ptr<ndn::Face> face,
-                           std::shared_ptr<ndn::KeyChain> keyChain,
-                           std::shared_ptr<DatabaseHandler> databaseHandler,
-                           const ndn::Name& prefix)
-  : atmos::util::CatalogAdapter<DatabaseHandler>(face, keyChain, databaseHandler,
-  ndn::Name(prefix).append("/publish"))
-{
-  face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("/publish")),
-                                               bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onInterest,
-                                                    this, _1, _2),
-                                               bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
-                                                    this, _1),
-                                               bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
-                                                    this, _1, _2));
 
-  std::shared_ptr<ndn::Interest> request(std::make_shared<ndn::Interest>(ndn::Name(prefix).append("/publish")));
-  atmos::util::CatalogAdapter<DatabaseHandler>::m_face->expressInterest(*request,
-                                                                        bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onData,
-                                                                             this, _1, _2),
-                                                                        bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onTimeout,
-                                                                             this, _1));
+template <typename DatabaseHandler>
+PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
+                                                const std::shared_ptr<ndn::KeyChain>& keyChain)
+  : util::CatalogAdapter(face, keyChain)
+{ // prefix, database handle and interest filters are set up later, in onConfig()
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::setFilters()
+{
+  // Register <m_prefix>/publish on the face and remember the returned id so
+  // that the destructor can unset the filter again.
+  typedef publish::PublishAdapter<DatabaseHandler> Self;
+  const ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
+  m_registeredPrefixList[publishPrefix] =
+    m_face->setInterestFilter(publishPrefix, bind(&Self::onPublishInterest, this, _1, _2),
+                              bind(&Self::onRegisterSuccess, this, _1),
+                              bind(&Self::onRegisterFailure, this, _1, _2));
 }
 
 template <typename DatabaseHandler>
 PublishAdapter<DatabaseHandler>::~PublishAdapter()
 {
-  // empty
+  for (const auto& itr : m_registeredPrefixList) { // undo every filter recorded by setFilters()
+    if (static_cast<bool>(itr.second)) // skip entries whose registration id is null
+      m_face->unsetInterestFilter(itr.second);
+  }
 }
 
 template <typename DatabaseHandler>
 void
-PublishAdapter<DatabaseHandler>::onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
+PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
+                                               const ndn::Name& prefix)
+{
+  config.addSectionHandler("publishAdapter", // name of the config section this adapter handles
+                           bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
+                                _1, _2, _3, prefix)); // prefix is forwarded as onConfig's 4th argument
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
+                                          bool isDryRun,
+                                          const std::string& filename,
+                                          const ndn::Name& prefix)
+{
+  using namespace util;
+  if (isDryRun) { // a dry-run pass applies nothing
+    return;
+  }
+
+  std::string signingId, dbServer, dbName, dbUser, dbPasswd; // each stays empty if its key is absent
+  std::string syncPrefix("ndn:/ndn-atmos/broadcast/chronosync"); // default, kept unless sync/prefix is given
+
+  for (auto item = section.begin();
+       item != section.end();
+       ++ item)
+  {
+    if (item->first == "signingId") {
+      signingId.assign(item->second.get_value<std::string>());
+      if (signingId.empty()) {
+        throw Error("Invalid value for \"signingId\""
+                                " in \"publish\" section");
+      }
+    }
+
+    // @todo: parse the published_file_security section
+
+    else if (item->first == "database") { // nested section: dbServer, dbName, dbUser, dbPasswd
+      const util::ConfigSection& databaseSection = item->second;
+      for (auto subItem = databaseSection.begin();
+           subItem != databaseSection.end();
+           ++ subItem) {
+        if (subItem->first == "dbServer") {
+          dbServer.assign(subItem->second.get_value<std::string>());
+          if (dbServer.empty()){
+            throw Error("Invalid value for \"dbServer\""
+                                    " in \"publish\" section");
+          }
+        }
+        if (subItem->first == "dbName") {
+          dbName.assign(subItem->second.get_value<std::string>());
+          if (dbName.empty()){
+            throw Error("Invalid value for \"dbName\""
+                                    " in \"publish\" section");
+          }
+        }
+        if (subItem->first == "dbUser") {
+          dbUser.assign(subItem->second.get_value<std::string>());
+          if (dbUser.empty()){
+            throw Error("Invalid value for \"dbUser\""
+                                    " in \"publish\" section");
+          }
+        }
+        if (subItem->first == "dbPasswd") {
+          dbPasswd.assign(subItem->second.get_value<std::string>());
+          if (dbPasswd.empty()){
+            throw Error("Invalid value for \"dbPasswd\""
+                                    " in \"publish\" section");
+          }
+        }
+      }
+    }
+    else if (item->first == "sync") { // nested section: currently only "prefix" is read
+      const util::ConfigSection& synSection = item->second;
+      for (auto subItem = synSection.begin();
+           subItem != synSection.end();
+           ++ subItem) {
+        if (subItem->first == "prefix") {
+          syncPrefix.clear();
+          syncPrefix.assign(subItem->second.get_value<std::string>());
+          if (syncPrefix.empty()){
+            throw Error("Invalid value for \"prefix\""
+                                    " in \"publish\\sync\" section");
+          }
+        }
+        // @todo: parse the sync_security section
+      }
+    }
+  }
+
+  m_prefix = prefix; // catalog prefix bound in by setConfigFile()
+  m_signingId = ndn::Name(signingId);
+  m_syncPrefix.clear();
+  m_syncPrefix.append(syncPrefix);
+  util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName); // NOTE(review): no check that a "database" section was present
+
+  setDatabaseHandler(mysqlId); // connects for MYSQL; the generic version does nothing
+  setFilters(); // registers <m_prefix>/publish on the face
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+{
+  // Generic case: nothing is done and m_databaseHandler is left untouched.
+}
+
+template <>
+inline void
+PublishAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+{
+  // MYSQL specialization: connect using databaseId and keep the handle. It is
+  // `inline` because a full specialization defined in a header is not a template.
+  m_databaseHandler = atmos::util::MySQLConnectionSetup(databaseId);
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onPublishInterest(const ndn::InterestFilter& filter,
+                                                   const ndn::Interest& interest)
 {
   // @todo: Request the data for publish
 }
 
 template <typename DatabaseHandler>
 void
-PublishAdapter<DatabaseHandler>::onData(const ndn::Interest& interest, const ndn::Data& data)
+PublishAdapter<DatabaseHandler>::onPublishedData(const ndn::Interest& interest,
+                                                 const ndn::Data& data)
 {
   // @todo handle publishing the data
 }
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index bada073..107e6c4 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -21,10 +21,10 @@
 
 #include "util/catalog-adapter.hpp"
 #include "util/mysql-util.hpp"
+#include "util/config-file.hpp"
 
 #include <thread>
 
-
 #include <json/reader.h>
 #include <json/value.h>
 #include <json/writer.h>
@@ -42,6 +42,7 @@
 #include "mysql/mysql.h"
 
 #include <map>
+#include <unordered_map>
 #include <memory>
 #include <mutex>
 #include <sstream>
@@ -49,35 +50,44 @@
 
 namespace atmos {
 namespace query {
-
 static const size_t MAX_SEGMENT_SIZE = ndn::MAX_NDN_PACKET_SIZE >> 1;
 
 /**
  * QueryAdapter handles the Query usecases for the catalog
  */
 template <typename DatabaseHandler>
-class QueryAdapter : public atmos::util::CatalogAdapter<DatabaseHandler> {
+class QueryAdapter : public atmos::util::CatalogAdapter {
 public:
   /**
    * Constructor
    *
-   * @param face:            Face that will be used for NDN communications
-   * @param keyChain:        KeyChain to sign query responses and evaluate the incoming query
-   *                          and ChronoSync requests against
-   * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
-   * @param prefix:          Name that will define the prefix to all queries requests that will be
-   *                          routed to this specific Catalog Instance
+   * @param face:      Face that will be used for NDN communications
+   * @param keyChain:  KeyChain that will be used for data signing
    */
-  QueryAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
-               std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
+  QueryAdapter(const std::shared_ptr<ndn::Face>& face,
+               const std::shared_ptr<ndn::KeyChain>& keyChain);
 
-  /**
-   * Destructor
-   */
   virtual
   ~QueryAdapter();
 
   /**
+   * Helper function to specify section handler
+   */
+  void
+  setConfigFile(util::ConfigFile& config,
+                const ndn::Name& prefix);
+
+protected:
+  /**
+   * Helper function for configuration parsing
+   */
+  void
+  onConfig(const util::ConfigSection& section,
+           bool isDryDun,
+           const std::string& fileName,
+           const ndn::Name& prefix);
+
+  /**
    * Handles incoming query requests by stripping the filter off the Interest to get the
    * actual request out. This removes the need for a 2-step Interest-Data retrieval.
    *
@@ -96,67 +106,82 @@
   virtual void
   onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
 
-private:
   /**
-   * Helper function that generates query results
+   * Helper function that makes query-results data
    *
-   * @param face:            Face that will be used for NDN communications
-   * @param keyChain:        KeyChain to sign query responses and evaluate the incoming query
-   *                          and ChronoSync requests against
-   * @param interest:        Interest that needs to be handled      
-   * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
-   */
-  void
-  query(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
-        std::shared_ptr<const ndn::Interest> interest,
-        std::shared_ptr<DatabaseHandler> databaseHandler);
-
-  /**
-   * Helper function that publishes JSON
-   *
-   * @param face:          Face that will send the Data out on
-   * @param keyChain:      KeyChain to sign the Data we're creating
-   * @param segmentPrefix: Name that identifies the Prefix for the Data
-   * @param value:         Json::Value to be sent in the Data
-   * @param segmentNo:     uint64_t the segment for this Data
-   * @param isFinalBlock:   bool to indicate whether this needs to be flagged in the Data as the last entry
+   * @param segmentPrefix:  Name that identifies the Prefix for the Data
+   * @param value:          Json::Value to be sent in the Data
+   * @param segmentNo:      uint64_t the segment for this Data
+   * @param isFinalBlock:   bool to indicate whether this needs to be flagged in the Data as the
+   *                         last entry
    * @param isAutocomplete: bool to indicate whether this is an autocomplete message
    */
-  void
-  publishJson(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
-              const ndn::Name& segmentPrefix, const Json::Value& value,
-              uint64_t segmentNo, bool isFinalBlock, bool isAutocomplete);
+  std::shared_ptr<ndn::Data>
+  makeReplyData(const ndn::Name& segmentPrefix,
+                const Json::Value& value,
+                uint64_t segmentNo,
+                bool isFinalBlock,
+                bool isAutocomplete);
 
   /**
-   * Helper function that publishes char*
+   * Helper function that generates query results from a Json query carried in the Interest
    *
-   * @param face:          Face that will send the Data out on
-   * @param keyChain:      KeyChain to sign the Data we're creating
-   * @param segmentPrefix: Name that identifies the Prefix for the Data
-   * @param payload:       char* to be sent in the Data
-   * @param payloadLength: size_t to indicate how long payload is
-   * @param segmentNo:     uint64_t the segment for this Data
-   * @param isFinalBlock:   bool to indicate whether this needs to be flagged in the Data as the last entry
+   * @param interest:  Interest that needs to be handled
    */
   void
-  publishSegment(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
-                 const ndn::Name& segmentPrefix, const char* payload, size_t payloadLength,
-                 uint64_t segmentNo, bool isFinalBlock);
+  runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
 
   /**
-   * Helper function that generates query results from a Json query
+   * Helper function that makes ACK data
    *
-   * @param face:            Face that will be used for NDN communications
-   * @param jsonQuery:       String containing the JSON query
-   * @param keyChain:        KeyChain to sign query responses and evaluate the incoming query
-   *                          and ChronoSync requests against
-   * @param interest:        Interest that needs to be handled
-   * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
+   * @param interest: Interest that needs to be handled
+   * @param version:  Version that needs to be in the data name
+   */
+  std::shared_ptr<ndn::Data>
+  makeAckData(std::shared_ptr<const ndn::Interest> interest,
+              const ndn::Name::Component& version);
+
+  /**
+   * Helper function that generates the sqlQuery string and autocomplete flag
+   * @param sqlQuery:     stringstream to save the sqlQuery string
+   * @param jsonValue:    Json value that contains the query information
+   * @param autocomplete: Flag to indicate if the json contains autocomplete flag
    */
   void
-  runJsonQuery(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
-               std::shared_ptr<const ndn::Interest> interest, const std::string& jsonQuery,
-               std::shared_ptr<DatabaseHandler> databaseHandler);
+  json2Sql(std::stringstream& sqlQuery,
+           Json::Value& jsonValue,
+           bool& autocomplete);
+
+  /**
+   * Helper function that signs the data
+   */
+  void
+  signData(ndn::Data& data);
+
+  /**
+   * Helper function that publishes query-results data segments
+   */
+  virtual void
+  prepareSegments(const ndn::Name& segmentPrefix,
+                  const std::string& sqlString,
+                  bool autocomplete);
+
+  /**
+   * Helper function to set the DatabaseHandler
+   */
+  void
+  setDatabaseHandler(const util::ConnectionDetails&  databaseId);
+
+  /**
+   * Helper function that sets filters to make the adapter work
+   */
+  void
+  setFilters();
+
+protected:
+  typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
+  // Handle to the Catalog's database
+  std::shared_ptr<DatabaseHandler> m_databaseHandler;
 
   // mutex to control critical sections
   std::mutex m_mutex;
@@ -166,55 +191,162 @@
 
   ndn::util::InMemoryStorageLru m_cache;
   // @}
+  RegisteredPrefixList m_registeredPrefixList;
 };
 
-
 template <typename DatabaseHandler>
-QueryAdapter<DatabaseHandler>::QueryAdapter(std::shared_ptr<ndn::Face> face,
-                           std::shared_ptr<ndn::KeyChain> keyChain,
-                           std::shared_ptr<DatabaseHandler> databaseHandler,
-                           const ndn::Name& prefix)
-  : atmos::util::CatalogAdapter<DatabaseHandler>(face, keyChain, databaseHandler, prefix)
-  , m_cache(100000000)
+QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
+                                            const std::shared_ptr<ndn::KeyChain>& keyChain)
+  : util::CatalogAdapter(face, keyChain)
+  , m_cache(250000)
 {
-  atmos::util::CatalogAdapter<DatabaseHandler>::m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("query")),
-                                                                          bind(&atmos::query::QueryAdapter<DatabaseHandler>::onQueryInterest,
-                                                                               this, _1, _2),
-                                                                          bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
-                                                                               this, _1),
-                                                                          bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
-                                                                               this, _1, _2));
-
-  atmos::util::CatalogAdapter<DatabaseHandler>::m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("query-results")),
-                                                                          bind(&atmos::query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
-                                                                               this, _1, _2),
-                                                                          bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
-                                                                               this, _1),
-                                                                          bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
-                                                                               this, _1, _2));
-}
-
-template <typename DatabaseHandler>
-QueryAdapter<DatabaseHandler>::~QueryAdapter(){
-  // empty
 }
 
 template <typename DatabaseHandler>
 void
-QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
+QueryAdapter<DatabaseHandler>::setFilters()
 {
-  // strictly enforce query initialization namespace. Name should be our local prefix + "query" + parameters
+  ndn::Name queryPrefix = ndn::Name(m_prefix).append("query"); // served by onQueryInterest
+  m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
+                            bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
+                                 this, _1, _2),
+                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
+                                 this, _1),
+                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
+                                 this, _1, _2));
+  // <prefix>/query-results is served by onQueryResultsInterest; both ids are kept for unset
+  ndn::Name resultPrefix = ndn::Name(m_prefix).append("query-results");
+  m_registeredPrefixList[resultPrefix] = m_face->setInterestFilter(ndn::InterestFilter(resultPrefix),
+                            bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
+                                 this, _1, _2),
+                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
+                                 this, _1),
+                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
+                                 this, _1, _2));
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
+                                             const ndn::Name& prefix)
+{
+  config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
+                                                _1, _2, _3, prefix));
+}
+
+/**
+ * Callback for the "queryAdapter" section of the configuration file.
+ * Reads "signingId" and the "database" sub-section (dbServer, dbName, dbUser, dbPasswd),
+ * stores the prefix and signing identity, then sets the database handler and the filters.
+ * Returns without doing anything on a dry run; throws Error on an empty recognized value.
+ */
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
+                                        bool isDryRun,
+                                        const std::string& filename,
+                                        const ndn::Name& prefix)
+{
+  using namespace util;
+  if (isDryRun) {
+    return;
+  }
+  std::string signingId, dbServer, dbName, dbUser, dbPasswd;
+  for (auto item = section.begin(); item != section.end(); ++item) {
+    if (item->first == "signingId") {
+      signingId.assign(item->second.get_value<std::string>());
+      if (signingId.empty()) {
+        throw Error("Empty value for \"signingId\""
+                                " in \"queryAdapter\" section");
+      }
+    }
+    if (item->first == "database") {
+      const util::ConfigSection& dataSection = item->second;
+      for (auto subItem = dataSection.begin(); subItem != dataSection.end(); ++subItem) {
+        if (subItem->first == "dbServer") {
+          dbServer.assign(subItem->second.get_value<std::string>());
+          if (dbServer.empty()){
+            throw Error("Invalid value for \"dbServer\""
+                                    " in \"queryAdapter\" section");
+          }
+        }
+        if (subItem->first == "dbName") {
+          dbName.assign(subItem->second.get_value<std::string>());
+          if (dbName.empty()){
+            throw Error("Invalid value for \"dbName\""
+                                    " in \"queryAdapter\" section");
+          }
+        }
+        if (subItem->first == "dbUser") {
+          dbUser.assign(subItem->second.get_value<std::string>());
+          if (dbUser.empty()){
+            throw Error("Invalid value for \"dbUser\""
+                                    " in \"queryAdapter\" section");
+          }
+        }
+        if (subItem->first == "dbPasswd") {
+          dbPasswd.assign(subItem->second.get_value<std::string>());
+          if (dbPasswd.empty()){
+            throw Error("Invalid value for \"dbPasswd\""
+                                    " in \"queryAdapter\" section");
+          }
+        }
+      }
+    }
+  }
+
+  m_prefix = prefix;
+  m_signingId = ndn::Name(signingId);
+  util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
+
+  setDatabaseHandler(mysqlId);
+  setFilters();
+}
+
+// Generic fallback: no backend is implemented for this DatabaseHandler, so nothing is set.
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+{
+}
+
+// MySQL specialization: stores the handle returned by MySQLConnectionSetup(databaseId).
+// inline: a non-inline full specialization is multiply defined if several files include it.
+template <>
+inline void
+QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+{
+  m_databaseHandler = atmos::util::MySQLConnectionSetup(databaseId);
+}
+
+// Destructor: unsets every interest filter whose id was recorded in m_registeredPrefixList.
+template <typename DatabaseHandler>
+QueryAdapter<DatabaseHandler>::~QueryAdapter()
+{
+  for (const auto& registration : m_registeredPrefixList)
+    if (registration.second != nullptr)
+      m_face->unsetInterestFilter(registration.second);
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
+                                               const ndn::Interest& interest)
+{
+  // strictly enforce query initialization namespace.
+  // Name should be our local prefix + "query" + parameters
   if (interest.getName().size() != filter.getPrefix().size() + 1) {
     // @todo: return a nack
     return;
   }
-
   std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
-
-  std::thread queryThread(&QueryAdapter<DatabaseHandler>::query, this,
-                          atmos::util::CatalogAdapter<DatabaseHandler>::m_face,
-                          atmos::util::CatalogAdapter<DatabaseHandler>::m_keyChain, interestPtr,
-                          atmos::util::CatalogAdapter<DatabaseHandler>::m_databaseHandler);
+  #ifndef NDEBUG
+    std::cout << "query interest : " << interestPtr->getName() << std::endl;
+  #endif
+  // @todo: use thread pool
+  std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
+                          this,
+                          interestPtr);
   queryThread.join();
 }
 
@@ -227,109 +359,100 @@
   // CS so we just ignore any retrieval Interests that hit us for
   // now. In the future, this should check some form of
   // InMemoryStorage.
+  #ifndef NDEBUG
+    std::cout << "query results interest : " << interest.toUri() << std::endl;
+  #endif
   auto data = m_cache.find(interest.getName());
   if (data) {
-    atmos::util::CatalogAdapter<DatabaseHandler>::m_face->put(*data);
-  } else {
-    // regenerate query
-    const std::string jsonQuery(reinterpret_cast<const char*>(interest.getName()[atmos::util::CatalogAdapter<DatabaseHandler>::m_prefix.size()+1].value()));
-
-    std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
-    std::thread queryRegenThread(&QueryAdapter<DatabaseHandler>::runJsonQuery, this,
-                          atmos::util::CatalogAdapter<DatabaseHandler>::m_face,
-                          atmos::util::CatalogAdapter<DatabaseHandler>::m_keyChain, interestPtr,
-                          jsonQuery,
-                          atmos::util::CatalogAdapter<DatabaseHandler>::m_databaseHandler);
-    queryRegenThread.join();
+    m_face->put(*data);
   }
 }
 
 template <typename DatabaseHandler>
 void
-QueryAdapter<DatabaseHandler>::publishJson(std::shared_ptr<ndn::Face> face,
-                                              std::shared_ptr<ndn::KeyChain> keyChain,
-                                              const ndn::Name& segmentPrefix,
-                                              const Json::Value& value,
-                                              uint64_t segmentNo, bool isFinalBlock,
-                                              bool isAutocomplete)
+QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
 {
-  Json::Value entry;
-  Json::FastWriter fastWriter;
-  if (isAutocomplete) {
-    entry["next"] = value;
-  } else {
-    entry["results"] = value;
+  if (m_signingId.empty())
+    m_keyChain->sign(data);
+  else {
+    ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
+    ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
+    m_keyChain->sign(data, certName);
   }
-  const std::string jsonMessage = fastWriter.write(entry);
-  publishSegment(face, keyChain, segmentPrefix, jsonMessage.c_str(), jsonMessage.size() + 1,
-                 segmentNo, isFinalBlock);
 }
 
 template <typename DatabaseHandler>
-void
-QueryAdapter<DatabaseHandler>::publishSegment(std::shared_ptr<ndn::Face> face,
-                                              std::shared_ptr<ndn::KeyChain> keyChain,
-                                              const ndn::Name& segmentPrefix,
-                                              const char* payload, size_t payloadLength,
-                                              uint64_t segmentNo, bool isFinalBlock)
+std::shared_ptr<ndn::Data>
+QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
+                                           const ndn::Name::Component& version)
 {
-  ndn::Name segmentName(segmentPrefix);
-  if (isFinalBlock) {
-    segmentName.append("END");
-  } else {
-    segmentName.appendSegment(segmentNo);
-  }
+  // JSON parsed ok, so we can acknowledge successful receipt of the query
+  ndn::Name ackName(interest->getName());
+  ackName.append(version);
+  ackName.append("OK");
 
-  std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
-  data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
-  data->setFreshnessPeriod(ndn::time::milliseconds(10000));
-
-  if (isFinalBlock) {
-    data->setFinalBlockId(segmentName[-1]);
-  }
-  keyChain->sign(*data);
-  //face->put(*data);
-
-  m_mutex.lock();
-  m_cache.insert(*data);
-  m_mutex.unlock();
+  std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(ackName);
+  signData(*ack);
+  #ifndef NDEBUG
+    std::cout << "makeAckData : " << ackName << std::endl;
+  #endif
+  return ack;
 }
 
 template <typename DatabaseHandler>
 void
-QueryAdapter<DatabaseHandler>::query(std::shared_ptr<ndn::Face> face,
-                                     std::shared_ptr<ndn::KeyChain> keyChain,
-                                     std::shared_ptr<const ndn::Interest> interest,
-                                     std::shared_ptr<DatabaseHandler> databaseHandler)
+QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
+                                        Json::Value& jsonValue,
+                                        bool& autocomplete)
+{
+  // 3) Convert the JSON Query into a MySQL one: each member of jsonValue adds one condition
+  sqlQuery << "SELECT name FROM cmip5";
+  bool input = false; // true once at least one condition has been written
+  for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
+  {
+    Json::Value key = iter.key();
+    Json::Value value = (*iter);
+    // NOTE(review): key and value are written into the SQL text unescaped - injection risk
+    if (input) {
+      sqlQuery << " AND";
+    } else {
+      sqlQuery << " WHERE";
+    }
+
+    // Auto-complete case: key "?" matches names that start with value (value is a REGEXP)
+    if (key.asString().compare("?") == 0) {
+      sqlQuery << " name REGEXP '^" << value.asString() << "'";
+      autocomplete = true; // only ever set to true here, never reset to false
+    }
+    // Component case: any other key is compared for equality, key='value'
+    else {
+      sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
+    }
+    input = true;
+  }
+
+  if (!input) { // Force it to be the empty set
+    sqlQuery << " limit 0";
+  }
+  sqlQuery << ";";
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
 {
   // 1) Strip the prefix off the ndn::Interest's ndn::Name
   // +1 to grab JSON component after "query" component
-  const std::string jsonQuery(reinterpret_cast<const char*>(interest->getName()[atmos::util::CatalogAdapter<DatabaseHandler>::m_prefix.size()+1].value()));
-  if (jsonQuery.length() > 0) {
-    runJsonQuery(face, keyChain, interest, jsonQuery, databaseHandler);
-  } // else NACK?
-}
 
-template <typename DatabaseHandler>
-void
-QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<ndn::Face> face,
-                                            std::shared_ptr<ndn::KeyChain> keyChain,
-					    std::shared_ptr<const ndn::Interest> interest,
-                                            const std::string& jsonQuery,
-                                            std::shared_ptr<DatabaseHandler> databaseHandler)
-{
-  // @todo: we should return a NACK as we have no database
-}
+  ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
+  // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
+  const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
 
-
-template <>
-void
-QueryAdapter<MYSQL>::runJsonQuery(std::shared_ptr<ndn::Face> face,
-                                  std::shared_ptr<ndn::KeyChain> keyChain,
-                                  std::shared_ptr<const ndn::Interest> interest,
-                                  const std::string& jsonQuery,
-                                  std::shared_ptr<MYSQL> databaseHandler)
-{
+  if (jsonQuery.length() <= 0) {
+    // send Nack?
+    return;
+  }
+  // ------------------
   // For efficiency, do a double check. Once without the lock, then with it.
   if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
     m_mutex.lock();
@@ -338,7 +461,7 @@
       // An unusual race-condition case, which requires things like PIT aggregation to be off.
       auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
       if (iter != m_activeQueryToFirstResponse.end()) {
-        face->put(*(iter->second));
+        m_face->put(*(iter->second));
         m_mutex.unlock(); //escape lock
         return;
       }
@@ -349,93 +472,147 @@
   // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
   Json::Value parsedFromString;
   Json::Reader reader;
-  if (reader.parse(jsonQuery, parsedFromString)) {
-    const ndn::name::Component version = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
-
-    // JSON parsed ok, so we can acknowledge successful receipt of the query
-    ndn::Name ackName(interest->getName());
-    ackName.append(version);
-    ackName.append("OK");
-
-    std::shared_ptr<ndn::Data> ack(std::make_shared<ndn::Data>(ackName));
-
-    m_mutex.lock();
-    { // !!! BEGIN CRITICAL SECTION !!!
-      // An unusual race-condition case, which requires things like PIT aggregation to be off.
-      auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
-      if (iter != m_activeQueryToFirstResponse.end()) {
-        face->put(*(iter->second));
-        m_mutex.unlock(); // escape lock
-        return;
-      }
-      // This is where things are expensive so we save them for the lock
-      keyChain->sign(*ack);
-      face->put(*ack);
-      m_activeQueryToFirstResponse.insert(std::pair<std::string,
-                                                    std::shared_ptr<ndn::Data>>(jsonQuery, ack));
-    } // !!!  END  CRITICAL SECTION !!!
-    m_mutex.unlock();
-
-    // 3) Convert the JSON Query into a MySQL one
-    bool autocomplete = false;
-    std::stringstream mysqlQuery;
-    mysqlQuery << "SELECT name FROM cmip5";
-    bool input = false;
-    for (Json::Value::iterator iter = parsedFromString.begin(); iter != parsedFromString.end(); ++iter)
-    {
-      Json::Value key = iter.key();
-      Json::Value value = (*iter);
-
-      if (input) {
-        mysqlQuery << " AND";
-      } else {
-        mysqlQuery << " WHERE";
-      }
-
-      // Auto-complete case
-      if (key.asString().compare("?") == 0) {
-        mysqlQuery << " name REGEXP '^" << value.asString() << "'";
-        autocomplete = true;
-      }
-      // Component case
-      else {
-        mysqlQuery << " " << key.asString() << "='" << value.asString() << "'";
-      }
-      input = true;
-    }
-
-    if (!input) { // Force it to be the empty set
-      mysqlQuery << " limit 0";
-    }
-    mysqlQuery << ";";
-
-    // 4) Run the Query
-    // We're assuming that databaseHandler has already been connected to the database
-    std::shared_ptr<MYSQL_RES> results = atmos::util::PerformQuery(databaseHandler, mysqlQuery.str());
-
-    MYSQL_ROW row;
-    ndn::Name segmentPrefix(atmos::util::CatalogAdapter<MYSQL>::m_prefix);
-    segmentPrefix.append("query-results");
-    segmentPrefix.append(version);
-
-    size_t usedBytes = 0;
-    const size_t PAYLOAD_LIMIT = 7000;
-    uint64_t segmentNo = 0;
-    Json::Value array;
-    while ((row = mysql_fetch_row(results.get())))
-    {
-      size_t size = strlen(row[0]) + 1;
-      if (usedBytes + size > PAYLOAD_LIMIT) {
-        publishJson(face, keyChain, segmentPrefix, array, segmentNo, false, autocomplete);
-        array.clear();
-        usedBytes = 0;
-        segmentNo++;
-      }
-      array.append(row[0]);
-      usedBytes += size;
-    }
-    publishJson(face, keyChain, segmentPrefix, array, segmentNo, true, autocomplete);
+  if (!reader.parse(jsonQuery, parsedFromString)) {
+    // @todo: send NACK?
+    std::cout << "cannot parse the JsonQuery" << std::endl;
+    return;
   }
+
+  const ndn::name::Component version
+    = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(
+                                          ndn::time::system_clock::now()).count());
+
+  std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
+
+  m_mutex.lock();
+  { // !!! BEGIN CRITICAL SECTION !!!
+    // An unusual race-condition case, which requires things like PIT aggregation to be off.
+    auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
+    if (iter != m_activeQueryToFirstResponse.end()) {
+      m_face->put(*(iter->second));
+      m_mutex.unlock(); // escape lock
+      return;
+    }
+    // This is where things are expensive so we save them for the lock
+    m_activeQueryToFirstResponse.insert(std::pair<std::string,
+                                        std::shared_ptr<ndn::Data>>(jsonQuery, ack));
+    m_face->put(*ack);
+  } // !!!  END  CRITICAL SECTION !!!
+  m_mutex.unlock();
+
+  // 3) Convert the JSON Query into a MySQL one
+  bool autocomplete = false;
+  std::stringstream sqlQuery;
+  json2Sql(sqlQuery, parsedFromString, autocomplete);
+
+  // 4) Run the Query
+  ndn::Name segmentPrefix(m_prefix);
+  segmentPrefix.append("query-results");
+  segmentPrefix.append(version);
+
+  prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete);
+}
+
+// Generic fallback: no backend is implemented for this DatabaseHandler, nothing is produced.
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
+                                               const std::string& sqlString,
+                                               bool autocomplete)
+{
+}
+
+// prepareSegments specialization for MySQL: runs sqlString, groups the first column of the
+// result rows into JSON arrays of about PAYLOAD_LIMIT bytes of names and caches each as a
+// signed segment in m_cache. inline, because a non-inline full specialization would be
+// multiply defined if this definition is included by more than one translation unit.
+template<>
+inline void
+QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
+                                     const std::string& sqlString,
+                                     bool autocomplete)
+{
+#ifndef NDEBUG
+  std::cout << "sqlString in prepareSegments : " << sqlString << std::endl;
+#endif
+  // 4) Run the Query
+  std::shared_ptr<MYSQL_RES> results
+    = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString);
+
+  if (!results) {
+#ifndef NDEBUG
+    std::cout << "null MYSQL_RES for query : " << sqlString << std::endl;
+#endif
+    // @todo: throw runtime error or log the error message?
+    return;
+  }
+
+#ifndef NDEBUG
+  std::cout << "Query results for \"" << sqlString << "\" contain "
+            << mysql_num_rows(results.get()) << " rows" << std::endl;
+#endif
+
+  MYSQL_ROW row;
+  size_t usedBytes = 0;
+  const size_t PAYLOAD_LIMIT = 7000;
+  uint64_t segmentNo = 0;
+  Json::Value array;
+  while ((row = mysql_fetch_row(results.get()))) {
+    if (row[0] == nullptr) // SQL NULL: nothing to publish, and strlen() would be undefined
+      continue;
+    size_t size = strlen(row[0]) + 1;
+    if (usedBytes + size > PAYLOAD_LIMIT) {
+      std::shared_ptr<ndn::Data> data
+        = makeReplyData(segmentPrefix, array, segmentNo, false, autocomplete);
+      std::unique_lock<std::mutex> cacheLock(m_mutex); // released even if insert() throws
+      m_cache.insert(*data);
+      cacheLock.unlock();
+      array.clear();
+      usedBytes = 0;
+      segmentNo++;
+    }
+    array.append(row[0]);
+    usedBytes += size;
+  }
+  std::shared_ptr<ndn::Data> data
+    = makeReplyData(segmentPrefix, array, segmentNo, true, autocomplete);
+  std::lock_guard<std::mutex> cacheLock(m_mutex);
+  m_cache.insert(*data);
+}
+
+// Builds and signs the Data packet for one segment of a query reply. The JSON payload is
+// {"next": value} for autocomplete queries and {"results": value} otherwise; the final
+// segment additionally carries a FinalBlockId equal to its own segment number.
+template <typename DatabaseHandler>
+std::shared_ptr<ndn::Data>
+QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
+                                             const Json::Value& value,
+                                             uint64_t segmentNo,
+                                             bool isFinalBlock,
+                                             bool isAutocomplete)
+{
+  Json::Value reply;
+  reply[isAutocomplete ? "next" : "results"] = value;
+  Json::FastWriter writer;
+  const std::string serialized = writer.write(reply);
+
+  ndn::Name dataName(segmentPrefix);
+  dataName.appendSegment(segmentNo);
+
+  auto data = std::make_shared<ndn::Data>(dataName);
+  // size() + 1 keeps the terminating NUL of c_str() inside the content
+  data->setContent(reinterpret_cast<const uint8_t*>(serialized.c_str()), serialized.size() + 1);
+  // freshness period of 10000 ms
+  data->setFreshnessPeriod(ndn::time::milliseconds(10000));
+  if (isFinalBlock) {
+    data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
+  }
+
+#ifndef NDEBUG
+  std::cout << "makeReplyData : " << dataName << std::endl;
+#endif
+  signData(*data);
+  return data;
 }
 
 } // namespace query
diff --git a/catalog/src/util/catalog-adapter.cpp b/catalog/src/util/catalog-adapter.cpp
new file mode 100644
index 0000000..61404ad
--- /dev/null
+++ b/catalog/src/util/catalog-adapter.cpp
@@ -0,0 +1,57 @@
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ *  for atmospheric science data
+ *  Copyright (C) 2015 Colorado State University
+ *
+ *  NDN-Atmos is free software: you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation, either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  NDN-Atmos is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with NDN-Atmos.  If not, see <http://www.gnu.org/licenses/>.
+**/
+
+#include "catalog-adapter.hpp"
+
+namespace atmos {
+namespace util {
+
+CatalogAdapter::CatalogAdapter(const std::shared_ptr<ndn::Face>& face,
+                               const std::shared_ptr<ndn::KeyChain>& keyChain)
+  : m_face(face)
+  , m_keyChain(keyChain)
+{
+  // empty
+}
+
+CatalogAdapter::~CatalogAdapter()
+{
+  // empty
+}
+
+void
+CatalogAdapter::onRegisterSuccess(const ndn::Name& prefix)
+{
+  // std::cout << "Successfully registered " << prefix << std::endl;
+}
+
+void
+CatalogAdapter::onRegisterFailure(const ndn::Name& prefix, const std::string& reason)
+{
+  throw Error("Failed to register prefix " + prefix.toUri() + " : " + reason);
+}
+
+void
+CatalogAdapter::onTimeout(const ndn::Interest& interest)
+{
+  // At this point, probably should do a retry
+}
+
+} // namespace util
+} // namespace atmos
+
diff --git a/catalog/src/util/catalog-adapter.hpp b/catalog/src/util/catalog-adapter.hpp
index 52f21d2..2b03358 100644
--- a/catalog/src/util/catalog-adapter.hpp
+++ b/catalog/src/util/catalog-adapter.hpp
@@ -24,14 +24,14 @@
 #include <ndn-cxx/interest.hpp>
 #include <ndn-cxx/name.hpp>
 #include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/encoding/block.hpp>
 
 #include <memory>
 #include <string>
 
-
-#include <ndn-cxx/encoding/block.hpp>
-
 #include <iostream>
+#include "util/config-file.hpp"
+
 
 namespace atmos {
 namespace util {
@@ -42,39 +42,50 @@
  * Both QueryAdapter and PublisherAdapter use this as a template to allow consistancy between
  * their designs and flow-control
  */
-template <typename DatabaseHandler>
 class CatalogAdapter {
 public:
-  /**
-   * Constructor
-   * @param face:            Face that will be used for NDN communications
-   * @param keyChain:        KeyChain to sign query responses and evaluate the incoming publish
-   *                          and ChronoSync requests against
-   * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
-   * @oaram prefix:          Name that will define the prefix to all queries and publish requests
-   *                          that will be routed to this specific Catalog Instance
-   */
-  CatalogAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
-          std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
+  class Error : public std::runtime_error
+  {
+  public:
+    explicit
+    Error(const std::string& what)
+      : std::runtime_error(what)
+    {
+    }
+  };
 
   /**
-   * Destructor
+   * Constructor
+   * @param face:      Face that will be used for NDN communications
+   * @param keyChain:  KeyChain that will be used for data signing
    */
+  CatalogAdapter(const std::shared_ptr<ndn::Face>& face,
+                 const std::shared_ptr<ndn::KeyChain>& keyChain);
+
   virtual
   ~CatalogAdapter();
 
-protected:
-  // @{ (onData and onTimeout) and/or onInterest should be overwritten at a minimum
-
-
   /**
-   * Data that is routed to this class based on the Interest
-   *
-   * @param interest: Interest that caused this Data to be routed
-   * @param data:     Data that needs to be handled
+   * Helper function that sets the configuration section handler
+   * @param config: ConfigFile object to set the handler
+   * @param prefix: Catalog prefix
    */
   virtual void
-  onData(const ndn::Interest& interest, const ndn::Data& data);
+  setConfigFile(util::ConfigFile& config,
+                const ndn::Name& prefix) = 0;
+
+protected:
+
+  /**
+   * Callback that receives the adapter's configuration section; subclasses parse it here
+   */
+  virtual void
+  onConfig(const util::ConfigSection& section,
+           bool isDryRun,
+           const std::string& fileName,
+           const ndn::Name& prefix) = 0;
+
+  // @{ (onData and onTimeout) and/or onInterest should be overwritten at a minimum
 
   /**
    * Timeout from a Data request
@@ -84,17 +95,6 @@
   virtual void
   onTimeout(const ndn::Interest& interest);
 
-
-  /**
-   * Interest that is routed to this class based on the InterestFilter
-   *
-   * @param filter:   InterestFilter that caused this Interest to be routed
-   * @param interest: Interest that needs to be handled
-   */
-  virtual void
-  onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
-  // @}
-
   /**
    * Callback that should/can be used to evaluate that the Interest Filter has been correctly set up
    *
@@ -112,71 +112,16 @@
   virtual void
   onRegisterFailure(const ndn::Name& prefix, const std::string& reason);
 
-
+protected:
   // Face to communicate with
-  std::shared_ptr<ndn::Face> m_face;
-  // KeyChain used for security
-  std::shared_ptr<ndn::KeyChain> m_keyChain;
-  // Handle to the Catalog's database
-  std::shared_ptr<DatabaseHandler> m_databaseHandler;
-  // Prefix for our namespace
+  const std::shared_ptr<ndn::Face> m_face;
+  // KeyChain used for data signing
+  const std::shared_ptr<ndn::KeyChain> m_keyChain;
   ndn::Name m_prefix;
+  // Name for the signing key
+  ndn::Name m_signingId;
 }; // class CatalogAdapter
 
-template <typename DatabaseHandler>
-CatalogAdapter<DatabaseHandler>::CatalogAdapter(std::shared_ptr<ndn::Face> face,
-                                                std::shared_ptr<ndn::KeyChain> keyChain,
-                                                std::shared_ptr<DatabaseHandler> databaseHandler,
-                                                const ndn::Name& prefix)
-  : m_face(face), m_keyChain(keyChain), m_databaseHandler(databaseHandler), m_prefix(prefix)
-{
-  // empty
-}
-
-template <typename DatabaseHandler>
-CatalogAdapter<DatabaseHandler>::~CatalogAdapter()
-{
-  // empty
-}
-
-template <typename DatabaseHandler>
-void
-CatalogAdapter<DatabaseHandler>::onRegisterSuccess(const ndn::Name& prefix)
-{
-  // std::cout << "Successfully registered " << prefix << std::endl;
-}
-
-template <typename DatabaseHandler>
-void
-CatalogAdapter<DatabaseHandler>::onRegisterFailure(const ndn::Name& prefix, const std::string& reason)
-{
-  // std::cout << "Failed to register prefix " << prefix << ": " << reason << std::endl;
-}
-
-
-template <typename DatabaseHandler>
-void
-CatalogAdapter<DatabaseHandler>::onData(const ndn::Interest& interest, const ndn::Data& data)
-{
-  // At this point we need to get the ndn::Block out of data.getContent()
-}
-
-template <typename DatabaseHandler>
-void
-CatalogAdapter<DatabaseHandler>::onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
-{
-  // At this point we need to use the filter to either:
-  // a) Request the Data for the Interest, or
-  // b) Use the Filter to ID where in the Interest the Interest's "Content" is, and grab that out
-}
-
-
-template <typename DatabaseHandler>
-void
-CatalogAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
-{
-  // At this point, probably should do a retry
-}
 
 } // namespace util
 } // namespace atmos
diff --git a/catalog/src/util/config-file.cpp b/catalog/src/util/config-file.cpp
new file mode 100644
index 0000000..0b335f9
--- /dev/null
+++ b/catalog/src/util/config-file.cpp
@@ -0,0 +1,168 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014  Regents of the University of California,
+ *                     Arizona Board of Regents,
+ *                     Colorado State University,
+ *                     University Pierre & Marie Curie, Sorbonne University,
+ *                     Washington University in St. Louis,
+ *                     Beijing Institute of Technology
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "config-file.hpp"
+
+#include <boost/assert.hpp>
+#include <boost/property_tree/info_parser.hpp>
+#include <fstream>
+#include <sstream>
+
+namespace atmos {
+namespace util {
+
+/**
+ * Default unknown-section callback: rejects any section that no module subscribed to.
+ *
+ * \throws ConfigFile::Error always; the message names \p filename and \p sectionName
+ */
+void
+ConfigFile::throwErrorOnUnknownSection(const std::string& filename,
+                                       const std::string& sectionName,
+                                       const ConfigSection& section,
+                                       bool isDryRun)
+{
+  std::string msg = "Error processing configuration file ";
+  msg += filename;
+  msg += ": no module subscribed for section \"" + sectionName + "\"";
+
+  throw ConfigFile::Error(msg);
+}
+
+/// Unknown-section callback that accepts the section without acting on it.
+void
+ConfigFile::ignoreUnknownSection(const std::string& filename,
+                                 const std::string& sectionName,
+                                 const ConfigSection& section,
+                                 bool isDryRun)
+{
+  // do nothing
+}
+
+/// Stores the callback that process() invokes for sections without a subscribed handler.
+ConfigFile::ConfigFile(UnknownConfigSectionHandler unknownSectionCallback)
+  : m_unknownSectionCallback(unknownSectionCallback)
+{
+}
+
+/// Registers \p subscriber for \p sectionName, replacing any handler already stored under it.
+void
+ConfigFile::addSectionHandler(const std::string& sectionName,
+                              ConfigSectionHandler subscriber)
+{
+  m_subscriptions[sectionName] = subscriber;
+}
+
+/**
+ * Opens \p filename and parses its contents.
+ *
+ * \throws ConfigFile::Error if the file cannot be opened, cannot be parsed, or has no sections
+ */
+void
+ConfigFile::parse(const std::string& filename, bool isDryRun)
+{
+  std::ifstream inputFile;
+  inputFile.open(filename.c_str());
+  if (!inputFile.good() || !inputFile.is_open())
+    {
+      std::string msg = "Failed to read configuration file: ";
+      msg += filename;
+      throw Error(msg);
+    }
+  parse(inputFile, isDryRun, filename);
+  inputFile.close();
+}
+
+/// Parses configuration text held in memory by wrapping \p input in a stream.
+void
+ConfigFile::parse(const std::string& input, bool isDryRun, const std::string& filename)
+{
+  std::istringstream inputStream(input);
+  parse(inputStream, isDryRun, filename);
+}
+
+/**
+ * Reads INFO-format configuration from \p input into m_global, then dispatches its sections.
+ *
+ * \throws ConfigFile::Error if the input cannot be parsed or has no sections
+ */
+void
+ConfigFile::parse(std::istream& input, bool isDryRun, const std::string& filename)
+{
+  try
+    {
+      boost::property_tree::read_info(input, m_global);
+    }
+  catch (const boost::property_tree::info_parser_error& error)
+    {
+      // Translate the parser's exception into ConfigFile::Error, adding the file name.
+      std::stringstream msg;
+      msg << "Failed to parse configuration file";
+      msg << " " << filename;
+      msg << " " << error.message() << " line " << error.line();
+      throw Error(msg.str());
+    }
+
+  process(isDryRun, filename);
+}
+
+/**
+ * Passes every top-level section of m_global to the handler subscribed under its name,
+ * or to the unknown-section callback when there is none.
+ *
+ * \throws ConfigFile::Error if m_global has no top-level sections
+ */
+void
+ConfigFile::process(bool isDryRun, const std::string& filename)
+{
+  BOOST_ASSERT(!filename.empty());
+
+  if (m_global.begin() == m_global.end())
+    {
+      std::string msg = "Error processing configuration file: ";
+      msg += filename;
+      msg += " no data";
+      throw Error(msg);
+    }
+
+  for (ConfigSection::const_iterator i = m_global.begin(); i != m_global.end(); ++i)
+    {
+      const std::string& sectionName = i->first;
+      const ConfigSection& section = i->second;
+
+      SubscriptionTable::iterator subscriberIt = m_subscriptions.find(sectionName);
+      if (subscriberIt != m_subscriptions.end())
+        {
+          ConfigSectionHandler subscriber = subscriberIt->second;
+          subscriber(section, isDryRun, filename);
+        }
+      else
+        {
+          m_unknownSectionCallback(filename, sectionName, section, isDryRun);
+        }
+    }
+}
+
+} // namespace util
+} // namespace atmos
diff --git a/catalog/src/util/config-file.hpp b/catalog/src/util/config-file.hpp
new file mode 100644
index 0000000..6ce292a
--- /dev/null
+++ b/catalog/src/util/config-file.hpp
@@ -0,0 +1,160 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014  Regents of the University of California,
+ *                     Arizona Board of Regents,
+ *                     Colorado State University,
+ *                     University Pierre & Marie Curie, Sorbonne University,
+ *                     Washington University in St. Louis,
+ *                     Beijing Institute of Technology
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#ifndef ATMOS_CATALOG_CONFIG_FILE_HPP
+#define ATMOS_CATALOG_CONFIG_FILE_HPP
+
+#include <boost/noncopyable.hpp>
+#include <boost/property_tree/ptree.hpp>
+#include <functional>
+#include <iosfwd>
+#include <map>
+#include <stdexcept>
+#include <string>
+
+namespace atmos {
+namespace util {
+
+/// \brief a configuration section, represented as a boost property tree
+typedef boost::property_tree::ptree ConfigSection;
+
+/// \brief callback for config file sections
+typedef std::function<void(const ConfigSection& /*section*/,
+                           bool /*isDryRun*/,
+                           const std::string& /*filename*/)> ConfigSectionHandler;
+
+/// \brief callback for config file sections without a subscribed handler
+typedef std::function<void(const std::string& /*filename*/,
+                           const std::string& /*sectionName*/,
+                           const ConfigSection& /*section*/,
+                           bool /*isDryRun*/)> UnknownConfigSectionHandler;
+
+/**
+ * \brief parses a configuration and hands each top-level section to the handler
+ *        subscribed under that section's name
+ */
+class ConfigFile : boost::noncopyable
+{
+public:
+
+  /// \brief error raised when a configuration cannot be read, parsed, or processed
+  class Error : public std::runtime_error
+  {
+  public:
+    explicit
+    Error(const std::string& what)
+      : std::runtime_error(what)
+    {
+
+    }
+  };
+
+  /**
+   * \param unknownSectionCallback invoked for each top-level section that has no subscribed
+   *        handler; the default, throwErrorOnUnknownSection, rejects such sections
+   */
+  ConfigFile(UnknownConfigSectionHandler unknownSectionCallback = throwErrorOnUnknownSection);
+
+  /**
+   * \brief unknown-section callback that rejects the section
+   * \throws ConfigFile::Error always
+   */
+  static void
+  throwErrorOnUnknownSection(const std::string& filename,
+                             const std::string& sectionName,
+                             const ConfigSection& section,
+                             bool isDryRun);
+
+  /// \brief unknown-section callback that does nothing, so the section is skipped
+  static void
+  ignoreUnknownSection(const std::string& filename,
+                       const std::string& sectionName,
+                       const ConfigSection& section,
+                       bool isDryRun);
+
+  /**
+   * \brief setup notification of configuration file sections
+   * \param sectionName name of the top-level section to subscribe to
+   * \param subscriber handler for that section; replaces any handler previously
+   *        registered under the same name
+   */
+  void
+  addSectionHandler(const std::string& sectionName,
+                    ConfigSectionHandler subscriber);
+
+  /**
+   * \param filename file to parse
+   * \param isDryRun true if performing a dry run of configuration, false otherwise
+   * \throws ConfigFile::Error if the file cannot be opened
+   * \throws ConfigFile::Error if parse error
+   * \throws ConfigFile::Error if the file contains no sections
+   */
+  void
+  parse(const std::string& filename, bool isDryRun);
+
+  /**
+   * \param input configuration (as a string) to parse
+   * \param isDryRun true if performing a dry run of configuration, false otherwise
+   * \param filename name used in error messages and passed on to section handlers;
+   *        must not be empty (asserted)
+   * \throws ConfigFile::Error if parse error
+   * \throws ConfigFile::Error if the input contains no sections
+   */
+  void
+  parse(const std::string& input, bool isDryRun, const std::string& filename);
+
+  /**
+   * \param input stream to parse
+   * \param isDryRun true if performing a dry run of configuration, false otherwise
+   * \param filename name used in error messages and passed on to section handlers;
+   *        must not be empty (asserted)
+   * \throws ConfigFile::Error if parse error
+   * \throws ConfigFile::Error if the input contains no sections
+   */
+  void
+  parse(std::istream& input, bool isDryRun, const std::string& filename);
+
+private:
+
+  /// \brief dispatch the top-level sections of m_global to their handlers
+  void
+  process(bool isDryRun, const std::string& filename);
+
+private:
+  /// callback for top-level sections that have no entry in m_subscriptions
+  UnknownConfigSectionHandler m_unknownSectionCallback;
+
+  typedef std::map<std::string, ConfigSectionHandler> SubscriptionTable;
+
+  /// section name -> subscribed handler
+  SubscriptionTable m_subscriptions;
+
+  /// property tree populated by parse()
+  ConfigSection m_global;
+};
+
+} // namespace util
+} // namespace atmos
+
+#endif // ATMOS_CATALOG_CONFIG_FILE_HPP
diff --git a/catalog/src/util/mysql-util.cpp b/catalog/src/util/mysql-util.cpp
index 62225ba..fb58665 100644
--- a/catalog/src/util/mysql-util.cpp
+++ b/catalog/src/util/mysql-util.cpp
@@ -17,8 +17,8 @@
 **/
 
 #include "util/mysql-util.hpp"
-
-#include "mysql/errmsg.h"
+#include <mysql/errmsg.h>
+#include <stdexcept>
 
 namespace atmos {
 namespace util {
@@ -32,18 +32,18 @@
 
 
 std::shared_ptr<MYSQL>
-MySQLConnectionSetup(ConnectionDetails& details) {
+MySQLConnectionSetup(const ConnectionDetails& details) {
   MYSQL* conn = mysql_init(NULL);
-  mysql_real_connect(conn, details.server.c_str(), details.user.c_str(),
-                     details.password.c_str(), details.database.c_str(), 0, NULL, 0);
-  std::shared_ptr<MYSQL> connection(conn);
+  if (conn == NULL) { throw std::runtime_error("mysql_init failed (out of memory)"); }
+  std::shared_ptr<MYSQL> connection(conn, &mysql_close); // owns conn before connecting, so a failed connect cannot leak it
+  if (!mysql_real_connect(conn, details.server.c_str(), details.user.c_str(),
+                          details.password.c_str(), details.database.c_str(), 0, NULL, 0))
+    { throw std::runtime_error(mysql_error(conn)); } // message is copied into the exception before conn is closed
   return connection;
 }
 
 std::shared_ptr<MYSQL_RES>
-PerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query) {
-  std::shared_ptr<MYSQL_RES> result(NULL);
-
+MySQLPerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query) {
   switch (mysql_query(connection.get(), sql_query.c_str()))
   {
     case 0:
@@ -51,7 +51,7 @@
       MYSQL_RES* resultPtr = mysql_store_result(connection.get());
       if (resultPtr != NULL)
       {
-        result.reset(resultPtr);
+        return std::shared_ptr<MYSQL_RES>(resultPtr, &mysql_free_result);
       }
       break;
     }
@@ -63,7 +63,7 @@
     default:
       break;
   }
-  return result;
+  return nullptr;
 }
 
 } // namespace util
diff --git a/catalog/src/util/mysql-util.hpp b/catalog/src/util/mysql-util.hpp
index da3c0b5..ee45f97 100644
--- a/catalog/src/util/mysql-util.hpp
+++ b/catalog/src/util/mysql-util.hpp
@@ -38,10 +38,10 @@
 };
 
 std::shared_ptr<MYSQL>
-MySQLConnectionSetup(ConnectionDetails& details);
+MySQLConnectionSetup(const ConnectionDetails& details);
 
 std::shared_ptr<MYSQL_RES>
-PerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query);
+MySQLPerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query);
 
 } // namespace util
 } // namespace atmos