catalog: Transparent query routing implementation
Change-Id: Id174f42f21da4230d51df178803dc38ada3ce573
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index ab0b126..ee60c34 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -111,25 +111,15 @@
* @param interest: Interest that needs to be handled
*/
virtual void
- onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
-
- /**
- * Handles requests for responses to an existing query
- *
- * @param filter: InterestFilter that caused this Interest to be routed
- * @param interest: Interest that needs to be handled
- */
- virtual void
- onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+ onIncomingQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
/**
* Handles requests for responses to an filter initialization request
*
- * @param filter: InterestFilter that caused this Interest to be routed
* @param interest: Interest that needs to be handled
*/
virtual void
- onFiltersInitializationInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+ onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest);
/**
* Helper function that generates query results from a Json query carried in the Interest
@@ -292,30 +282,8 @@
void
QueryAdapter<DatabaseHandler>::setFilters()
{
- ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
- m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
- bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
- this, _1, _2),
- bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
- this, _1),
- bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
- this, _1, _2));
-
- ndn::Name queryResultsPrefix = ndn::Name(m_prefix).append("query-results");
- m_registeredPrefixList[queryResultsPrefix] =
- m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix)
- .append("query-results").append(m_catalogId)),
- bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
- this, _1, _2),
- bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
- this, _1),
- bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
- this, _1, _2));
-
- ndn::Name filtersInitializationPrefix = ndn::Name(m_prefix).append("filters-initialization");
- m_registeredPrefixList[filtersInitializationPrefix] =
- m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("filters-initialization")),
- bind(&query::QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
+ m_registeredPrefixList[m_prefix] = m_face->setInterestFilter(ndn::InterestFilter(m_prefix),
+ bind(&query::QueryAdapter<DatabaseHandler>::onIncomingQueryInterest,
this, _1, _2),
bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
this, _1),
@@ -471,56 +439,62 @@
template <typename DatabaseHandler>
void
-QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
- const ndn::Interest& interest)
+QueryAdapter<DatabaseHandler>::onIncomingQueryInterest(const ndn::InterestFilter& filter,
+ const ndn::Interest& interest)
{
- _LOG_DEBUG(">> QueryAdapter::onQueryInterest");
+ _LOG_DEBUG(">> QueryAdapter::onIncomingQueryInterest");
- if (interest.getName().size() != filter.getPrefix().size() + 1) {
+ // the name must extend past the filter prefix: the next component ("filters-initialization" or "query") is indexed below
+ if (interest.getName().size() <= filter.getPrefix().size()) {
// @todo: return a nack
return;
}
+ _LOG_DEBUG("Interest : " << interest.getName());
std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
- _LOG_DEBUG("Interest : " << interestPtr->getName());
+ if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("filters-initialization")) {
+ std::thread queryThread(&QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
+ this,
+ interestPtr);
+ queryThread.join();
+ }
+ else if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("query")) {
- // @todo: use thread pool
- std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
- this,
- interestPtr);
- queryThread.join();
-}
+ auto data = m_cache.find(interest);
+ if (data) {
+ m_face->put(*data);
+ return;
+ }
-template <typename DatabaseHandler>
-void
-QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
- const ndn::Interest& interest)
-{
- // FIXME Results are currently getting served out of the forwarder's
- // CS so we just ignore any retrieval Interests that hit us for
- // now. In the future, this should check some form of
- // InMemoryStorage.
+ // the catalog must strip the sequence number from the Interest before processing it further
+ if (interest.getName().size() > (filter.getPrefix().size() + 2)) {
+ // Interest carries a sequence number, only keep the main part
+ // e.g., /hep/query/<query-params>/<version>/#seq
+ ndn::Interest queryInterest(interest.getName().getPrefix(filter.getPrefix().size() + 2));
- _LOG_DEBUG(">> QueryAdapter::onQueryResultsInterest");
+ auto data = m_cache.find(queryInterest);
+ if (data) {
+ // catalog has generated some data for this query, but the requested one is not cached (yet)
+ return;
+ }
+ interestPtr = std::make_shared<ndn::Interest>(queryInterest); // heap copy: local is not shared_ptr-owned
+ }
- auto data = m_cache.find(interest.getName());
- if (data) {
- m_face->put(*data);
+ std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
+ this,
+ interestPtr);
+ queryThread.join();
}
- _LOG_DEBUG("<< QueryAdapter::onQueryResultsInterest");
+ // ignore Interests whose component after the prefix is neither of the two above
}
template <typename DatabaseHandler>
void
-QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
- const ndn::Interest& interest)
+QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest)
{
_LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest");
- std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
-
- _LOG_DEBUG("Interest : " << interestPtr->getName());
if(m_socket != nullptr) {
const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
@@ -539,15 +513,12 @@
}
}
- auto data = m_activeQueryToFirstResponse.find(interest.getName());
+ auto data = m_activeQueryToFirstResponse.find(*interest);
if (data) {
m_face->put(*data);
}
else {
- std::thread queryThread(&QueryAdapter<DatabaseHandler>::populateFiltersMenu,
- this,
- interestPtr);
- queryThread.join();
+ populateFiltersMenu(interest);
}
_LOG_DEBUG("<< QueryAdapter::onFiltersInitializationInterest");
@@ -676,14 +647,9 @@
QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
const ndn::Name::Component& version)
{
- // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
- // for now, should be /<prefix>/query-results/<catalog-id>/<query-parameters>/<version>
-
- ndn::Name queryResultName(m_prefix);
- queryResultName.append("query-results")
- .append(m_catalogId)
- .append(interest->getName().get(-1))
- .append(version);
+ // use generic name, instead of specific one
+ ndn::Name queryResultName = interest->getName();
+ queryResultName.append(version);
return queryResultName;
}
@@ -989,19 +955,6 @@
version = ndn::name::Component::fromEscapedString(m_chronosyncDigest);
}
- // try to respond with the inMemoryStorage
- m_mutex.lock();
- { // !!! BEGIN CRITICAL SECTION !!!
- auto data = m_activeQueryToFirstResponse.find(interest->getName());
- if (data) {
- _LOG_DEBUG("Answer with Data in IMS : " << data->getName());
- m_face->put(*data);
- m_mutex.unlock();
- return;
- }
- } // !!! END CRITICAL SECTION !!!
- m_mutex.unlock();
-
// 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
Json::Value parsedFromString;
Json::Reader reader;
@@ -1011,32 +964,12 @@
return;
}
- std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
-
- m_mutex.lock();
- { // !!! BEGIN CRITICAL SECTION !!!
- // An unusual race-condition case, which requires things like PIT aggregation to be off.
- auto data = m_activeQueryToFirstResponse.find(interest->getName());
- if (data) {
- m_face->put(*data);
- m_mutex.unlock();
- return;
- }
-
- // This is where things are expensive so we save them for the lock
- // note that we ack the query with the cached ACK messages, but we should remove the ACKs
- // that conatin the old version when ChronoSync is updated
- m_activeQueryToFirstResponse.insert(*ack);
-
- m_face->put(*ack);
- } // !!! END CRITICAL SECTION !!!
- m_mutex.unlock();
-
// 3) Convert the JSON Query into a MySQL one
bool autocomplete = false, lastComponent = false;
std::stringstream sqlQuery;
ndn::Name segmentPrefix(getQueryResultsName(interest, version));
+ _LOG_DEBUG("segmentPrefix :" << segmentPrefix);
Json::Value tmp;
// expect the autocomplete and the component-based query are separate
diff --git a/catalog/tests/unit-tests/query/test-query-adapter.cpp b/catalog/tests/unit-tests/query/test-query-adapter.cpp
index c9252d3..88c8ea2 100644
--- a/catalog/tests/unit-tests/query/test-query-adapter.cpp
+++ b/catalog/tests/unit-tests/query/test-query-adapter.cpp
@@ -384,6 +384,7 @@
BOOST_CHECK_EQUAL(false, queryAdapterTest1.json2SqlTest(ss, testJson));
}
+ // use real data instead of ack data
BOOST_AUTO_TEST_CASE(QueryAdapterMakeAckDataTest)
{
ndn::Interest interest(ndn::Name("/test/ack/data/json"));
@@ -396,9 +397,6 @@
std::shared_ptr<ndn::Data> data = queryAdapterTest2.getAckData(interestPtr, version);
BOOST_CHECK_EQUAL(data->getName().toUri(), "/test/ack/data/json");
- BOOST_CHECK_EQUAL(std::string(reinterpret_cast<const char*>(data->getContent().value()),
- data->getContent().value_size()),
- "/query-results/catalogIdPlaceHolder/json/%FD%01");
}
BOOST_AUTO_TEST_CASE(QueryAdapterMakeReplyDataTest1)
@@ -457,6 +455,7 @@
= std::make_shared<ndn::Interest>(ndn::Name("/test/query").append(jsonMessage.c_str()));
queryAdapterTest2.queryTest(queryInterest);
+
// TODO: the code below should be enabled when queryAdapter can get the correct the
// ChronoSync state; currently, we don't need the activeQuery to save the ACK data;
//auto ackData = queryAdapterTest2.getDataFromActiveQuery(jsonMessage);
@@ -470,12 +469,11 @@
// BOOST_CHECK_EQUAL(ackData->getContent().value_size(), 0);
//}
- std::shared_ptr<ndn::Interest> resultInterest
- = std::make_shared<ndn::Interest>(ndn::Name("/test/query-results"));
- auto replyData = queryAdapterTest2.getDataFromCache(*resultInterest);
+ // there is no query-results namespace data
+ auto replyData = queryAdapterTest2.getDataFromCache(*queryInterest);
BOOST_CHECK(replyData);
if (replyData){
- BOOST_CHECK_EQUAL(replyData->getName().getPrefix(2), ndn::Name("/test/query-results"));
+ BOOST_CHECK_EQUAL(replyData->getName().getPrefix(2), ndn::Name("/test/query"));
const std::string jsonRes(reinterpret_cast<const char*>(replyData->getContent().value()),
replyData->getContent().value_size());
Json::Value parsedFromString;