/** NDN-Atmos: Cataloging Service for distributed data originally developed
 *  for atmospheric science data
 *  Copyright (C) 2015 Colorado State University
 *
 *  NDN-Atmos is free software: you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation, either version 3 of the License, or
 *  (at your option) any later version.
 *
 *  NDN-Atmos is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with NDN-Atmos.  If not, see <http://www.gnu.org/licenses/>.
**/

#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
#define ATMOS_QUERY_QUERY_ADAPTER_HPP

#include "util/catalog-adapter.hpp"
#include "util/mysql-util.hpp"
#include "util/config-file.hpp"

#include <thread>

#include <json/reader.h>
#include <json/value.h>
#include <json/writer.h>

#include <ndn-cxx/data.hpp>
#include <ndn-cxx/face.hpp>
#include <ndn-cxx/interest.hpp>
#include <ndn-cxx/interest-filter.hpp>
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/security/key-chain.hpp>
#include <ndn-cxx/util/time.hpp>
#include <ndn-cxx/encoding/encoding-buffer.hpp>
#include <ndn-cxx/util/in-memory-storage-lru.hpp>
#include <ndn-cxx/util/string-helper.hpp>
#include <ChronoSync/socket.hpp>

#include "mysql/mysql.h"

#include <map>
#include <unordered_map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <array>
#include <utility>

#include "util/logger.hpp"


namespace atmos {
namespace query {
#ifdef HAVE_LOG4CXX
  INIT_LOGGER("QueryAdapter");
#endif

// todo: calculate payload limit by get the size of a signed empty Data packet
static const size_t PAYLOAD_LIMIT = 7000;

/**
 * QueryAdapter handles the Query usecases for the catalog
 */
template <typename DatabaseHandler>
class QueryAdapter : public atmos::util::CatalogAdapter {
public:
  /**
   * Constructor
   *
   * @param face:       Face that will be used for NDN communications
   * @param keyChain:   KeyChain that will be used for data signing
   * @param syncSocket: ChronoSync socket
   */
  QueryAdapter(const std::shared_ptr<ndn::Face>& face,
               const std::shared_ptr<ndn::KeyChain>& keyChain,
               const std::shared_ptr<chronosync::Socket>& syncSocket);

  virtual
  ~QueryAdapter();

  /**
   * Helper function to specify section handler
   */
  void
  setConfigFile(util::ConfigFile& config,
                const ndn::Name& prefix,
                const std::vector<std::string>& nameFields,
                const std::string& databaseTable);

protected:
  /**
   * Helper function for configuration parsing
   */
  void
  onConfig(const util::ConfigSection& section,
           bool isDryDun,
           const std::string& fileName,
           const ndn::Name& prefix);

  /**
   * Handles incoming query requests by stripping the filter off the Interest to get the
   * actual request out. This removes the need for a 2-step Interest-Data retrieval.
   *
   * @param filter:   InterestFilter that caused this Interest to be routed
   * @param interest: Interest that needs to be handled
   */
  virtual void
  onIncomingQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);

  /**
   * Handles requests for responses to an filter initialization request
   *
   * @param interest: Interest that needs to be handled
   */
  virtual void
  onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest);

  /**
   * Helper function that generates query results from a Json query carried in the Interest
   *
   * @param interest:  Interest that needs to be handled
   */
  void
  populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest);

  void
  getFiltersMenu(Json::Value& value);

  /**
   * Helper function that makes query-results data
   *
   * @param segmentPrefix:  Name that identifies the Prefix for the Data
   * @param value:          Json::Value to be sent in the Data
   * @param segmentNo:      uint64_t the segment for this Data
   * @param isFinalBlock:   bool to indicate whether this needs to be flagged in the Data as the
   *                         last entry
   * @param isAutocomplete: bool to indicate whether this is an autocomplete message
   * @param resultCount:    the number of records in the query results
   * @param viewStart:      the start index of the record in the query results payload
   * @param viewEnd:        the end index of the record in the query results payload
   * @param lastComponent:  flag to indicate the content contains the last component for
                            autocompletion query
   */
  std::shared_ptr<ndn::Data>
  makeReplyData(const ndn::Name& segmentPrefix,
                const Json::Value& value,
                uint64_t segmentNo,
                bool isFinalBlock,
                bool isAutocomplete,
                uint64_t resultCount,
                uint64_t viewStart,
                uint64_t viewEnd,
                bool lastComponent);

  /**
   * Helper function that generates query results from a Json query carried in the Interest
   *
   * @param interest:  Interest that needs to be handled
   */
  void
  runJsonQuery(std::shared_ptr<const ndn::Interest> interest);

  /**
   * Helper function that makes ACK data
   *
   * @param interest: Intersts that needs to be handled
   * @param version:  Version that needs to be in the data name
   */
  std::shared_ptr<ndn::Data>
  makeAckData(std::shared_ptr<const ndn::Interest> interest,
              const ndn::Name::Component& version);

  /**
   * Helper function that sends NACK
   *
   * @param dataPrefix: prefix for the data packet
   */
  void
  sendNack(const ndn::Name& dataPrefix);

  /**
   * Helper function that signs the data
   */
  void
  signData(ndn::Data& data);

  /**
   * Helper function that publishes query-results data segments
   */
  virtual void
  prepareSegmentsBySqlString(const ndn::Name& segmentPrefix,
                             const std::string& sqlString,
                             bool lastComponent,
                             const std::string& nameField);

  virtual void
  prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams,
                          const ndn::Name& segmentPrefix);

  void
  generateSegments(ResultSet_T& res,
                   const ndn::Name& segmentPrefix,
                   int resultCount,
                   bool autocomplete,
                   bool lastComponent);

  /**
   * Helper function to set the DatabaseHandler
   */
  void
  setDatabaseHandler(const util::ConnectionDetails&  databaseId);

  void
  closeDatabaseHandler();

  /**
   * Helper function that set filters to make the adapter work
   */
  void
  setFilters();

  void
  setCatalogId();

  /**
   * Helper function that generates the sqlQuery string for autocomplete query
   * @param sqlQuery:      stringstream to save the sqlQuery string
   * @param jsonValue:     Json value that contains the query information
   * @param lastComponent: Flag to mark the last component query
   * @param nameField:     stringstream to save the nameField string
   */
  bool
  json2AutocompletionSql(std::stringstream& sqlQuery,
                         Json::Value& jsonValue,
                         bool& lastComponent,
                         std::stringstream& nameField);

  bool
  doPrefixBasedSearch(Json::Value& jsonValue,
                      std::vector<std::pair<std::string, std::string>>& typedComponents);

  bool
  doFilterBasedSearch(Json::Value& jsonValue,
                      std::vector<std::pair<std::string, std::string>>& typedComponents);

  ndn::Name
  getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
                      const ndn::Name::Component& version);

  std::string
  getChronoSyncDigest();

protected:
  typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
  // Handle to the Catalog's database
  std::shared_ptr<DatabaseHandler> m_dbConnPool;
  const std::shared_ptr<chronosync::Socket>& m_socket;

  // mutex to control critical sections
  std::mutex m_mutex;
  // @{ needs m_mutex protection
  // The Queries we are currently writing to
  ndn::util::InMemoryStorageLru m_activeQueryToFirstResponse;
  ndn::util::InMemoryStorageLru m_cache;
  std::string m_chronosyncDigest;
  // @}
  RegisteredPrefixList m_registeredPrefixList;
  ndn::Name m_catalogId; // should be replaced with the PK digest
  std::vector<std::string> m_filterCategoryNames;
};

template <typename DatabaseHandler>
QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
                                            const std::shared_ptr<ndn::KeyChain>& keyChain,
                                            const std::shared_ptr<chronosync::Socket>& syncSocket)
  : util::CatalogAdapter(face, keyChain)
  , m_socket(syncSocket)
  , m_activeQueryToFirstResponse(100000)
  , m_cache(250000)
  , m_chronosyncDigest("0")
  , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
{
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::setFilters()
{
  m_registeredPrefixList[m_prefix] = m_face->setInterestFilter(ndn::InterestFilter(m_prefix),
                            bind(&query::QueryAdapter<DatabaseHandler>::onIncomingQueryInterest,
                                 this, _1, _2),
                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
                                 this, _1),
                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
                                 this, _1, _2));
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
                                             const ndn::Name& prefix,
                                             const std::vector<std::string>& nameFields,
                                             const std::string& databaseTable)
{
  m_nameFields = nameFields;
  m_databaseTable = databaseTable;
  config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
                                                _1, _2, _3, prefix));
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
                                        bool isDryRun,
                                        const std::string& filename,
                                        const ndn::Name& prefix)
{
  using namespace util;
  if (isDryRun) {
    return;
  }
  std::string signingId, dbServer, dbName, dbUser, dbPasswd;
  for (auto item = section.begin();
       item != section.end();
       ++item)
  {
    if (item->first == "signingId") {
      signingId = item->second.get_value<std::string>();
      if (signingId.empty()) {
        throw Error("Empty value for \"signingId\""
                    " in \"query\" section");
      }
    }
    if (item->first == "filterCategoryNames") {
      std::istringstream ss(item->second.get_value<std::string>());
      std::string token;
      while(std::getline(ss, token, ',')) {
        m_filterCategoryNames.push_back(token);
      }
    }
    if (item->first == "database") {
      const util::ConfigSection& dataSection = item->second;
      for (auto subItem = dataSection.begin();
           subItem != dataSection.end();
           ++subItem)
      {
        if (subItem->first == "dbServer") {
          dbServer = subItem->second.get_value<std::string>();
        }
        if (subItem->first == "dbName") {
          dbName = subItem->second.get_value<std::string>();
        }
        if (subItem->first == "dbUser") {
          dbUser = subItem->second.get_value<std::string>();
        }
        if (subItem->first == "dbPasswd") {
          dbPasswd = subItem->second.get_value<std::string>();
        }
      }

      if (dbServer.empty()){
        throw Error("Invalid value for \"dbServer\""
                    " in \"query\" section");
      }
      if (dbName.empty()){
        throw Error("Invalid value for \"dbName\""
                    " in \"query\" section");
      }
      if (dbUser.empty()){
        throw Error("Invalid value for \"dbUser\""
                    " in \"query\" section");
      }
      if (dbPasswd.empty()){
        throw Error("Invalid value for \"dbPasswd\""
                    " in \"query\" section");
      }
    }
  }

  if (m_filterCategoryNames.empty()) {
    throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
  }

  m_prefix = prefix;

  m_signingId = ndn::Name(signingId);
  setCatalogId();

  util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
  setDatabaseHandler(mysqlId);
  setFilters();
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::setCatalogId()
{
  //empty
}

template <>
void
QueryAdapter<ConnectionPool_T>::setCatalogId()
{
  // use public key digest as the catalog ID
  ndn::Name keyId;
  if (m_signingId.empty()) {
    keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
  } else {
    keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
  }

  std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
  ndn::Block keyDigest = pKey->computeDigest();
  m_catalogId.clear();
  m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
{
  //empty
}

template <>
void
QueryAdapter<ConnectionPool_T>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
{
  m_dbConnPool = zdbConnectionSetup(databaseId);
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::closeDatabaseHandler()
{
}

template <>
void
QueryAdapter<ConnectionPool_T>::closeDatabaseHandler()
{
  ConnectionPool_stop(*m_dbConnPool);
}


template <typename DatabaseHandler>
QueryAdapter<DatabaseHandler>::~QueryAdapter()
{
  for (const auto& itr : m_registeredPrefixList) {
    if (static_cast<bool>(itr.second))
      m_face->unsetInterestFilter(itr.second);
  }

  closeDatabaseHandler();
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::onIncomingQueryInterest(const ndn::InterestFilter& filter,
                                                       const ndn::Interest& interest)
{
  _LOG_DEBUG(">> QueryAdapter::onIncomingQueryInterest");

  // Interest must carry component "initialization" or "query"
  if (interest.getName().size() < filter.getPrefix().size()) {
    // must NACK incorrect interest
    sendNack(interest.getName());
    return;
  }

  _LOG_DEBUG("Interest : " << interest.getName());
  std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();

  if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("filters-initialization")) {
    std::thread queryThread(&QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
                            this,
                            interestPtr);
    queryThread.detach();
  }
  else if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("query")) {

    auto data = m_cache.find(interest);
    if (data) {
      m_face->put(*data);
      return;
    }

    // catalog must strip sequence number in an Interest for further process
    if (interest.getName().size() > (filter.getPrefix().size() + 2)) {
      // Interest carries sequence number, only grip the main part
      // e.g., /hep/query/<query-params>/<version>/#seq
      ndn::Interest queryInterest(interest.getName().getPrefix(filter.getPrefix().size() + 2));

      auto data = m_cache.find(queryInterest);
      if (data) {
        // catalog has generated some data, but still working on it
        return;
      }
      interestPtr = queryInterest.shared_from_this();
    }

    std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
                            this,
                            interestPtr);
    queryThread.detach();
  }

  // ignore other Interests
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest)
{
  _LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest");

  if(m_socket != nullptr) {
    const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
    std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size());
    _LOG_DEBUG("Original digest :" << m_chronosyncDigest);
    _LOG_DEBUG("New digest : " << digestStr);
    // if the m_chronosyncDigest and the rootdigest are not equal
    if (digestStr != m_chronosyncDigest) {
      // (1) update chronosyncDigest
      // (2) clear all staled ACK data
      m_mutex.lock();
      m_chronosyncDigest = digestStr;
      m_activeQueryToFirstResponse.erase(ndn::Name("/"));
      m_mutex.unlock();
      _LOG_DEBUG("Change digest to " << m_chronosyncDigest);
    }
  }

  auto data = m_activeQueryToFirstResponse.find(*interest);
  if (data) {
    m_face->put(*data);
  }
  else {
    populateFiltersMenu(interest);
  }

  _LOG_DEBUG("<< QueryAdapter::onFiltersInitializationInterest");
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest)
{
  _LOG_DEBUG(">> QueryAdapter::populateFiltersMenu");
  Json::Value filters;
  Json::FastWriter fastWriter;
  getFiltersMenu(filters);

  const std::string filterValue = fastWriter.write(filters);

  if (!filters.empty()) {
    // use /<prefix>/filters-initialization/<seg> as data name
    ndn::Name filterDataName(interest->getName().getPrefix(-1));

    const char* payload = filterValue.c_str();
    size_t payloadLength = filterValue.size();
    size_t startIndex = 0, seqNo = 0;

    if (filterValue.length() > PAYLOAD_LIMIT) {
      payloadLength = PAYLOAD_LIMIT;
      ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo);
      std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName);
      // freshnessPeriod 0 means permanent?
      filterData->setFreshnessPeriod(ndn::time::milliseconds(10));
      filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);

      signData(*filterData);

      _LOG_DEBUG("Populate Filter Data :" << segmentName);

      m_mutex.lock();
      // save the filter results in the activeQueryToFirstResponse structure
      // when version changes, the activeQueryToFirstResponse should be cleaned
      m_activeQueryToFirstResponse.insert(*filterData);
      try {
        m_face->put(*filterData);
      }
      catch (std::exception& e) {
        _LOG_ERROR(e.what());
      }
      m_mutex.unlock();

      seqNo++;
      startIndex = payloadLength * seqNo + 1;
    }
    payloadLength = filterValue.size() - PAYLOAD_LIMIT * seqNo;

    ndn::Name lastSegment = ndn::Name(filterDataName).appendSegment(seqNo);
    std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(lastSegment);
    filterData->setFreshnessPeriod(ndn::time::milliseconds(10));
    filterData->setContent(reinterpret_cast<const uint8_t*>(payload + startIndex), payloadLength);
    filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo));

    signData(*filterData);
    m_mutex.lock();
    m_activeQueryToFirstResponse.insert(*filterData);
    m_face->put(*filterData);
    m_mutex.unlock();
  }
  _LOG_DEBUG("<< QueryAdapter::populateFiltersMenu");
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value)
{
  // empty
}

// get distinct value of each column
template <>
void
QueryAdapter<ConnectionPool_T>::getFiltersMenu(Json::Value& value)
{
  _LOG_DEBUG(">> QueryAdapter::getFiltersMenu");
  Json::Value tmp;

  Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool);
  if (!conn) {
    _LOG_DEBUG("No available database connections");
    return;
  }

  for (size_t i = 0; i < m_filterCategoryNames.size(); i++) {
    std::string columnName = m_filterCategoryNames[i];
    std::string getFilterSql("SELECT DISTINCT " + columnName +
                             " FROM " + m_databaseTable + ";");

    ResultSet_T res4ColumnName;
    TRY {
      res4ColumnName = Connection_executeQuery(conn, reinterpret_cast<const char*>(getFilterSql.c_str()), getFilterSql.size());
    }
    CATCH(SQLException) {
      _LOG_ERROR(Connection_getLastError(conn));
    }
    END_TRY;

    while (ResultSet_next(res4ColumnName)) {
      tmp[columnName].append(ResultSet_getString(res4ColumnName, 1));
    }

    value.append(tmp);
    tmp.clear();
  }

  _LOG_DEBUG("<< QueryAdapter::getFiltersMenu");
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
{
  if (m_signingId.empty())
    m_keyChain->sign(data);
  else {
    ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
    ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
    m_keyChain->sign(data, certName);
  }
}

template <typename DatabaseHandler>
ndn::Name
QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
                                                   const ndn::Name::Component& version)
{
  // use generic name, instead of specific one
  ndn::Name queryResultName = interest->getName();
  queryResultName.append(version);
  return queryResultName;
}

template <typename DatabaseHandler>
std::shared_ptr<ndn::Data>
QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
                                           const ndn::Name::Component& version)
{
  std::string queryResultNameStr(getQueryResultsName(interest, version).toUri());

  std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName());
  ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()),
                  queryResultNameStr.length());
  ack->setFreshnessPeriod(ndn::time::milliseconds(10000));

  signData(*ack);

  _LOG_DEBUG("Make ACK : " << queryResultNameStr);

  return ack;
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::sendNack(const ndn::Name& dataPrefix)
{
  uint64_t segmentNo = 0;

  std::shared_ptr<ndn::Data> nack =
    std::make_shared<ndn::Data>(ndn::Name(dataPrefix).appendSegment(segmentNo));
  nack->setFreshnessPeriod(ndn::time::milliseconds(10000));
  nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));

  signData(*nack);

  _LOG_DEBUG("Send Nack: " << ndn::Name(dataPrefix).appendSegment(segmentNo));

  m_mutex.lock();
  m_cache.insert(*nack);
  m_face->put(*nack);
  m_mutex.unlock();
}

template <typename DatabaseHandler>
bool
QueryAdapter<DatabaseHandler>::json2AutocompletionSql(std::stringstream& sqlQuery,
                                                      Json::Value& jsonValue,
                                                      bool& lastComponent,
                                                      std::stringstream& fieldName)
{
  _LOG_DEBUG(">> QueryAdapter::json2AutocompletionSql");

  _LOG_DEBUG(jsonValue.toStyledString());

  if (jsonValue.type() != Json::objectValue) {
    return false;
  }

  std::string typedString;
  // get the string in the jsonValue
  for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
  {
    Json::Value key = iter.key();
    Json::Value value = (*iter);

    if (key == Json::nullValue || value == Json::nullValue) {
      _LOG_ERROR("Null key or value in JsonValue");
      return false;
    }

    // cannot convert to string
    if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
      _LOG_ERROR("Malformed JsonQuery string");
      return false;
    }

    if (key.asString().compare("?") == 0) {
      typedString = value.asString();
      // since the front end triggers the autocompletion when users typed '/',
      // there must be a '/' at the end, and the first char must be '/'
      if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
        return false;
      break;
    }
  }

  // 1. get the expected column number by parsing the typedString, so we can get the filed name
  size_t pos = 0;
  size_t start = 1; // start from the 1st char which is not '/'
  size_t count = 0; // also the name to query for
  std::string token;
  std::string delimiter = "/";
  std::map<std::string, std::string> typedComponents;
  while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
    token = typedString.substr(start, pos - start);
    if (count >= m_nameFields.size() - 1) {
      return false;
    }

    // add column name and value (token) into map
    typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
    count++;
    start = pos + 1;
  }

  // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
  // return true
  if (count == m_nameFields.size() - 1)
    lastComponent = true; // indicate this query is to query the last component

  bool more = false;

  fieldName << m_nameFields[count];
  for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
       it != typedComponents.end(); ++it) {
    if (more)
      sqlQuery << " AND";
    else
      sqlQuery << " WHERE";

    sqlQuery << " " << it->first << "='" << it->second << "'";

    more = true;
  }
  sqlQuery << ";";
  return true;
}

template <typename databasehandler>
bool
QueryAdapter<databasehandler>::doPrefixBasedSearch(Json::Value& jsonValue,
                                                   std::vector<std::pair<std::string, std::string>>& typedComponents)
{
  _LOG_DEBUG(">> QueryAdapter::doPrefixBasedSearch");

  if (jsonValue.type() != Json::objectValue) {
    return false;
  }

  std::string typedString;
  // get the string in the jsonValue
  for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
  {
    Json::Value key = iter.key();
    Json::Value value = (*iter);

    if (key == Json::nullValue || value == Json::nullValue) {
      _LOG_ERROR("null key or value in jsonValue");
      return false;
    }

    // cannot convert to string
    if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
      _LOG_ERROR("malformed jsonquery string");
      return false;
    }

    if (key.asString().compare("??") == 0) {
      typedString = value.asString();
      if (typedString.empty() || typedString.find("/") != 0)
        return false;
      break;
    }
  }

  // 1. get the expected column number by parsing the typedString, so we can get the filed name
  size_t pos = 0;
  size_t start = 1; // start from the 1st char which is not '/'
  size_t count = 0; // also the name to query for
  size_t typedStringlen = typedString.length();
  std::string token;
  std::string delimiter = "/";

  while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
    token = typedString.substr(start, pos - start);
    if (count >= m_nameFields.size()) {
      return false;
    }

    // add column name and value (token) into map
    typedComponents.push_back(std::make_pair(m_nameFields[count], token));

    count++;
    start = pos + 1;
  }

  // we may have a component after the last "/"
  if (start < typedStringlen) {
    typedComponents.push_back(std::make_pair(m_nameFields[count],
                                             typedString.substr(start, typedStringlen - start)));
  }

  return true;
}


template <typename databasehandler>
bool
QueryAdapter<databasehandler>::doFilterBasedSearch(Json::Value& jsonValue,
                                                   std::vector<std::pair<std::string, std::string>>& typedComponents)
{
  _LOG_DEBUG(">> QueryAdapter::doFilterBasedSearch");

  if (jsonValue.type() != Json::objectValue) {
    return false;
  }

  for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
  {
    Json::Value key = iter.key();
    Json::Value value = (*iter);

    if (key == Json::nullValue || value == Json::nullValue) {
      _LOG_ERROR("null key or value in jsonValue");
      return false;
    }

    // cannot convert to string
    if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
      _LOG_ERROR("malformed jsonQuery string");
      return false;
    }

    if (key.asString().compare("?") == 0 || key.asString().compare("??") == 0) {
      continue;
    }

    _LOG_DEBUG(key.asString() << " " << value.asString());
    typedComponents.push_back(std::make_pair(key.asString(), value.asString()));
  }

  return true;
}


template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
{
  _LOG_DEBUG(">> QueryAdapter::runJsonQuery");

  // 1) Strip the prefix off the ndn::Interest's ndn::Name
  // +1 to grab JSON component after "query" component

  ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
  // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
  const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());

  if (jsonQuery.length() <= 0) {
    // no JSON query, send Nack
    sendNack(interest->getName());
    return;
  }

  // the version should be replaced with ChronoSync state digest
  ndn::name::Component version;

  if(m_socket != nullptr) {
    const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
    std::string digestStr = ndn::toHex(digestPtr->buf(), digestPtr->size());
    _LOG_DEBUG("Original digest " << m_chronosyncDigest);
    _LOG_DEBUG("New digest : " << digestStr);
    // if the m_chronosyncDigest and the rootdigest are not equal
    if (digestStr != m_chronosyncDigest) {
      // (1) update chronosyncDigest
      // (2) clear all staled ACK data
      m_mutex.lock();
      m_chronosyncDigest = digestStr;
      m_activeQueryToFirstResponse.erase(ndn::Name("/"));
      m_mutex.unlock();
      _LOG_DEBUG("Change digest to " << m_chronosyncDigest);
    }
    version = ndn::name::Component::fromEscapedString(digestStr);
  }
  else {
    version = ndn::name::Component::fromEscapedString(m_chronosyncDigest);
  }

  // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
  Json::Value parsedFromString;
  Json::Reader reader;
  if (!reader.parse(jsonQuery, parsedFromString)) {
    // json object is broken
    sendNack(interest->getName());
    _LOG_ERROR("Cannot parse the JsonQuery");
    return;
  }

  // 3) Convert the JSON Query into a MySQL one
  ndn::Name segmentPrefix(getQueryResultsName(interest, version));
  _LOG_DEBUG("segmentPrefix :" << segmentPrefix);

  Json::Value tmp;
  std::vector<std::pair<std::string, std::string>> typedComponents;

  // expect the autocomplete and the component-based query are separate
  // if Json::Value contains ? as key, is autocompletion
  if (parsedFromString.get("?", tmp) != tmp) {
    bool lastComponent = false;
    std::stringstream sqlQuery, fieldName;

    // must generate the sql string for autocomple, the selected column is changing
    if (!json2AutocompletionSql(sqlQuery, parsedFromString, lastComponent, fieldName)) {
      sendNack(segmentPrefix);
      return;
    }
    prepareSegmentsBySqlString(segmentPrefix, sqlQuery.str(), lastComponent, fieldName.str());
  }
  else if (parsedFromString.get("??", tmp) != tmp) {
    if (!doPrefixBasedSearch(parsedFromString, typedComponents)) {
      sendNack(segmentPrefix);
      return;
    }
    prepareSegmentsByParams(typedComponents, segmentPrefix);
  }
  else {
    if (!doFilterBasedSearch(parsedFromString, typedComponents)) {
      sendNack(segmentPrefix);
      return;
    }
    prepareSegmentsByParams(typedComponents, segmentPrefix);
  }

}

template <typename databasehandler>
void
QueryAdapter<databasehandler>::
prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams,
                        const ndn::Name& segmentprefix)
{
}

template <>
void
QueryAdapter<ConnectionPool_T>::
prepareSegmentsByParams(std::vector<std::pair<std::string, std::string>>& queryParams,
                        const ndn::Name& segmentPrefix)
{
  _LOG_DEBUG(">> QueryAdapter::prepareSegmentsByParams");

  // the prepared_statement cannot improve the performance, but can simplify the code
  Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool);
  if (!conn) {
    // do not answer for this request due to lack of connections, request will come back later
    _LOG_DEBUG("No available database connections");
    return;
  }
  std::string getRecordNumSqlStr("SELECT count(name) FROM ");
  getRecordNumSqlStr += m_databaseTable;
  getRecordNumSqlStr += " WHERE ";
  for (size_t i = 0; i < m_nameFields.size(); i++) {
    getRecordNumSqlStr += m_nameFields[i];
    getRecordNumSqlStr += " LIKE ?";
    if (i != m_nameFields.size() - 1) {
      getRecordNumSqlStr += " AND ";
    }
  }

  PreparedStatement_T ps4RecordNum =
    Connection_prepareStatement(conn, reinterpret_cast<const char*>(getRecordNumSqlStr.c_str()), getRecordNumSqlStr.size());

  // before query, initialize all params for statement
  for (size_t i = 0; i < m_nameFields.size(); i++) {
    PreparedStatement_setString(ps4RecordNum, i + 1, "%");
  }

  // reset params based on the query
  for (std::vector<std::pair<std::string, std::string>>::iterator it = queryParams.begin();
       it != queryParams.end(); ++it) {
    // dictionary is faster
    for (size_t i = 0; i < m_nameFields.size(); i++) {
      if (it->first == m_nameFields[i]) {
        PreparedStatement_setString(ps4RecordNum, i + 1, it->second.c_str());
      }
    }
  }

  ResultSet_T res4RecordNum;
  TRY {
    res4RecordNum = PreparedStatement_executeQuery(ps4RecordNum);
  }
  CATCH(SQLException) {
    _LOG_ERROR(Connection_getLastError(conn));
  }
  END_TRY;

  uint64_t resultCount = 0; // use count sql to get

  // result for record number
  while (ResultSet_next(res4RecordNum)) {
    resultCount = ResultSet_getInt(res4RecordNum, 1);
  }

  // get name list statement
  std::string getNameListSqlStr("SELECT name FROM ");
  getNameListSqlStr += m_databaseTable;
  getNameListSqlStr += " WHERE ";
  for (size_t i = 0; i < m_nameFields.size(); i++) {
    getNameListSqlStr += m_nameFields[i];
    getNameListSqlStr += " LIKE ?";
    if (i != m_nameFields.size() - 1) {
      getNameListSqlStr += " AND ";
    }
  }

  PreparedStatement_T ps4Name =
    Connection_prepareStatement(conn, reinterpret_cast<const char*>(getNameListSqlStr.c_str()), getNameListSqlStr.size());

  // before query, initialize all params for statement
  for (size_t i = 0; i < m_nameFields.size(); i++) {
    PreparedStatement_setString(ps4Name, i + 1, "%");
  }

  // reset params based on the query
  for (std::vector<std::pair<std::string, std::string>>::iterator it = queryParams.begin();
       it != queryParams.end(); ++it) {
    // dictionary is faster
    for (size_t i = 0; i < m_nameFields.size(); i++) {
      if (it->first == m_nameFields[i]) {
        PreparedStatement_setString(ps4Name, i + 1, it->second.c_str());
      }
    }
  }

  ResultSet_T res4Name;
  TRY {
    res4Name = PreparedStatement_executeQuery(ps4Name);
  }
  CATCH(SQLException) {
    _LOG_ERROR(Connection_getLastError(conn));
  }
  END_TRY;

  generateSegments(res4Name, segmentPrefix, resultCount, false, false);

  Connection_close(conn);
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::generateSegments(ResultSet_T& res,
                                                const ndn::Name& segmentPrefix,
                                                int resultCount,
                                                bool autocomplete,
                                                bool lastComponent)
{
  uint64_t segmentno = 0;
  Json::Value tmp;
  Json::Value resultjson;
  Json::FastWriter fastWriter;

  uint64_t viewstart = 0, viewend = 0;
  while (ResultSet_next(res)) {
    const char *name = ResultSet_getString(res, 1);
    tmp.append(name);
    const std::string tmpString = fastWriter.write(tmp);
    if (tmpString.length() > PAYLOAD_LIMIT) {
      std::shared_ptr<ndn::Data> data
        = makeReplyData(segmentPrefix, resultjson, segmentno, false,
                        autocomplete, resultCount, viewstart, viewend, lastComponent);
      m_mutex.lock();
      m_cache.insert(*data);
      m_face->put(*data);
      m_mutex.unlock();
      tmp.clear();
      resultjson.clear();
      segmentno++;
      viewstart = viewend + 1;
    }
    resultjson.append(name);
    viewend++;
  }
  std::shared_ptr<ndn::Data> data
    = makeReplyData(segmentPrefix, resultjson, segmentno, true,
                    autocomplete, resultCount, viewstart, viewend, lastComponent);
  m_mutex.lock();
  m_cache.insert(*data);
  m_face->put(*data);
  m_mutex.unlock();
}

template <typename DatabaseHandler>
void
QueryAdapter<DatabaseHandler>::prepareSegmentsBySqlString(const ndn::Name& segmentPrefix,
                                                          const std::string& sqlString,
                                                          bool lastComponent,
                                                          const std::string& nameField)
{
  // empty
}


template <>
void
QueryAdapter<ConnectionPool_T>::prepareSegmentsBySqlString(const ndn::Name& segmentPrefix,
                                                          const std::string& sqlString,
                                                          bool lastComponent,
                                                          const std::string& nameField)
{
  _LOG_DEBUG(">> QueryAdapter::prepareSegmentsBySqlString");

  _LOG_DEBUG(sqlString);

  Connection_T conn = ConnectionPool_getConnection(*m_dbConnPool);
  if (!conn) {
    _LOG_DEBUG("No available database connections");
    return;
  }

  //// just for get the rwo count ...
  std::string getRecordNumSqlStr("SELECT COUNT( DISTINCT ");
  getRecordNumSqlStr += nameField;
  getRecordNumSqlStr += ") FROM ";
  getRecordNumSqlStr += m_databaseTable;
  getRecordNumSqlStr += sqlString;

  ResultSet_T res4RecordNum;
  TRY {
    res4RecordNum = Connection_executeQuery(conn, reinterpret_cast<const char*>(getRecordNumSqlStr.c_str()), getRecordNumSqlStr.size());
  }
  CATCH(SQLException) {
    _LOG_ERROR(Connection_getLastError(conn));
  }
  END_TRY;

  uint64_t resultCount = 0;
  while (ResultSet_next(res4RecordNum)) {
    resultCount = ResultSet_getInt(res4RecordNum, 1);
  }
  ////

  std::string getNextFieldsSqlStr("SELECT DISTINCT ");
  getNextFieldsSqlStr += nameField;
  getNextFieldsSqlStr += " FROM ";
  getNextFieldsSqlStr += m_databaseTable;
  getNextFieldsSqlStr += sqlString;

  ResultSet_T res4NextFields;
  TRY {
    res4NextFields = Connection_executeQuery(conn, reinterpret_cast<const char*>(getNextFieldsSqlStr.c_str()), getNextFieldsSqlStr.size());
  }
  CATCH(SQLException) {
    _LOG_ERROR(Connection_getLastError(conn));
  }
  END_TRY;

  generateSegments(res4NextFields, segmentPrefix, resultCount, true, lastComponent);

  Connection_close(conn);
}

template <typename DatabaseHandler>
std::shared_ptr<ndn::Data>
QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
                                             const Json::Value& value,
                                             uint64_t segmentNo,
                                             bool isFinalBlock,
                                             bool isAutocomplete,
                                             uint64_t resultCount,
                                             uint64_t viewStart,
                                             uint64_t viewEnd,
                                             bool lastComponent)
{
  Json::Value entry;
  Json::FastWriter fastWriter;

  entry["resultCount"] = Json::UInt64(resultCount);;
  entry["viewStart"] = Json::UInt64(viewStart);
  entry["viewEnd"] = Json::UInt64(viewEnd);

  if (lastComponent)
    entry["lastComponent"] = Json::Value(true);

  _LOG_DEBUG("resultCount " << resultCount << "; "
             << "viewStart " << viewStart << "; "
             << "viewEnd " << viewEnd);

  if (isAutocomplete) {
    entry["next"] = value;
  } else {
    entry["results"] = value;
  }
  const std::string jsonMessage = fastWriter.write(entry);
  const char* payload = jsonMessage.c_str();
  size_t payloadLength = jsonMessage.size() + 1;
  ndn::Name segmentName(segmentPrefix);
  segmentName.appendSegment(segmentNo);

  std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
  data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
  data->setFreshnessPeriod(ndn::time::milliseconds(10000));

  if (isFinalBlock) {
    data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
  }

  _LOG_DEBUG(segmentName);

  signData(*data);
  return data;
}


} // namespace query
} // namespace atmos
#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP
