populate filters menu

* add view index in payload
* fix a bug where the catalog crashes upon receiving a request for an empty prefix
* move component <catalogID> before <query-param>

refs: #3120

Change-Id: I468a72b6b2d1889b9c73052a0226dfeaa851179f
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index ccfefb6..c6391ef 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -38,6 +38,7 @@
 #include <ndn-cxx/util/time.hpp>
 #include <ndn-cxx/encoding/encoding-buffer.hpp>
 #include <ndn-cxx/util/in-memory-storage-lru.hpp>
+#include <ndn-cxx/util/string-helper.hpp>
 
 #include "mysql/mysql.h"
 
@@ -78,7 +79,8 @@
   void
   setConfigFile(util::ConfigFile& config,
                 const ndn::Name& prefix,
-                const std::vector<std::string>& nameFields);
+                const std::vector<std::string>& nameFields,
+                const std::string& databaseTable);
 
 protected:
   /**
@@ -110,6 +112,26 @@
   onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
 
   /**
+   * Handles an incoming filters-initialization Interest
+   *
+   * @param filter:   InterestFilter that caused this Interest to be routed
+   * @param interest: Interest that needs to be handled
+   */
+  virtual void
+  onFiltersInitializationInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+
+  /**
+   * Helper function that builds the filters menu and publishes it as segmented Data
+   *
+   * @param interest:  Interest that needs to be handled
+   */
+  void
+  populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest);
+
+  void
+  getFiltersMenu(Json::Value& value);
+
+  /**
    * Helper function that makes query-results data
    *
    * @param segmentPrefix:  Name that identifies the Prefix for the Data
@@ -119,6 +141,8 @@
    *                         last entry
    * @param isAutocomplete: bool to indicate whether this is an autocomplete message
    * @param resultCount:    the number of records in the query results
+   * @param viewStart:      the start index of the record in the query results payload
+   * @param viewEnd:        the end index of the record in the query results payload
    */
   std::shared_ptr<ndn::Data>
   makeReplyData(const ndn::Name& segmentPrefix,
@@ -126,7 +150,9 @@
                 uint64_t segmentNo,
                 bool isFinalBlock,
                 bool isAutocomplete,
-                uint64_t resultCount);
+                uint64_t resultCount,
+                uint64_t viewStart,
+                uint64_t viewEnd);
 
   /**
    * Helper function that generates query results from a Json query carried in the Interest
@@ -189,6 +215,9 @@
   void
   setFilters();
 
+  void
+  setCatalogId();
+
   /**
    * Helper function that generates the sqlQuery string for autocomplete query
    * @param sqlQuery:     stringstream to save the sqlQuery string
@@ -200,7 +229,11 @@
 
   bool
   json2CompleteSearchSql(std::stringstream& sqlQuery,
-                 Json::Value& jsonValue);
+                         Json::Value& jsonValue);
+
+  ndn::Name
+  getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
+                      const ndn::Name::Component& version);
 
 protected:
   typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
@@ -218,6 +251,7 @@
   RegisteredPrefixList m_registeredPrefixList;
   //std::vector<std::string> m_atmosColumns;
   ndn::Name m_catalogId; // should be replaced with the PK digest
+  std::vector<std::string> m_filterCategoryNames;
 };
 
 template <typename DatabaseHandler>
@@ -225,7 +259,7 @@
                                             const std::shared_ptr<ndn::KeyChain>& keyChain)
   : util::CatalogAdapter(face, keyChain)
   , m_cache(250000)
-  , m_catalogId("catalogIdPlaceHolder")
+  , m_catalogId("catalogIdPlaceHolder") // initialize for unitests
 {
 }
 
@@ -242,23 +276,37 @@
                             bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
                                  this, _1, _2));
 
-  ndn::Name resultPrefix = ndn::Name(m_prefix).append("query-results");
-  m_registeredPrefixList[resultPrefix] = m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("query-results")),
+  ndn::Name queryResultsPrefix = ndn::Name(m_prefix).append("query-results");
+  m_registeredPrefixList[queryResultsPrefix] =
+    m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix)
+                                                  .append("query-results").append(m_catalogId)),
                             bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
                                  this, _1, _2),
                             bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
                                  this, _1),
                             bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
                                  this, _1, _2));
+
+  ndn::Name filtersInitializationPrefix = ndn::Name(m_prefix).append("filters-initialization");
+  m_registeredPrefixList[filtersInitializationPrefix] =
+    m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("filters-initialization")),
+                            bind(&query::QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
+                                 this, _1, _2),
+                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
+                                 this, _1),
+                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
+                                 this, _1, _2));
 }
 
 template <typename DatabaseHandler>
 void
 QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
                                              const ndn::Name& prefix,
-                                             const std::vector<std::string>& nameFields)
+                                             const std::vector<std::string>& nameFields,
+                                             const std::string& databaseTable)
 {
   m_nameFields = nameFields;
+  m_databaseTable = databaseTable;
   config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
                                                 _1, _2, _3, prefix));
 }
@@ -277,44 +325,51 @@
   std::string signingId, dbServer, dbName, dbUser, dbPasswd;
   for (auto item = section.begin();
        item != section.end();
-       ++ item)
+       ++item)
   {
     if (item->first == "signingId") {
-      signingId.assign(item->second.get_value<std::string>());
+      signingId = item->second.get_value<std::string>();
       if (signingId.empty()) {
         throw Error("Empty value for \"signingId\""
                                 " in \"query\" section");
       }
     }
+    if (item->first == "filterCategoryNames") {
+      std::istringstream ss(item->second.get_value<std::string>());
+      std::string token;
+      while(std::getline(ss, token, ',')) {
+        m_filterCategoryNames.push_back(token);
+      }
+    }
     if (item->first == "database") {
       const util::ConfigSection& dataSection = item->second;
       for (auto subItem = dataSection.begin();
            subItem != dataSection.end();
-           ++ subItem)
+           ++subItem)
       {
         if (subItem->first == "dbServer") {
-          dbServer.assign(subItem->second.get_value<std::string>());
+          dbServer = subItem->second.get_value<std::string>();
           if (dbServer.empty()){
             throw Error("Invalid value for \"dbServer\""
                                     " in \"query\" section");
           }
         }
         if (subItem->first == "dbName") {
-          dbName.assign(subItem->second.get_value<std::string>());
+          dbName = subItem->second.get_value<std::string>();
           if (dbName.empty()){
             throw Error("Invalid value for \"dbName\""
                                     " in \"query\" section");
           }
         }
         if (subItem->first == "dbUser") {
-          dbUser.assign(subItem->second.get_value<std::string>());
+          dbUser = subItem->second.get_value<std::string>();
           if (dbUser.empty()){
             throw Error("Invalid value for \"dbUser\""
                                     " in \"query\" section");
           }
         }
         if (subItem->first == "dbPasswd") {
-          dbPasswd.assign(subItem->second.get_value<std::string>());
+          dbPasswd = subItem->second.get_value<std::string>();
           if (dbPasswd.empty()){
             throw Error("Invalid value for \"dbPasswd\""
                                     " in \"query\" section");
@@ -324,16 +379,47 @@
     }
   }
 
-  m_prefix = prefix;
-  m_signingId = ndn::Name(signingId);
-  util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
+  if (m_filterCategoryNames.size() == 0) {
+    throw Error("Empty value for \"filterCategoryNames\" in \"query\" section");
+  }
 
+  m_prefix = prefix;
+
+  m_signingId = ndn::Name(signingId);
+  setCatalogId();
+
+  util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
   setDatabaseHandler(mysqlId);
   setFilters();
 }
 
 template <typename DatabaseHandler>
 void
+QueryAdapter<DatabaseHandler>::setCatalogId()
+{
+  // generic handlers do nothing here; the MYSQL specialization derives the ID from the key
+}
+
+template <>
+inline void
+QueryAdapter<MYSQL>::setCatalogId()
+{
+  // use the digest of the signing key (default identity's key if no signingId) as catalog ID;
+  // `inline`: an explicit specialization defined in a header is not implicitly inline
+  ndn::Name keyId;
+  if (m_signingId.empty()) {
+    keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
+  } else {
+    keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
+  }
+  std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
+  ndn::Block keyDigest = pKey->computeDigest();
+  m_catalogId.clear();
+  m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
+}
+
+template <typename DatabaseHandler>
+void
 QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
 {
   //empty
@@ -370,7 +456,9 @@
   }
   std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
 
+#ifndef NDEBUG
   std::cout << "incoming query interest : " << interestPtr->getName() << std::endl;
+#endif
 
   // @todo: use thread pool
   std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
@@ -389,7 +477,9 @@
   // now. In the future, this should check some form of
   // InMemoryStorage.
 
+#ifndef NDEBUG
   std::cout << "incoming query-results interest : " << interest.toUri() << std::endl;
+#endif
 
   auto data = m_cache.find(interest.getName());
   if (data) {
@@ -399,6 +489,132 @@
 
 template <typename DatabaseHandler>
 void
+QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
+                                                           const ndn::Interest& interest)
+{
+  std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
+
+#ifndef NDEBUG
+  std::cout << "incoming initialization interest : " << interestPtr->getName() << std::endl;
+#endif
+  // TODO: save the content in memory, first check the memory, if not exists, start thread to generate it
+  // Note that if ChronoSync state changes, we need to clear the saved value, and regenerate it
+
+  auto data = m_cache.find(interest.getName());
+  if (data) {
+    m_face->put(*data);
+  }
+  else {
+    // Built synchronously on the calling thread: a std::thread that is joined right
+    // away adds no concurrency, and an exception escaping a thread body ends in
+    // std::terminate instead of reaching the caller.
+    populateFiltersMenu(interestPtr);
+  }
+}
+
+/**
+ * Serializes the filters menu to JSON and publishes it as Data segments of at most
+ * PAYLOAD_LIMIT bytes each, named <interest name>/stateVersion/<segment number>.
+ * Every segment is signed, inserted into m_cache and put on the face; only the last
+ * one carries the FinalBlockId. Nothing is published when the menu is empty.
+ */
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::populateFiltersMenu(std::shared_ptr<const ndn::Interest> interest)
+{
+  Json::Value filters;
+  Json::FastWriter fastWriter;
+  getFiltersMenu(filters);
+  if (filters.empty()) {
+    return; // nothing to publish
+  }
+
+  const std::string filterValue = fastWriter.write(filters);
+  ndn::Name filterDataName(interest->getName());
+  filterDataName.append("stateVersion");// TODO: should replace with a state version
+
+  const uint8_t* payload = reinterpret_cast<const uint8_t*>(filterValue.data());
+  const size_t totalLength = filterValue.size();
+  // Number of the final segment; an empty serialization still yields one (empty)
+  // segment, so the subtraction below can never wrap around.
+  const uint64_t lastSeqNo = totalLength == 0 ? 0 : (totalLength - 1) / PAYLOAD_LIMIT;
+
+  for (uint64_t seqNo = 0; seqNo <= lastSeqNo; ++seqNo) {
+    // Every segment starts exactly where the previous one ended, so no byte is
+    // skipped or repeated; only the last segment may be shorter than PAYLOAD_LIMIT.
+    const size_t startIndex = seqNo * PAYLOAD_LIMIT;
+    const size_t remaining = totalLength - startIndex;
+    const size_t payloadLength = remaining < PAYLOAD_LIMIT ? remaining : PAYLOAD_LIMIT;
+
+    ndn::Name segmentName = ndn::Name(filterDataName).appendSegment(seqNo);
+    std::shared_ptr<ndn::Data> filterData = std::make_shared<ndn::Data>(segmentName);
+    filterData->setFreshnessPeriod(ndn::time::milliseconds(10000));
+    filterData->setContent(payload + startIndex, payloadLength);
+    if (seqNo == lastSeqNo) {
+      filterData->setFinalBlockId(ndn::Name::Component::fromSegment(seqNo));
+    }
+
+    signData(*filterData);
+#ifndef NDEBUG
+    std::cout << "populate filter Data : " << segmentName << std::endl;
+#endif
+    m_mutex.lock();
+    m_cache.insert(*filterData);
+    try {
+      m_face->put(*filterData);
+    }// catch exceptions and log
+    catch (std::exception& e) {
+      std::cout << e.what() << std::endl;
+    }
+    m_mutex.unlock();
+  }
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::getFiltersMenu(Json::Value& value)
+{
+  // generic handlers leave value untouched; the MYSQL specialization fills it in
+}
+
+// Get the distinct values of each configured filter column (one JSON object per column).
+// `inline`: an explicit specialization defined in a header is not implicitly inline.
+template <>
+inline void
+QueryAdapter<MYSQL>::getFiltersMenu(Json::Value& value)
+{
+  for (const std::string& columnName : m_filterCategoryNames) {
+    std::string getFilterSql("SELECT DISTINCT " + columnName +
+                             " FROM " + m_databaseTable + ";");
+    std::string errMsg;
+    bool success = false;
+
+    std::shared_ptr<MYSQL_RES> results
+      = atmos::util::MySQLPerformQuery(m_databaseHandler, getFilterSql,
+                                       util::QUERY, success, errMsg);
+    if (!success) {
+      std::cout << errMsg << std::endl;
+      value.clear();
+      return;
+    }
+
+    Json::Value category;
+    while (MYSQL_ROW row = mysql_fetch_row(results.get())) {
+      // SQL NULL arrives as a null pointer, which Json::Value cannot take as a C string
+      if (row[0] != nullptr) {
+        category[columnName].append(row[0]);
+      }
+    }
+    value.append(category);
+  }
+
+#ifndef NDEBUG
+  std::cout << value.toStyledString() << std::endl;
+#endif
+}
+
+template <typename DatabaseHandler>
+void
 QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
 {
   if (m_signingId.empty())
@@ -411,22 +627,36 @@
 }
 
 template <typename DatabaseHandler>
+ndn::Name
+QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
+                                                   const ndn::Name::Component& version)
+{
+  // Name layout (see http://redmine.named-data.net/projects/ndn-atmos/wiki/Query):
+  //   /<prefix>/query-results/<catalog-id>/<query-parameters>/<version>
+  // <query-parameters> is taken from the last component of the Interest name.
+  ndn::Name resultsName(m_prefix);
+  resultsName.append("query-results");
+  resultsName.append(m_catalogId);
+  resultsName.append(interest->getName().get(-1));
+  resultsName.append(version);
+  return resultsName;
+}
+
+template <typename DatabaseHandler>
 std::shared_ptr<ndn::Data>
 QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
                                            const ndn::Name::Component& version)
 {
-  // JSON parsed ok, so we can acknowledge successful receipt of the query
-  ndn::Name ackName(interest->getName());
-  ackName.append(version);
-  ackName.append(m_catalogId);
-  ackName.append("OK");
+  std::string queryResultNameStr(getQueryResultsName(interest, version).toUri());
 
-  std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(ackName);
+  std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(interest->getName());
+  ack->setContent(reinterpret_cast<const uint8_t*>(queryResultNameStr.c_str()),
+                  queryResultNameStr.length());
   ack->setFreshnessPeriod(ndn::time::milliseconds(10000));
 
   signData(*ack);
 #ifndef NDEBUG
-  std::cout << "makeAckData : " << ackName << std::endl;
+  std::cout << "qurey-results data name in ACK : " << queryResultNameStr << std::endl;
 #endif
   return ack;
 }
@@ -465,7 +695,7 @@
     return false;
   }
 
-  sqlQuery << "SELECT name FROM cmip5";
+  sqlQuery << "SELECT name FROM " << m_databaseTable;
   bool input = false;
   for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
   {
@@ -536,7 +766,7 @@
     }
 
     if (key.asString().compare("?") == 0) {
-      typedString.assign(value.asString());
+      typedString = value.asString();
       // since the front end triggers the autocompletion when users typed '/',
       // there must be a '/' at the end, and the first char must be '/'
       if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
@@ -560,14 +790,14 @@
 
     // add column name and value (token) into map
     typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
-    count ++;
+    count++;
     start = pos + 1;
   }
 
   // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
   // return true
   bool more = false;
-  sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM cmip5";
+  sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM " << m_databaseTable;
   for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
        it != typedComponents.end(); ++it) {
     if (more)
@@ -615,10 +845,11 @@
     }
 
     if (key.asString().compare("??") == 0) {
-      typedString.assign(value.asString());
+      typedString = value.asString();
       // since the front end triggers the autocompletion when users typed '/',
       // there must be a '/' at the end, and the first char must be '/'
-      if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
+      if (typedString.empty() || typedString.at(typedString.length() - 1) != '/' ||
+          typedString.find("/") != 0)
         return false;
       break;
     }
@@ -639,14 +870,14 @@
 
     // add column name and value (token) into map
     typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
-    count ++;
+    count++;
     start = pos + 1;
   }
 
   // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
   // return true
   bool more = false;
-  sqlQuery << "SELECT name FROM cmip5";
+  sqlQuery << "SELECT name FROM " << m_databaseTable;
   for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
        it != typedComponents.end(); ++it) {
     if (more)
@@ -723,8 +954,8 @@
     // This is where things are expensive so we save them for the lock
     // note that we ack the query with the cached ACK messages, but we should remove the ACKs
     // that conatin the old version when ChronoSync is updated
-    m_activeQueryToFirstResponse.insert(std::pair<std::string,
-                                        std::shared_ptr<ndn::Data>>(jsonQuery, ack));
+    //m_activeQueryToFirstResponse.insert(std::pair<std::string,
+    //                                    std::shared_ptr<ndn::Data>>(jsonQuery, ack));
     m_face->put(*ack);
   } // !!!  END  CRITICAL SECTION !!!
   m_mutex.unlock();
@@ -733,13 +964,7 @@
   bool autocomplete = false;
   std::stringstream sqlQuery;
 
-  // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
-  // for now, should be /<prefix>/query-results/<query-parameters>/<version>/, latter add catalog-id
-  ndn::Name segmentPrefix(m_prefix);
-  segmentPrefix.append("query-results");
-  segmentPrefix.append(jsonStr);
-  segmentPrefix.append(version);
-  segmentPrefix.append(m_catalogId);
+  ndn::Name segmentPrefix(getQueryResultsName(interest, version));
 
   Json::Value tmp;
   // expect the autocomplete and the component-based query are separate
@@ -804,36 +1029,44 @@
 
   uint64_t resultCount = mysql_num_rows(results.get());
 
+#ifndef NDEBUG
   std::cout << "Query results for \""
             << sqlString
             << "\" contain "
             << resultCount
             << " rows" << std::endl;
+#endif
 
   MYSQL_ROW row;
   uint64_t segmentNo = 0;
   Json::Value tmp;
   Json::Value resultJson;
   Json::FastWriter fastWriter;
+
+  uint64_t viewStart = 0, viewEnd = 0;
   while ((row = mysql_fetch_row(results.get())))
   {
     tmp.append(row[0]);
     const std::string tmpString = fastWriter.write(tmp);
     if (tmpString.length() > PAYLOAD_LIMIT) {
       std::shared_ptr<ndn::Data> data
-        = makeReplyData(segmentPrefix, resultJson, segmentNo, false, autocomplete, resultCount);
+        = makeReplyData(segmentPrefix, resultJson, segmentNo, false,
+                        autocomplete, resultCount, viewStart, viewEnd);
       m_mutex.lock();
       m_cache.insert(*data);
       m_mutex.unlock();
       tmp.clear();
       resultJson.clear();
-      segmentNo ++;
+      segmentNo++;
+      viewStart = viewEnd + 1;
     }
     resultJson.append(row[0]);
+    viewEnd++;
   }
 
   std::shared_ptr<ndn::Data> data
-    = makeReplyData(segmentPrefix, resultJson, segmentNo, true, autocomplete, resultCount);
+    = makeReplyData(segmentPrefix, resultJson, segmentNo, true,
+                    autocomplete, resultCount, viewStart, viewEnd);
   m_mutex.lock();
   m_cache.insert(*data);
   m_mutex.unlock();
@@ -846,12 +1079,23 @@
                                              uint64_t segmentNo,
                                              bool isFinalBlock,
                                              bool isAutocomplete,
-                                             uint64_t resultCount)
+                                             uint64_t resultCount,
+                                             uint64_t viewStart,
+                                             uint64_t viewEnd)
 {
   Json::Value entry;
   Json::FastWriter fastWriter;
-  Json::UInt64 count(resultCount);
-  entry["resultCount"] = count;
+
+  entry["resultCount"] = Json::UInt64(resultCount);;
+  entry["viewStart"] = Json::UInt64(viewStart);
+  entry["viewEnd"] = Json::UInt64(viewEnd);
+
+#ifndef NDEBUG
+  std::cout << "resultCount " << resultCount
+            << "; viewStart " << viewStart
+            << "; viewEnd "   << viewEnd << std::endl;
+#endif
+
   if (isAutocomplete) {
     entry["next"] = value;
   } else {