catalog: initial query module
Change-Id: I0ddd22c5774d5e5d54c1ee80e0f01566fed311fd
refs: #2638
diff --git a/catalog/src/catalog/catalog.hpp b/catalog/src/catalog/catalog.hpp
new file mode 100644
index 0000000..fe680d9
--- /dev/null
+++ b/catalog/src/catalog/catalog.hpp
@@ -0,0 +1,92 @@
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ * for atmospheric science data
+ * Copyright (C) 2015 Colorado State University
+ *
+ * NDN-Atmos is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * NDN-Atmos is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
+**/
+
+#ifndef ATMOS_CATALOG_CATALOG_HPP
+#define ATMOS_CATALOG_CATALOG_HPP
+
+#include "query/query-adapter.hpp"
+#include "publish/publish-adapter.hpp"
+
+#include <ndn-cxx/data.hpp>
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/interest.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+
+#include <memory>
+#include <string>
+
+namespace atmos {
+namespace catalog {
+
+/**
+ * The Catalog acts as a façade around the Database.
+ * It is Templated on a DatabaseHandler: the connection into the database that it will use to
+ * communicate with the actual system
+ */
+template <typename DatabaseHandler>
+class Catalog {
+public:
+ /**
+ * Constructor
+ *
+ * @param aFace: Face that will be used for NDN communications
+ * @param aKeyChain: KeyChain to sign query responses and evaluate the incoming publish
+ * and ChronoSync requests against
+ * @param aDatabaseHandler: <typename DatabaseHandler> to the database that stores our catalog
+ * @oaram aPrefix: Name that will define the prefix to all queries and publish requests
+ * that will be routed to this specific Catalog Instance
+ */
+ Catalog(std::shared_ptr<ndn::Face> aFace, std::shared_ptr<ndn::KeyChain> aKeyChain,
+ std::shared_ptr<DatabaseHandler> aDatabaseHandler, const ndn::Name& aPrefix);
+
+ /**
+ * Destructor
+ */
+ virtual
+ ~Catalog();
+
+protected:
+ // Templated Adapter to handle Query requests
+ atmos::query::QueryAdapter<DatabaseHandler> m_queryAdapter;
+ // Templated Adapter to handle Publisher requests
+ atmos::publish::PublishAdapter<DatabaseHandler> m_publishAdapter;
+}; // class Catalog
+
+template <typename DatabaseHandler>
+Catalog<DatabaseHandler>::Catalog(std::shared_ptr<ndn::Face> aFace,
+ std::shared_ptr<ndn::KeyChain> aKeyChain,
+ std::shared_ptr<DatabaseHandler> aDatabaseHandler,
+ const ndn::Name& aPrefix)
+
+ : m_queryAdapter(aFace, aKeyChain, aDatabaseHandler, aPrefix)
+ , m_publishAdapter(aFace, aKeyChain, aDatabaseHandler, aPrefix)
+{
+ // empty
+}
+
+template <typename DatabaseHandler>
+Catalog<DatabaseHandler>::~Catalog()
+{
+ // empty
+}
+
+} // namespace catalog
+} // namespace atmos
+
+#endif //ATMOS_CATALOG_CATALOG_HPP
diff --git a/catalog/src/main.cpp b/catalog/src/main.cpp
index 6e2fae1..3f199ee 100644
--- a/catalog/src/main.cpp
+++ b/catalog/src/main.cpp
@@ -1,44 +1,51 @@
-/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
-/*
- * Copyright (c) 2015, Colorado State University.
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ * for atmospheric science data
+ * Copyright (C) 2015 Colorado State University
*
- * This file is part of ndn-atmos.
+ * NDN-Atmos is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
*
- * ndn-atmos is free software: you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free Software
- * Foundation, either version 3 of the License, or (at your option) any later version.
+ * NDN-Atmos is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
*
- * ndn-atmos is distributed in the hope that it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
- *
- * You should have received copies of the GNU General Public License and GNU Lesser
- * General Public License along with ndn-atmos, e.g., in COPYING.md file. If not, see
- * <http://www.gnu.org/licenses/>.
- *
- * See AUTHORS.md for complete list of ndn-atmos authors and contributors.
- */
+ * You should have received a copy of the GNU General Public License
+ * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
+**/
+
+#include "catalog/catalog.hpp"
+#include "util/mysql-util.hpp"
#include <ChronoSync/socket.hpp>
-#include <ndn-cxx/face.hpp>
-#include <json/value.h>
-#include <mysql.h>
-using namespace std;
-using namespace ndn;
+#include <ndn-cxx/data.hpp>
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/interest.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+
+#include "mysql/mysql.h"
+
+#include <memory>
int main()
{
- Face face; // use ndn-cxx
- shared_ptr<chronosync::Socket> socket; // use ChronoSync
+ std::shared_ptr<chronosync::Socket> socket; // use ChronoSync
+ std::shared_ptr<ndn::Face> face(new ndn::Face());
+ std::shared_ptr<ndn::KeyChain> keyChain(new ndn::KeyChain());
- Json::Value root; // use jsoncpp
- MYSQL *con = mysql_init(NULL);
- if (con == NULL)
- {
- fprintf(stderr, "%s\n", mysql_error(con));
- return 1;
- }
+ // This should be unique to each instance
+ ndn::Name aName("/catalog/myUniqueName");
+
+ atmos::util::ConnectionDetails mysqlID("atmos-den.es.net", "testuser", "test623", "testdb");
+ std::shared_ptr<MYSQL> conn;
+ conn = atmos::util::MySQLConnectionSetup(mysqlID);
+
+ atmos::catalog::Catalog<MYSQL> catalog(face, keyChain, conn, aName);
+ face->processEvents();
return 0;
}
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
new file mode 100644
index 0000000..c6b322b
--- /dev/null
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -0,0 +1,139 @@
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ * for atmospheric science data
+ * Copyright (C) 2015 Colorado State University
+ *
+ * NDN-Atmos is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * NDN-Atmos is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
+**/
+
+#ifndef ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
+#define ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
+
+#include "util/catalog-adapter.hpp"
+#include "util/mysql-util.hpp"
+
+#include <json/reader.h>
+#include <json/value.h>
+#include <json/writer.h>
+
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/interest.hpp>
+#include <ndn-cxx/interest-filter.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+
+#include "mysql/mysql.h"
+
+#include <memory>
+#include <string>
+
+namespace atmos {
+namespace publish {
+
+/**
+ * PublishAdapter handles the Publish usecases for the catalog
+ */
+template <typename DatabaseHandler>
+class PublishAdapter : public atmos::util::CatalogAdapter<DatabaseHandler> {
+public:
+ /**
+ * Constructor
+ *
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain to sign query responses and evaluate the incoming publish
+ * and ChronoSync requests against
+ * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
+ * @oaram prefix: Name that will define the prefix to all publish requests
+ * that will be routed to this specific Catalog Instance
+ */
+ PublishAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
+ std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
+
+
+ /**
+ * Destructor
+ */
+ virtual
+ ~PublishAdapter();
+
+protected:
+ /**
+ * Initial "please publish this" Interests
+ *
+ * @param filter: InterestFilter that caused this Interest to be routed
+ * @param interest: Interest that needs to be handled
+ */
+ virtual void
+ onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+
+ /**
+ * Data containing the actual thing we need to publish
+ *
+ * @param interest: Interest that caused this Data to be routed
+ * @param data: Data that needs to be handled
+ */
+ virtual void
+ onData(const ndn::Interest& interest, const ndn::Data& data);
+
+ // @todo: Should we do anything special with the timeouts for the publish requests?
+ // If so, overwrite onTimeout()
+
+};
+
+template <typename DatabaseHandler>
+PublishAdapter<DatabaseHandler>::PublishAdapter(std::shared_ptr<ndn::Face> face,
+ std::shared_ptr<ndn::KeyChain> keyChain,
+ std::shared_ptr<DatabaseHandler> databaseHandler,
+ const ndn::Name& prefix)
+ : atmos::util::CatalogAdapter<DatabaseHandler>(face, keyChain, databaseHandler,
+ ndn::Name(prefix).append("/publish"))
+{
+ face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("/publish")),
+ bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onInterest,
+ this, _1, _2),
+ bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
+ this, _1),
+ bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
+ this, _1, _2));
+
+ std::shared_ptr<ndn::Interest> request(std::make_shared<ndn::Interest>(ndn::Name(prefix).append("/publish")));
+ atmos::util::CatalogAdapter<DatabaseHandler>::m_face->expressInterest(*request,
+ bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onData,
+ this, _1, _2),
+ bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onTimeout,
+ this, _1));
+}
+
+template <typename DatabaseHandler>
+PublishAdapter<DatabaseHandler>::~PublishAdapter()
+{
+ // empty
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
+{
+ // @todo: Request the data for publish
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onData(const ndn::Interest& interest, const ndn::Data& data)
+{
+ // @todo handle publishing the data
+}
+
+} // namespace publish
+} // namespace atmos
+#endif //ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
new file mode 100644
index 0000000..ca86d5e
--- /dev/null
+++ b/catalog/src/query/query-adapter.hpp
@@ -0,0 +1,395 @@
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ * for atmospheric science data
+ * Copyright (C) 2015 Colorado State University
+ *
+ * NDN-Atmos is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * NDN-Atmos is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
+**/
+
+#ifndef ATMOS_QUERY_QUERY_ADAPTER_HPP
+#define ATMOS_QUERY_QUERY_ADAPTER_HPP
+
+#include "util/catalog-adapter.hpp"
+#include "util/mysql-util.hpp"
+
+#include <thread>
+
+
+#include <json/reader.h>
+#include <json/value.h>
+#include <json/writer.h>
+
+#include <ndn-cxx/data.hpp>
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/interest.hpp>
+#include <ndn-cxx/interest-filter.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/util/time.hpp>
+#include <ndn-cxx/encoding/encoding-buffer.hpp>
+
+#include "mysql/mysql.h"
+
+#include <iostream>
+
+#include <map>
+#include <memory>
+#include <mutex>
+#include <sstream>
+#include <string>
+
+namespace atmos {
+namespace query {
+
+static const size_t MAX_SEGMENT_SIZE = ndn::MAX_NDN_PACKET_SIZE >> 1;
+
+/**
+ * QueryAdapter handles the Query usecases for the catalog
+ */
+template <typename DatabaseHandler>
+class QueryAdapter : public atmos::util::CatalogAdapter<DatabaseHandler> {
+public:
+ /**
+ * Constructor
+ *
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain to sign query responses and evaluate the incoming query
+ * and ChronoSync requests against
+ * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
+ * @param prefix: Name that will define the prefix to all queries requests that will be
+ * routed to this specific Catalog Instance
+ */
+ QueryAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
+ std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
+
+ /**
+ * Destructor
+ */
+ virtual
+ ~QueryAdapter();
+
+ /**
+ * Handles incoming query requests by stripping the filter off the Interest to get the
+ * actual request out. This removes the need for a 2-step Interest-Data retrieval.
+ *
+ * @param filter: InterestFilter that caused this Interest to be routed
+ * @param interest: Interest that needs to be handled
+ */
+ virtual void
+ onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+
+ /**
+ * Handles requests for responses to an existing query
+ *
+ * @param filter: InterestFilter that caused this Interest to be routed
+ * @param interest: Interest that needs to be handled
+ */
+ virtual void
+ onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+
+private:
+ /**
+ * Helper function that generates query results
+ *
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain to sign query responses and evaluate the incoming query
+ * and ChronoSync requests against
+ * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
+ */
+ void
+ query(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
+ std::shared_ptr<const ndn::Interest> interest,
+ std::shared_ptr<DatabaseHandler> databaseHandler);
+
+ /**
+ * Helper function that publishes JSON
+ *
+ * @param face: Face that will send the Data out on
+ * @param keyChain: KeyChain to sign the Data we're creating
+ * @param segmentPrefix: Name that identifies the Prefix for the Data
+ * @param value: Json::Value to be sent in the Data
+ * @param segmentNo: uint64_t the segment for this Data
+ * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the last entry
+ * @param isAutocomplete: bool to indicate whether this is an autocomplete message
+ */
+ void
+ publishJson(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
+ const ndn::Name& segmentPrefix, const Json::Value& value,
+ uint64_t segmentNo, bool isFinalBlock, bool isAutocomplete);
+
+ /**
+ * Helper function that publishes char*
+ *
+ * @param face: Face that will send the Data out on
+ * @param keyChain: KeyChain to sign the Data we're creating
+ * @param segmentPrefix: Name that identifies the Prefix for the Data
+ * @param payload: char* to be sent in the Data
+ * @param payloadLength: size_t to indicate how long payload is
+ * @param segmentNo: uint64_t the segment for this Data
+ * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the last entry
+ */
+ void
+ publishSegment(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
+ const ndn::Name& segmentPrefix, const char* payload, size_t payloadLength,
+ uint64_t segmentNo, bool isFinalBlock);
+
+ // mutex to control critical sections
+ std::mutex m_mutex;
+ // @{ needs m_mutex protection
+ // The Queries we are currently writing to
+ std::map<std::string, std::shared_ptr<ndn::Data>> m_activeQueryToFirstResponse;
+ // @}
+};
+
+
+template <typename DatabaseHandler>
+QueryAdapter<DatabaseHandler>::QueryAdapter(std::shared_ptr<ndn::Face> face,
+ std::shared_ptr<ndn::KeyChain> keyChain,
+ std::shared_ptr<DatabaseHandler> databaseHandler,
+ const ndn::Name& prefix)
+ : atmos::util::CatalogAdapter<DatabaseHandler>(face, keyChain, databaseHandler, prefix)
+{
+ face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("query")),
+ bind(&atmos::query::QueryAdapter<DatabaseHandler>::onQueryInterest,
+ this, _1, _2),
+ bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
+ this, _1),
+ bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
+ this, _1, _2));
+
+ face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("query-results")),
+ bind(&atmos::query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
+ this, _1, _2),
+ bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
+ this, _1),
+ bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
+ this, _1, _2));
+}
+
+template <typename DatabaseHandler>
+QueryAdapter<DatabaseHandler>::~QueryAdapter(){
+ // empty
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
+{
+ // strictly enforce query initialization namespace. Name should be our local prefix + "query" + parameters
+ if (interest.getName().size() != filter.getPrefix().size() + 1) {
+ // @todo: return a nack
+ return;
+ }
+
+ std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
+
+ std::thread queryThread(&QueryAdapter<DatabaseHandler>::query, this,
+ atmos::util::CatalogAdapter<DatabaseHandler>::m_face,
+ atmos::util::CatalogAdapter<DatabaseHandler>::m_keyChain, interestPtr,
+ atmos::util::CatalogAdapter<DatabaseHandler>::m_databaseHandler);
+ queryThread.join();
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::onQueryResultsInterest(const ndn::InterestFilter& filter,
+ const ndn::Interest& interest)
+{
+ // FIXME Results are currently getting served out of the forwarder's
+ // CS so we just ignore any retrieval Interests that hit us for
+ // now. In the future, this should check some form of
+ // InMemoryStorage.
+ std::cout << "Got to query result" << std::endl;
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::publishJson(std::shared_ptr<ndn::Face> face,
+ std::shared_ptr<ndn::KeyChain> keyChain,
+ const ndn::Name& segmentPrefix,
+ const Json::Value& value,
+ uint64_t segmentNo, bool isFinalBlock,
+ bool isAutocomplete)
+{
+ Json::Value entry;
+ Json::FastWriter fastWriter;
+ if (isAutocomplete) {
+ entry["next"] = value;
+ } else {
+ entry["results"] = value;
+ }
+ const std::string jsonMessage = fastWriter.write(entry);
+ publishSegment(face, keyChain, segmentPrefix, jsonMessage.c_str(), jsonMessage.size() + 1,
+ segmentNo, isFinalBlock);
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::publishSegment(std::shared_ptr<ndn::Face> face,
+ std::shared_ptr<ndn::KeyChain> keyChain,
+ const ndn::Name& segmentPrefix,
+ const char* payload, size_t payloadLength,
+ uint64_t segmentNo, bool isFinalBlock)
+{
+ ndn::Name segmentName(segmentPrefix);
+ if (isFinalBlock) {
+ segmentName.append("END");
+ } else {
+ segmentName.appendSegment(segmentNo);
+ }
+
+ std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
+ data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
+ data->setFreshnessPeriod(ndn::time::milliseconds(10000));
+
+ if (isFinalBlock) {
+ data->setFinalBlockId(segmentName[-1]);
+ }
+ keyChain->sign(*data);
+ face->put(*data);
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::query(std::shared_ptr<ndn::Face> face,
+ std::shared_ptr<ndn::KeyChain> keyChain,
+ std::shared_ptr<const ndn::Interest> interest,
+ std::shared_ptr<DatabaseHandler> databaseHandler)
+{
+ // @todo: we should return a NACK as we have no database
+}
+
+
+template <>
+void
+QueryAdapter<MYSQL>::query(std::shared_ptr<ndn::Face> face,
+ std::shared_ptr<ndn::KeyChain> keyChain,
+ std::shared_ptr<const ndn::Interest> interest,
+ std::shared_ptr<MYSQL> databaseHandler)
+{
+ std::cout << "Running query" << std::endl;
+ // 1) Strip the prefix off the ndn::Interest's ndn::Name
+ // +1 to grab JSON component after "query" component
+ const std::string jsonQuery(reinterpret_cast<const char*>(interest->getName()[m_prefix.size()+1].value()));
+
+ // For efficiency, do a double check. Once without the lock, then with it.
+ if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
+ m_mutex.lock();
+ { // !!! BEGIN CRITICAL SECTION !!!
+ // If this fails upon locking, we removed it during our search.
+ // An unusual race-condition case, which requires things like PIT aggregation to be off.
+ auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
+ if (iter != m_activeQueryToFirstResponse.end()) {
+ face->put(*(iter->second));
+ m_mutex.unlock(); //escape lock
+ return;
+ }
+ } // !!! END CRITICAL SECTION !!!
+ m_mutex.unlock();
+ }
+
+ // 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
+ Json::Value parsedFromString;
+ Json::Reader reader;
+ if (reader.parse(jsonQuery, parsedFromString)) {
+ const ndn::name::Component version = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
+
+ // JSON parsed ok, so we can acknowledge successful receipt of the query
+ ndn::Name ackName(interest->getName());
+ ackName.append(version);
+ ackName.append("OK");
+
+ std::shared_ptr<ndn::Data> ack(std::make_shared<ndn::Data>(ackName));
+
+ m_mutex.lock();
+ { // !!! BEGIN CRITICAL SECTION !!!
+ // An unusual race-condition case, which requires things like PIT aggregation to be off.
+ auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
+ if (iter != m_activeQueryToFirstResponse.end()) {
+ face->put(*(iter->second));
+ m_mutex.unlock(); // escape lock
+ return;
+ }
+ // This is where things are expensive so we save them for the lock
+ keyChain->sign(*ack);
+ face->put(*ack);
+ m_activeQueryToFirstResponse.insert(std::pair<std::string,
+ std::shared_ptr<ndn::Data>>(jsonQuery, ack));
+ } // !!! END CRITICAL SECTION !!!
+ m_mutex.unlock();
+
+ // 3) Convert the JSON Query into a MySQL one
+ bool autocomplete = false;
+ std::stringstream mysqlQuery;
+ mysqlQuery << "SELECT name FROM cmip5";
+ bool input = false;
+ for (Json::Value::iterator iter = parsedFromString.begin(); iter != parsedFromString.end(); ++iter)
+ {
+ Json::Value key = iter.key();
+ Json::Value value = (*iter);
+
+ if (input) {
+ mysqlQuery << " AND";
+ } else {
+ mysqlQuery << " WHERE";
+ }
+
+ // Auto-complete case
+ if (key.asString().compare("?") == 0) {
+ mysqlQuery << " name REGEXP '^" << value.asString() << "'";
+ autocomplete = true;
+ }
+ // Component case
+ else {
+ mysqlQuery << " " << key.asString() << "='" << value.asString() << "'";
+ }
+ input = true;
+ }
+
+ if (!input) { // Force it to be the empty set
+ mysqlQuery << " limit 0";
+ }
+ mysqlQuery << ";";
+
+ // 4) Run the Query
+ // We're assuming that databaseHandler has already been connected to the database
+ std::shared_ptr<MYSQL_RES> results = atmos::util::PerformQuery(databaseHandler, mysqlQuery.str());
+
+ MYSQL_ROW row;
+ ndn::Name segmentPrefix(m_prefix);
+ segmentPrefix.append("query-results");
+ segmentPrefix.append(version);
+
+ size_t usedBytes = 0;
+ const size_t PAYLOAD_LIMIT = 7000;
+ uint64_t segmentNo = 0;
+ Json::Value array;
+ while ((row = mysql_fetch_row(results.get())))
+ {
+ size_t size = strlen(row[0]) + 1;
+ if (usedBytes + size > PAYLOAD_LIMIT) {
+ publishJson(face, keyChain, segmentPrefix, array, segmentNo, false, autocomplete);
+ array.clear();
+ usedBytes = 0;
+ segmentNo++;
+ }
+ array.append(row[0]);
+ usedBytes += size;
+ }
+ publishJson(face, keyChain, segmentPrefix, array, segmentNo, true, autocomplete);
+ }
+}
+
+} // namespace query
+} // namespace atmos
+#endif //ATMOS_QUERY_QUERY_ADAPTER_HPP
diff --git a/catalog/src/util/catalog-adapter.hpp b/catalog/src/util/catalog-adapter.hpp
new file mode 100644
index 0000000..52f21d2
--- /dev/null
+++ b/catalog/src/util/catalog-adapter.hpp
@@ -0,0 +1,184 @@
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ * for atmospheric science data
+ * Copyright (C) 2015 Colorado State University
+ *
+ * NDN-Atmos is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * NDN-Atmos is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
+**/
+
+#ifndef ATMOS_UTIL_CATALOG_ADAPTER_HPP
+#define ATMOS_UTIL_CATALOG_ADAPTER_HPP
+
+#include <ndn-cxx/data.hpp>
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/interest.hpp>
+#include <ndn-cxx/name.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+
+#include <memory>
+#include <string>
+
+
+#include <ndn-cxx/encoding/block.hpp>
+
+#include <iostream>
+
+namespace atmos {
+namespace util {
+
+/**
+ * Catalog Adapter acts as a common interface for Classes that need to register as Interest Filters
+ *
+ * Both QueryAdapter and PublisherAdapter use this as a template to allow consistancy between
+ * their designs and flow-control
+ */
+template <typename DatabaseHandler>
+class CatalogAdapter {
+public:
+ /**
+ * Constructor
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain to sign query responses and evaluate the incoming publish
+ * and ChronoSync requests against
+ * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
+ * @oaram prefix: Name that will define the prefix to all queries and publish requests
+ * that will be routed to this specific Catalog Instance
+ */
+ CatalogAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
+ std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
+
+ /**
+ * Destructor
+ */
+ virtual
+ ~CatalogAdapter();
+
+protected:
+ // @{ (onData and onTimeout) and/or onInterest should be overwritten at a minimum
+
+
+ /**
+ * Data that is routed to this class based on the Interest
+ *
+ * @param interest: Interest that caused this Data to be routed
+ * @param data: Data that needs to be handled
+ */
+ virtual void
+ onData(const ndn::Interest& interest, const ndn::Data& data);
+
+ /**
+ * Timeout from a Data request
+ *
+ * @param interest: Interest that failed to be satisfied (within the timelimit)
+ */
+ virtual void
+ onTimeout(const ndn::Interest& interest);
+
+
+ /**
+ * Interest that is routed to this class based on the InterestFilter
+ *
+ * @param filter: InterestFilter that caused this Interest to be routed
+ * @param interest: Interest that needs to be handled
+ */
+ virtual void
+ onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+ // @}
+
+ /**
+ * Callback that should/can be used to evaluate that the Interest Filter has been correctly set up
+ *
+ * @param prefix: Name that will be routed to this class
+ */
+ virtual void
+ onRegisterSuccess(const ndn::Name& prefix);
+
+ /**
+ * Callback that should/can be used to evaluate that the Interest Filter has been correctly set up
+ *
+ * @param prefix: Name that failed to route
+ * @param reason: String explanation as to why the failure occured
+ */
+ virtual void
+ onRegisterFailure(const ndn::Name& prefix, const std::string& reason);
+
+
+ // Face to communicate with
+ std::shared_ptr<ndn::Face> m_face;
+ // KeyChain used for security
+ std::shared_ptr<ndn::KeyChain> m_keyChain;
+ // Handle to the Catalog's database
+ std::shared_ptr<DatabaseHandler> m_databaseHandler;
+ // Prefix for our namespace
+ ndn::Name m_prefix;
+}; // class CatalogAdapter
+
+template <typename DatabaseHandler>
+CatalogAdapter<DatabaseHandler>::CatalogAdapter(std::shared_ptr<ndn::Face> face,
+ std::shared_ptr<ndn::KeyChain> keyChain,
+ std::shared_ptr<DatabaseHandler> databaseHandler,
+ const ndn::Name& prefix)
+ : m_face(face), m_keyChain(keyChain), m_databaseHandler(databaseHandler), m_prefix(prefix)
+{
+ // empty
+}
+
+template <typename DatabaseHandler>
+CatalogAdapter<DatabaseHandler>::~CatalogAdapter()
+{
+ // empty
+}
+
+template <typename DatabaseHandler>
+void
+CatalogAdapter<DatabaseHandler>::onRegisterSuccess(const ndn::Name& prefix)
+{
+ // std::cout << "Successfully registered " << prefix << std::endl;
+}
+
+template <typename DatabaseHandler>
+void
+CatalogAdapter<DatabaseHandler>::onRegisterFailure(const ndn::Name& prefix, const std::string& reason)
+{
+ // std::cout << "Failed to register prefix " << prefix << ": " << reason << std::endl;
+}
+
+
+template <typename DatabaseHandler>
+void
+CatalogAdapter<DatabaseHandler>::onData(const ndn::Interest& interest, const ndn::Data& data)
+{
+ // At this point we need to get the ndn::Block out of data.getContent()
+}
+
+template <typename DatabaseHandler>
+void
+CatalogAdapter<DatabaseHandler>::onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
+{
+ // At this point we need to use the filter to either:
+ // a) Request the Data for the Interest, or
+ // b) Use the Filter to ID where in the Interest the Interest's "Content" is, and grab that out
+}
+
+
+template <typename DatabaseHandler>
+void
+CatalogAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
+{
+ // At this point, probably should do a retry
+}
+
+} // namespace util
+} // namespace atmos
+
+#endif //ATMOS_UTIL_CATALOG_ADAPTER_HPP
diff --git a/catalog/src/util/mysql-util.cpp b/catalog/src/util/mysql-util.cpp
new file mode 100644
index 0000000..62225ba
--- /dev/null
+++ b/catalog/src/util/mysql-util.cpp
@@ -0,0 +1,70 @@
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ * for atmospheric science data
+ * Copyright (C) 2015 Colorado State University
+ *
+ * NDN-Atmos is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * NDN-Atmos is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
+**/
+
+#include "util/mysql-util.hpp"
+
+#include "mysql/errmsg.h"
+
+namespace atmos {
+namespace util {
+
+ConnectionDetails::ConnectionDetails(const std::string& serverInput, const std::string& userInput,
+ const std::string& passwordInput, const std::string& databaseInput)
+ : server(serverInput), user(userInput), password(passwordInput), database(databaseInput)
+{
+ // empty
+}
+
+
+std::shared_ptr<MYSQL>
+MySQLConnectionSetup(ConnectionDetails& details) {
+ MYSQL* conn = mysql_init(NULL);
+ mysql_real_connect(conn, details.server.c_str(), details.user.c_str(),
+ details.password.c_str(), details.database.c_str(), 0, NULL, 0);
+ std::shared_ptr<MYSQL> connection(conn);
+ return connection;
+}
+
+std::shared_ptr<MYSQL_RES>
+PerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query) {
+ std::shared_ptr<MYSQL_RES> result(NULL);
+
+ switch (mysql_query(connection.get(), sql_query.c_str()))
+ {
+ case 0:
+ {
+ MYSQL_RES* resultPtr = mysql_store_result(connection.get());
+ if (resultPtr != NULL)
+ {
+ result.reset(resultPtr);
+ }
+ break;
+ }
+ // Various error cases
+ case CR_COMMANDS_OUT_OF_SYNC:
+ case CR_SERVER_GONE_ERROR:
+ case CR_SERVER_LOST:
+ case CR_UNKNOWN_ERROR:
+ default:
+ break;
+ }
+ return result;
+}
+
+} // namespace util
+} // namespace atmos
diff --git a/catalog/src/util/mysql-util.hpp b/catalog/src/util/mysql-util.hpp
new file mode 100644
index 0000000..da3c0b5
--- /dev/null
+++ b/catalog/src/util/mysql-util.hpp
@@ -0,0 +1,48 @@
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ * for atmospheric science data
+ * Copyright (C) 2015 Colorado State University
+ *
+ * NDN-Atmos is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * NDN-Atmos is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
+**/
+
+#ifndef ATMOS_UTIL_CONNECTION_DETAILS_HPP
+#define ATMOS_UTIL_CONNECTION_DETAILS_HPP
+
+#include "mysql/mysql.h"
+
+#include <memory>
+#include <string>
+
+namespace atmos {
+namespace util {
+struct ConnectionDetails {
+public:
+ std::string server;
+ std::string user;
+ std::string password;
+ std::string database;
+
+ ConnectionDetails(const std::string& serverInput, const std::string& userInput,
+ const std::string& passwordInput, const std::string& databaseInput);
+};
+
+std::shared_ptr<MYSQL>
+MySQLConnectionSetup(ConnectionDetails& details);
+
+std::shared_ptr<MYSQL_RES>
+PerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query);
+
+} // namespace util
+} // namespace atmos
+#endif //ATMOS_UTIL_CONNECTION_DETAILS_HPP
diff --git a/catalog/tests/unit-tests/simple.cpp b/catalog/tests/unit-tests/simple.cpp
index 970b835..5dda75a 100644
--- a/catalog/tests/unit-tests/simple.cpp
+++ b/catalog/tests/unit-tests/simple.cpp
@@ -1,32 +1,30 @@
-/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2015, Colorado State University.
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ * for atmospheric science data
+ * Copyright (C) 2015 Colorado State University
*
- * This file is part of ndn-atmos.
+ * NDN-Atmos is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
*
- * ndn-atmos is free software: you can redistribute it and/or modify it under the
- * terms of the GNU Lesser General Public License as published by the Free Software
- * Foundation, either version 3 of the License, or (at your option) any later version.
+ * NDN-Atmos is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
*
- * ndn-atmos is distributed in the hope that it will be useful, but WITHOUT ANY
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
- *
- * You should have received copies of the GNU General Public License and GNU Lesser
- * General Public License along with ndn-atmos, e.g., in COPYING.md file. If not, see
- * <http://www.gnu.org/licenses/>.
- *
- * See AUTHORS.md for complete list of ndn-atmos authors and contributors.
- */
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+**/
#include <boost/test/unit_test.hpp>
+
#include <json/value.h>
#include <json/writer.h>
#include <json/reader.h>
-#include <mysql.h>
+
#include <iostream>
-namespace NdnAtmos {
+namespace atmos {
namespace test {
BOOST_AUTO_TEST_SUITE(MasterSuite)
@@ -36,12 +34,6 @@
BOOST_CHECK(0==0);
}
-BOOST_AUTO_TEST_CASE(DBTest)
-{
- MYSQL *conn = mysql_init(NULL);
- BOOST_CHECK_EQUAL(conn == NULL, false);
-}
-
BOOST_AUTO_TEST_CASE(JsonTest)
{
Json::Value original;
@@ -62,4 +54,4 @@
BOOST_AUTO_TEST_SUITE_END()
} //namespace test
-} //namespace ndn-atmos
+} //namespace atmos