catalog: Transparent query routing implementation

Change-Id: Id174f42f21da4230d51df178803dc38ada3ce573
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index ab0b126..ee60c34 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -111,25 +111,15 @@
    * @param interest: Interest that needs to be handled
    */
   virtual void
-  onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
-
-  /**
-   * Handles requests for responses to an existing query
-   *
-   * @param filter:   InterestFilter that caused this Interest to be routed
-   * @param interest: Interest that needs to be handled
-   */
-  virtual void
-  onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+  onIncomingQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
 
   /**
    * Handles requests for responses to an filter initialization request
    *
-   * @param filter:   InterestFilter that caused this Interest to be routed
    * @param interest: Interest that needs to be handled
    */
   virtual void
-  onFiltersInitializationInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+  onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest);
 
   /**
    * Helper function that generates query results from a Json query carried in the Interest
@@ -292,30 +282,8 @@
 void
 QueryAdapter<DatabaseHandler>::setFilters()
 {
-  ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
-  m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
-                            bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
-                                 this, _1, _2),
-                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
-                                 this, _1),
-                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
-                                 this, _1, _2));
-
-  ndn::Name queryResultsPrefix = ndn::Name(m_prefix).append("query-results");
-  m_registeredPrefixList[queryResultsPrefix] =
-    m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix)
-                                                  .append("query-results").append(m_catalogId)),
-                            bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
-                                 this, _1, _2),
-                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
-                                 this, _1),
-                            bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
-                                 this, _1, _2));
-
-  ndn::Name filtersInitializationPrefix = ndn::Name(m_prefix).append("filters-initialization");
-  m_registeredPrefixList[filtersInitializationPrefix] =
-    m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("filters-initialization")),
-                            bind(&query::QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
+  m_registeredPrefixList[m_prefix] = m_face->setInterestFilter(ndn::InterestFilter(m_prefix),
+                            bind(&query::QueryAdapter<DatabaseHandler>::onIncomingQueryInterest,
                                  this, _1, _2),
                             bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
                                  this, _1),
@@ -471,56 +439,62 @@
 
 template <typename DatabaseHandler>
 void
-QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
-                                               const ndn::Interest& interest)
+QueryAdapter<DatabaseHandler>::onIncomingQueryInterest(const ndn::InterestFilter& filter,
+                                                       const ndn::Interest& interest)
 {
-  _LOG_DEBUG(">> QueryAdapter::onQueryInterest");
+  _LOG_DEBUG(">> QueryAdapter::onIncomingQueryInterest");
 
-  if (interest.getName().size() != filter.getPrefix().size() + 1) {
+  // The component right after the prefix selects the handler, so it must exist
+  if (interest.getName().size() <= filter.getPrefix().size()) {
     // @todo: return a nack
     return;
   }
 
+  _LOG_DEBUG("Interest : " << interest.getName());
   std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
 
-  _LOG_DEBUG("Interest : " << interestPtr->getName());
+  if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("filters-initialization")) {
+    // handled on a worker thread that is joined right away (@todo: use thread pool)
+    std::thread queryThread(&QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
+                            this, interestPtr);
+    queryThread.join();
+  }
+  else if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("query")) {
 
-  // @todo: use thread pool
-  std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
-                          this,
-                          interestPtr);
-  queryThread.join();
-}
+    // answer directly when the requested Data is already cached
+    if (auto data = m_cache.find(interest)) {
+      m_face->put(*data);
+      return;
+    }
 
-template <typename DatabaseHandler>
-void
-QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
-                                                      const ndn::Interest& interest)
-{
-  // FIXME Results are currently getting served out of the forwarder's
-  // CS so we just ignore any retrieval Interests that hit us for
-  // now. In the future, this should check some form of
-  // InMemoryStorage.
+    // Interests for later segments carry extra components; query on the main part only,
+    // e.g., with prefix /hep, /hep/query/<params>/<version>/#seq is processed as /hep/query/<params>
+    if (interest.getName().size() > (filter.getPrefix().size() + 2)) {
+      const ndn::Name queryName = interest.getName().getPrefix(filter.getPrefix().size() + 2);
+      auto queryInterest = std::make_shared<ndn::Interest>(queryName);
 
-  _LOG_DEBUG(">> QueryAdapter::onQueryResultsInterest");
+      if (m_cache.find(*queryInterest)) {
+        // catalog has already produced data for this query and may still be
+        // generating the remaining segments, so do not start it again
+        return;
+      }
+      interestPtr = queryInterest;  // heap-owned, so runJsonQuery gets a valid shared_ptr
+    }
 
-  auto data = m_cache.find(interest.getName());
-  if (data) {
-    m_face->put(*data);
+    std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
+                            this,
+                            interestPtr);
+    queryThread.join();
   }
 
-  _LOG_DEBUG("<< QueryAdapter::onQueryResultsInterest");
+  // Interests with any other component after the prefix are ignored
 }
 
 template <typename DatabaseHandler>
 void
-QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
-                                                               const ndn::Interest& interest)
+QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest)
 {
   _LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest");
-  std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
-
-  _LOG_DEBUG("Interest : " << interestPtr->getName());
 
   if(m_socket != nullptr) {
     const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
@@ -539,15 +513,12 @@
     }
   }
 
-  auto data = m_activeQueryToFirstResponse.find(interest.getName());
+  auto data = m_activeQueryToFirstResponse.find(*interest);
   if (data) {
     m_face->put(*data);
   }
   else {
-    std::thread queryThread(&QueryAdapter<DatabaseHandler>::populateFiltersMenu,
-                            this,
-                            interestPtr);
-    queryThread.join();
+    populateFiltersMenu(interest);
   }
 
   _LOG_DEBUG("<< QueryAdapter::onFiltersInitializationInterest");
@@ -676,14 +647,9 @@
 QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
                                                    const ndn::Name::Component& version)
 {
-  // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
-  // for now, should be /<prefix>/query-results/<catalog-id>/<query-parameters>/<version>
-
-  ndn::Name queryResultName(m_prefix);
-  queryResultName.append("query-results")
-    .append(m_catalogId)
-    .append(interest->getName().get(-1))
-    .append(version);
+  // results are named after the query Interest itself: <interest-name>/<version>
+  ndn::Name queryResultName = interest->getName();
+  queryResultName.append(version);
   return queryResultName;
 }
 
@@ -989,19 +955,6 @@
     version = ndn::name::Component::fromEscapedString(m_chronosyncDigest);
   }
 
-  // try to respond with the inMemoryStorage
-  m_mutex.lock();
-  { // !!! BEGIN CRITICAL SECTION !!!
-    auto data = m_activeQueryToFirstResponse.find(interest->getName());
-    if (data) {
-      _LOG_DEBUG("Answer with Data in IMS : " << data->getName());
-      m_face->put(*data);
-      m_mutex.unlock();
-      return;
-    }
-  } // !!! END CRITICAL SECTION !!!
-  m_mutex.unlock();
-
   // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
   Json::Value parsedFromString;
   Json::Reader reader;
@@ -1011,32 +964,12 @@
     return;
   }
 
-  std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
-
-  m_mutex.lock();
-  { // !!! BEGIN CRITICAL SECTION !!!
-    // An unusual race-condition case, which requires things like PIT aggregation to be off.
-    auto data = m_activeQueryToFirstResponse.find(interest->getName());
-    if (data) {
-      m_face->put(*data);
-      m_mutex.unlock();
-      return;
-    }
-
-    // This is where things are expensive so we save them for the lock
-    // note that we ack the query with the cached ACK messages, but we should remove the ACKs
-    // that conatin the old version when ChronoSync is updated
-    m_activeQueryToFirstResponse.insert(*ack);
-
-    m_face->put(*ack);
-  } // !!!  END  CRITICAL SECTION !!!
-  m_mutex.unlock();
-
   // 3) Convert the JSON Query into a MySQL one
   bool autocomplete = false, lastComponent = false;
   std::stringstream sqlQuery;
 
   ndn::Name segmentPrefix(getQueryResultsName(interest, version));
+  _LOG_DEBUG("segmentPrefix :" << segmentPrefix);
 
   Json::Value tmp;
   // expect the autocomplete and the component-based query are separate
diff --git a/catalog/tests/unit-tests/query/test-query-adapter.cpp b/catalog/tests/unit-tests/query/test-query-adapter.cpp
index c9252d3..88c8ea2 100644
--- a/catalog/tests/unit-tests/query/test-query-adapter.cpp
+++ b/catalog/tests/unit-tests/query/test-query-adapter.cpp
@@ -384,6 +384,7 @@
     BOOST_CHECK_EQUAL(false, queryAdapterTest1.json2SqlTest(ss, testJson));
   }
 
+  // use real data instead of ack data
   BOOST_AUTO_TEST_CASE(QueryAdapterMakeAckDataTest)
   {
     ndn::Interest interest(ndn::Name("/test/ack/data/json"));
@@ -396,9 +397,6 @@
 
     std::shared_ptr<ndn::Data> data = queryAdapterTest2.getAckData(interestPtr, version);
     BOOST_CHECK_EQUAL(data->getName().toUri(), "/test/ack/data/json");
-    BOOST_CHECK_EQUAL(std::string(reinterpret_cast<const char*>(data->getContent().value()),
-                                  data->getContent().value_size()),
-                      "/query-results/catalogIdPlaceHolder/json/%FD%01");
   }
 
   BOOST_AUTO_TEST_CASE(QueryAdapterMakeReplyDataTest1)
@@ -457,6 +455,7 @@
       = std::make_shared<ndn::Interest>(ndn::Name("/test/query").append(jsonMessage.c_str()));
 
     queryAdapterTest2.queryTest(queryInterest);
+
     // TODO: the code below should be enabled when queryAdapter can get the correct the
     // ChronoSync state; currently, we don't need the activeQuery to save the ACK data;
     //auto ackData = queryAdapterTest2.getDataFromActiveQuery(jsonMessage);
@@ -470,12 +469,11 @@
     //  BOOST_CHECK_EQUAL(ackData->getContent().value_size(), 0);
     //}
 
-    std::shared_ptr<ndn::Interest> resultInterest
-      = std::make_shared<ndn::Interest>(ndn::Name("/test/query-results"));
-    auto replyData = queryAdapterTest2.getDataFromCache(*resultInterest);
+    // results are cached under the query name; there is no separate query-results namespace
+    auto replyData = queryAdapterTest2.getDataFromCache(*queryInterest);
     BOOST_CHECK(replyData);
     if (replyData){
-      BOOST_CHECK_EQUAL(replyData->getName().getPrefix(2), ndn::Name("/test/query-results"));
+      BOOST_CHECK_EQUAL(replyData->getName().getPrefix(2), ndn::Name("/test/query"));
       const std::string jsonRes(reinterpret_cast<const char*>(replyData->getContent().value()),
                                 replyData->getContent().value_size());
       Json::Value parsedFromString;