catalog: Transparent query routing implementation
Change-Id: Id174f42f21da4230d51df178803dc38ada3ce573
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index ab0b126..ee60c34 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -111,25 +111,15 @@
* @param interest: Interest that needs to be handled
*/
virtual void
- onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
-
- /**
- * Handles requests for responses to an existing query
- *
- * @param filter: InterestFilter that caused this Interest to be routed
- * @param interest: Interest that needs to be handled
- */
- virtual void
- onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+ onIncomingQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
/**
* Handles requests for responses to an filter initialization request
*
- * @param filter: InterestFilter that caused this Interest to be routed
* @param interest: Interest that needs to be handled
*/
virtual void
- onFiltersInitializationInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+ onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest);
/**
* Helper function that generates query results from a Json query carried in the Interest
@@ -292,30 +282,8 @@
void
QueryAdapter<DatabaseHandler>::setFilters()
{
- ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
- m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
- bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
- this, _1, _2),
- bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
- this, _1),
- bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
- this, _1, _2));
-
- ndn::Name queryResultsPrefix = ndn::Name(m_prefix).append("query-results");
- m_registeredPrefixList[queryResultsPrefix] =
- m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix)
- .append("query-results").append(m_catalogId)),
- bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
- this, _1, _2),
- bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
- this, _1),
- bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
- this, _1, _2));
-
- ndn::Name filtersInitializationPrefix = ndn::Name(m_prefix).append("filters-initialization");
- m_registeredPrefixList[filtersInitializationPrefix] =
- m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("filters-initialization")),
- bind(&query::QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
+ m_registeredPrefixList[m_prefix] = m_face->setInterestFilter(ndn::InterestFilter(m_prefix),
+ bind(&query::QueryAdapter<DatabaseHandler>::onIncomingQueryInterest,
this, _1, _2),
bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
this, _1),
@@ -471,56 +439,62 @@
template <typename DatabaseHandler>
void
-QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
- const ndn::Interest& interest)
+QueryAdapter<DatabaseHandler>::onIncomingQueryInterest(const ndn::InterestFilter& filter,
+ const ndn::Interest& interest)
{
- _LOG_DEBUG(">> QueryAdapter::onQueryInterest");
+ _LOG_DEBUG(">> QueryAdapter::onIncomingQueryInterest");
- if (interest.getName().size() != filter.getPrefix().size() + 1) {
+ // Interest must carry component "initialization" or "query"
+ if (interest.getName().size() < filter.getPrefix().size()) {
// @todo: return a nack
return;
}
+ _LOG_DEBUG("Interest : " << interest.getName());
std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
- _LOG_DEBUG("Interest : " << interestPtr->getName());
+ if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("filters-initialization")) {
+ std::thread queryThread(&QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest,
+ this,
+ interestPtr);
+ queryThread.join();
+ }
+ else if (interest.getName()[filter.getPrefix().size()] == ndn::Name::Component("query")) {
- // @todo: use thread pool
- std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
- this,
- interestPtr);
- queryThread.join();
-}
+ auto data = m_cache.find(interest);
+ if (data) {
+ m_face->put(*data);
+ return;
+ }
-template <typename DatabaseHandler>
-void
-QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
- const ndn::Interest& interest)
-{
- // FIXME Results are currently getting served out of the forwarder's
- // CS so we just ignore any retrieval Interests that hit us for
- // now. In the future, this should check some form of
- // InMemoryStorage.
+ // catalog must strip sequence number in an Interest for further process
+ if (interest.getName().size() > (filter.getPrefix().size() + 2)) {
+ // Interest carries sequence number, only grip the main part
+ // e.g., /hep/query/<query-params>/<version>/#seq
+ ndn::Interest queryInterest(interest.getName().getPrefix(filter.getPrefix().size() + 2));
- _LOG_DEBUG(">> QueryAdapter::onQueryResultsInterest");
+ auto data = m_cache.find(queryInterest);
+ if (data) {
+ // catalog has generated some data, but still working on it
+ return;
+ }
+ interestPtr = queryInterest.shared_from_this();
+ }
- auto data = m_cache.find(interest.getName());
- if (data) {
- m_face->put(*data);
+ std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
+ this,
+ interestPtr);
+ queryThread.join();
}
- _LOG_DEBUG("<< QueryAdapter::onQueryResultsInterest");
+ // ignore other Interests
}
template <typename DatabaseHandler>
void
-QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(const ndn::InterestFilter& filter,
- const ndn::Interest& interest)
+QueryAdapter<DatabaseHandler>::onFiltersInitializationInterest(std::shared_ptr<const ndn::Interest> interest)
{
_LOG_DEBUG(">> QueryAdapter::onFiltersInitializationInterest");
- std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
-
- _LOG_DEBUG("Interest : " << interestPtr->getName());
if(m_socket != nullptr) {
const ndn::ConstBufferPtr digestPtr = m_socket->getRootDigest();
@@ -539,15 +513,12 @@
}
}
- auto data = m_activeQueryToFirstResponse.find(interest.getName());
+ auto data = m_activeQueryToFirstResponse.find(*interest);
if (data) {
m_face->put(*data);
}
else {
- std::thread queryThread(&QueryAdapter<DatabaseHandler>::populateFiltersMenu,
- this,
- interestPtr);
- queryThread.join();
+ populateFiltersMenu(interest);
}
_LOG_DEBUG("<< QueryAdapter::onFiltersInitializationInterest");
@@ -676,14 +647,9 @@
QueryAdapter<DatabaseHandler>::getQueryResultsName(std::shared_ptr<const ndn::Interest> interest,
const ndn::Name::Component& version)
{
- // the server side should conform: http://redmine.named-data.net/projects/ndn-atmos/wiki/Query
- // for now, should be /<prefix>/query-results/<catalog-id>/<query-parameters>/<version>
-
- ndn::Name queryResultName(m_prefix);
- queryResultName.append("query-results")
- .append(m_catalogId)
- .append(interest->getName().get(-1))
- .append(version);
+ // use generic name, instead of specific one
+ ndn::Name queryResultName = interest->getName();
+ queryResultName.append(version);
return queryResultName;
}
@@ -989,19 +955,6 @@
version = ndn::name::Component::fromEscapedString(m_chronosyncDigest);
}
- // try to respond with the inMemoryStorage
- m_mutex.lock();
- { // !!! BEGIN CRITICAL SECTION !!!
- auto data = m_activeQueryToFirstResponse.find(interest->getName());
- if (data) {
- _LOG_DEBUG("Answer with Data in IMS : " << data->getName());
- m_face->put(*data);
- m_mutex.unlock();
- return;
- }
- } // !!! END CRITICAL SECTION !!!
- m_mutex.unlock();
-
// 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
Json::Value parsedFromString;
Json::Reader reader;
@@ -1011,32 +964,12 @@
return;
}
- std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
-
- m_mutex.lock();
- { // !!! BEGIN CRITICAL SECTION !!!
- // An unusual race-condition case, which requires things like PIT aggregation to be off.
- auto data = m_activeQueryToFirstResponse.find(interest->getName());
- if (data) {
- m_face->put(*data);
- m_mutex.unlock();
- return;
- }
-
- // This is where things are expensive so we save them for the lock
- // note that we ack the query with the cached ACK messages, but we should remove the ACKs
- // that conatin the old version when ChronoSync is updated
- m_activeQueryToFirstResponse.insert(*ack);
-
- m_face->put(*ack);
- } // !!! END CRITICAL SECTION !!!
- m_mutex.unlock();
-
// 3) Convert the JSON Query into a MySQL one
bool autocomplete = false, lastComponent = false;
std::stringstream sqlQuery;
ndn::Name segmentPrefix(getQueryResultsName(interest, version));
+ _LOG_DEBUG("segmentPrefix :" << segmentPrefix);
Json::Value tmp;
// expect the autocomplete and the component-based query are separate
diff --git a/catalog/tests/unit-tests/query/test-query-adapter.cpp b/catalog/tests/unit-tests/query/test-query-adapter.cpp
index c9252d3..88c8ea2 100644
--- a/catalog/tests/unit-tests/query/test-query-adapter.cpp
+++ b/catalog/tests/unit-tests/query/test-query-adapter.cpp
@@ -384,6 +384,7 @@
BOOST_CHECK_EQUAL(false, queryAdapterTest1.json2SqlTest(ss, testJson));
}
+ // use real data instead of ack data
BOOST_AUTO_TEST_CASE(QueryAdapterMakeAckDataTest)
{
ndn::Interest interest(ndn::Name("/test/ack/data/json"));
@@ -396,9 +397,6 @@
std::shared_ptr<ndn::Data> data = queryAdapterTest2.getAckData(interestPtr, version);
BOOST_CHECK_EQUAL(data->getName().toUri(), "/test/ack/data/json");
- BOOST_CHECK_EQUAL(std::string(reinterpret_cast<const char*>(data->getContent().value()),
- data->getContent().value_size()),
- "/query-results/catalogIdPlaceHolder/json/%FD%01");
}
BOOST_AUTO_TEST_CASE(QueryAdapterMakeReplyDataTest1)
@@ -457,6 +455,7 @@
= std::make_shared<ndn::Interest>(ndn::Name("/test/query").append(jsonMessage.c_str()));
queryAdapterTest2.queryTest(queryInterest);
+
// TODO: the code below should be enabled when queryAdapter can get the correct the
// ChronoSync state; currently, we don't need the activeQuery to save the ACK data;
//auto ackData = queryAdapterTest2.getDataFromActiveQuery(jsonMessage);
@@ -470,12 +469,11 @@
// BOOST_CHECK_EQUAL(ackData->getContent().value_size(), 0);
//}
- std::shared_ptr<ndn::Interest> resultInterest
- = std::make_shared<ndn::Interest>(ndn::Name("/test/query-results"));
- auto replyData = queryAdapterTest2.getDataFromCache(*resultInterest);
+ // there is no query-results namespace data
+ auto replyData = queryAdapterTest2.getDataFromCache(*queryInterest);
BOOST_CHECK(replyData);
if (replyData){
- BOOST_CHECK_EQUAL(replyData->getName().getPrefix(2), ndn::Name("/test/query-results"));
+ BOOST_CHECK_EQUAL(replyData->getName().getPrefix(2), ndn::Name("/test/query"));
const std::string jsonRes(reinterpret_cast<const char*>(replyData->getContent().value()),
replyData->getContent().value_size());
Json::Value parsedFromString;
diff --git a/client/catalog-dev/js/catalog.js b/client/catalog-dev/js/catalog.js
index 2157d50..aaf5473 100644
--- a/client/catalog-dev/js/catalog.js
+++ b/client/catalog-dev/js/catalog.js
@@ -30,7 +30,7 @@
var regex = new RegExp("[\\?&]" + name + "=([^&#]*)"),
results = regex.exec(location.search);
return results === null ? "" : decodeURIComponent(results[1].replace(/\+/g, " "));
- }
+ };
//Overwrite config if present. Any failure will just cause this to be skipped.
try{
@@ -60,7 +60,7 @@
return (c=='x' ? r : (r&0x3|0x8)).toString(16);
});
return uuid;
- }
+ };
/**
* Atmos
@@ -86,6 +86,7 @@
this.config = config;
this.catalog = config['global']['catalogPrefix'];
+ this.catalogPrefix = new Name(this.catalog);
this.face = new Face(config['global']['faceConfig']);
@@ -193,7 +194,7 @@
scope.query(scope.catalog, {'??': path},
function(interest, data){
console.log("Search response", interest, data);
- scope.name = data.getContent().toString().replace(/[\n\0]+/g, '');
+ scope.name = interest.getName();
scope.getResults(0);
}, function(interest){
console.warn("Failed to retrieve final component.", interest, path);
@@ -206,7 +207,7 @@
callback(list.map(function(element){
return (path == "/"?"/":"") + element + "/";
}));
- })
+ });
});
$('#treeSearch').on('click', '.treeSearch', function(){
@@ -222,7 +223,7 @@
function(interest, data){ //Success
console.log("Tree search response", interest, data);
- scope.name = data.getContent().toString().replace(/[\n\0]+/g,'');
+ scope.name = interest.getName();
scope.getResults(0);
},
@@ -259,7 +260,7 @@
scope.request(null, filename);
});
- }
+ };
Atmos.prototype.clearResults = function(){
this.results = []; //Drop any old results.
@@ -267,7 +268,7 @@
this.resultCount = Infinity;
this.page = 0;
this.resultTable.empty();
- }
+ };
Atmos.prototype.pathSearch = function(){
var value = this.searchInput.val();
@@ -280,7 +281,7 @@
function(interest, data){
console.log("Query response:", interest, data);
- scope.name = data.getContent().toString().replace(/[\n\0]/g,"");
+ scope.name = interest.getName();
scope.getResults(0);
@@ -290,7 +291,7 @@
scope.createAlert("Request timed out. \"" + interest.getName().toUri() + "\" See console for details.");
});
- }
+ };
Atmos.prototype.search = function(){
@@ -308,7 +309,7 @@
function(interest, data){ //Response function
console.log("Query Response:", interest, data);
- scope.name = data.getContent().toString().replace(/[\n\0]/g,"");
+ scope.name = interest.getName();
scope.getResults(0);
@@ -317,7 +318,7 @@
scope.createAlert("Request failed after 3 attempts. \"" + interest.getName().toUri() + "\" See console for details.");
});
- }
+ };
Atmos.prototype.autoComplete = function(field, callback){
@@ -326,7 +327,7 @@
this.query(this.catalog, {"?": field},
function(interest, data){
- var name = new Name(data.getContent().toString().replace(/[\n\0]/g,""));
+ var name = interest.getName();
var interest = new Interest(name);
interest.setInterestLifetimeMilliseconds(1500);
@@ -359,7 +360,7 @@
scope.createAlert("Request failed after 3 attempts. \"" + interest.getName().toUri() + "\" See console for details.");
});
- }
+ };
Atmos.prototype.showResults = function(resultIndex) {
@@ -401,38 +402,42 @@
$.scrollTo("#results", 500, {interrupt: true});
- }
+ };
Atmos.prototype.getResults = function(index){
+ var scope = this;
if ($('#results').hasClass('hidden')){
$('#results').removeClass('hidden').slideDown();
}
- if ((this.results.length === this.resultCount) || (this.resultsPerPage * (index + 1) < this.results.length)){
+ if ((scope.results.length === scope.resultCount) || (scope.resultsPerPage * (index + 1) < scope.results.length)){
//console.log("We already have index", index);
- this.page = index;
- this.showResults(index);
+ scope.page = index;
+ scope.showResults(index);
return;
}
- if (this.name === null) {
+ if (scope.name === null) {
console.error("This shouldn't be reached! We are getting results before a search has occured!");
throw new Error("Illegal State");
}
- var first = new Name(this.name).appendSegment(this.retrievedSegments++);
+ var interestName = new Name(scope.name);
- console.log("Requesting data index: (", this.retrievedSegments - 1, ") at ", first.toUri());
+ // Interest name should be /<catalog-prefix>/query/<query-param>/<version>/<#seq>
+ if (scope.name.size() === (scope.catalogPrefix.size() + 3)) {
+ interestName = interestName.appendSegment(scope.retrievedSegments++);
+ console.log("Requesting data index: (", scope.retrievedSegments - 1, ") at ", interestName.toUri());
+ }
- var scope = this;
+ var interest = new Interest(interestName);
- var interest = new Interest(first)
interest.setInterestLifetimeMilliseconds(1500);
interest.setMustBeFresh(true);
async.retry(4, function(done){
- this.face.expressInterest(interest,
+ scope.face.expressInterest(interest,
function(interest, data){ //Response
if (data.getContent().length === 0){
@@ -463,6 +468,9 @@
scope.page = index;
+ // reset scope.name
+ scope.name = new Name(data.getName().getPrefix(scope.catalogPrefix.size() + 3));
+
scope.getResults(index); //Keep calling this until we have enough data.
done();
@@ -479,7 +487,7 @@
}
});
- }
+ };
Atmos.prototype.query = function(prefix, parameters, callback, timeout) {
@@ -508,7 +516,7 @@
}
});
- }
+ };
/**
* This function returns a map of all the categories active filters.
@@ -522,7 +530,7 @@
}, {}); //Collect a map<category, filter>.
//TODO Make the return value map<category, Array<filter>>
return filters;
- }
+ };
/**
* Creates a closable alert for the user.
@@ -538,7 +546,7 @@
alert.append(closeButton);
this.alerts.append(alert);
- }
+ };
/**
* Requests all of the names represented by the buttons in the elements list.
@@ -693,7 +701,7 @@
});
$('#request').modal(); //This forces the form to be the only option.
- }
+ };
}();
Atmos.prototype.filterSetup = function() {
@@ -760,7 +768,7 @@
ga('send', 'event', 'error', 'filters');
});
- }
+ };
/**
* This function retrieves all segments in order until it knows it has reached the last one.
@@ -803,7 +811,7 @@
}
});
- }
+ };
var handleData = function(interest, data){
@@ -820,11 +828,11 @@
request();
}
- }
+ };
request();
- }
+ };
Atmos.prototype.cleanRequestForm = function(){
$('#requestDest').prev().removeClass('btn-success').addClass('btn-default');
@@ -833,7 +841,7 @@
$('#subsetMenu').attr('class', 'collapse');
$('#subsetVariables').empty();
$('#request .alert').alert('close').remove();
- }
+ };
Atmos.prototype.setupRequestForm = function(){
@@ -879,7 +887,7 @@
addVariable('#locationTemplate');
});
- }
+ };
Atmos.prototype.getMetaData = (function(){
@@ -915,7 +923,7 @@
return ret;
- }
+ };
})();
return Atmos;
diff --git a/client/ndn-js b/client/ndn-js
index 84ae11d..0038d37 160000
--- a/client/ndn-js
+++ b/client/ndn-js
@@ -1 +1 @@
-Subproject commit 84ae11d023c41d05ba5b4b727d876a912e2ecded
+Subproject commit 0038d37252bd56ad26b3506cca5975bf93c89fc0
diff --git a/log4cxx.properties b/log4cxx.properties
index cdf2386..fc5d45c 100644
--- a/log4cxx.properties
+++ b/log4cxx.properties
@@ -16,6 +16,6 @@
log4j.appender.rootFileAppender.MaxFileSize=100KB
# Change the log level (INFO, DEBUG, etc.) here
-log4j.logger.QueryAdapter=INFO
-log4j.logger.PublishAdapter=INFO
+log4j.logger.QueryAdapter=DEBUG
+log4j.logger.PublishAdapter=DEBUG