catalog: implement catalog driver and facade
This commit also refactories the QueryAdapter's code, adds corresponding
unit-test. Catalog-adapter and catalog do not use template, so the definition
are moved to corresponding cpp files.
refs: #2599, #2600
Change-Id: I2be492ec3c2538e865bfa7c09ac8cd49e2a9527d
diff --git a/catalog/src/catalog/catalog.cpp b/catalog/src/catalog/catalog.cpp
new file mode 100644
index 0000000..ebec2f9
--- /dev/null
+++ b/catalog/src/catalog/catalog.cpp
@@ -0,0 +1,102 @@
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ * for atmospheric science data
+ * Copyright (C) 2015 Colorado State University
+ *
+ * NDN-Atmos is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * NDN-Atmos is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
+**/
+
+#include "catalog.hpp"
+
+namespace atmos {
+namespace catalog {
+
+Catalog::Catalog(const std::shared_ptr<ndn::Face>& face,
+ const std::shared_ptr<ndn::KeyChain>& keyChain,
+ const std::string& configFileName)
+ : m_face(face)
+ , m_keyChain(keyChain)
+ , m_configFile(configFileName)
+{
+ // empty
+}
+
+Catalog::~Catalog()
+{
+ // empty
+}
+
+void
+Catalog::onConfig(const util::ConfigSection& configSection,
+ bool isDryRun,
+ const std::string& fileName)
+{
+ if (isDryRun) {
+ return;
+ }
+ for (auto i = configSection.begin();
+ i != configSection.end();
+ ++ i)
+ {
+ if (i->first == "prefix") {
+ m_prefix.clear();
+ m_prefix.append(i->second.get_value<std::string>());
+ if (m_prefix.empty()) {
+ throw Error("Empty value for \"prefix\""
+ " in \"general\" section");
+ }
+ }
+ }
+}
+
+void
+Catalog::addAdapter(std::unique_ptr<util::CatalogAdapter>& adapter)
+{
+ m_adapters.push_back(std::move(adapter));
+}
+
+void
+Catalog::initializeCatalog()
+{
+ util::ConfigFile config(&util::ConfigFile::ignoreUnknownSection);
+
+ config.addSectionHandler("general", bind(&Catalog::onConfig, this, _1, _2, _3));
+
+ config.parse(m_configFile, true);
+ config.parse(m_configFile, false);
+}
+
+void
+Catalog::initializeAdapters()
+{
+ util::ConfigFile config(&util::ConfigFile::ignoreUnknownSection);
+ for (auto i = m_adapters.begin();
+ i != m_adapters.end();
+ ++ i)
+ {
+ (*i)->setConfigFile(config, m_prefix);
+ }
+
+ config.parse(m_configFile, true);
+ config.parse(m_configFile, false);
+}
+
+void
+Catalog::initialize()
+{
+ initializeCatalog();
+ initializeAdapters();
+}
+
+} // namespace catalog
+} // namespace atmos
diff --git a/catalog/src/catalog/catalog.hpp b/catalog/src/catalog/catalog.hpp
index fe680d9..b58129e 100644
--- a/catalog/src/catalog/catalog.hpp
+++ b/catalog/src/catalog/catalog.hpp
@@ -19,12 +19,10 @@
#ifndef ATMOS_CATALOG_CATALOG_HPP
#define ATMOS_CATALOG_CATALOG_HPP
-#include "query/query-adapter.hpp"
-#include "publish/publish-adapter.hpp"
+#include "util/catalog-adapter.hpp"
+#include "util/config-file.hpp"
-#include <ndn-cxx/data.hpp>
#include <ndn-cxx/face.hpp>
-#include <ndn-cxx/interest.hpp>
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/security/key-chain.hpp>
@@ -36,55 +34,82 @@
/**
* The Catalog acts as a façade around the Database.
- * It is Templated on a DatabaseHandler: the connection into the database that it will use to
- * communicate with the actual system
*/
-template <typename DatabaseHandler>
class Catalog {
public:
+ class Error : public std::runtime_error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : std::runtime_error(what)
+ {
+ }
+ };
+
/**
* Constructor
*
- * @param aFace: Face that will be used for NDN communications
- * @param aKeyChain: KeyChain to sign query responses and evaluate the incoming publish
- * and ChronoSync requests against
- * @param aDatabaseHandler: <typename DatabaseHandler> to the database that stores our catalog
- * @oaram aPrefix: Name that will define the prefix to all queries and publish requests
- * that will be routed to this specific Catalog Instance
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain that will be used for data signing
+ * @param configFileName: Configuration file that specifies the catalog configuration details
*/
- Catalog(std::shared_ptr<ndn::Face> aFace, std::shared_ptr<ndn::KeyChain> aKeyChain,
- std::shared_ptr<DatabaseHandler> aDatabaseHandler, const ndn::Name& aPrefix);
+ Catalog(const std::shared_ptr<ndn::Face>& face,
+ const std::shared_ptr<ndn::KeyChain>& keyChain,
+ const std::string& configFileName);
- /**
- * Destructor
- */
virtual
~Catalog();
+ /**
+ * Function that performs the initialization of catalog instance and the adapters added in the
+ * catalog. After initialization, face can be started by processEvents()
+ */
+ void
+ initialize();
+
+ /**
+ * Helper function that adds adapters in catalog so that all adapters can be initialized when
+ * the initialize() is called
+ *
+ * @param adapter: Adapter that will be added. Any adapter instances must be declared as the
+ * base Class "util::CatalogAdapter"
+ */
+ void
+ addAdapter(std::unique_ptr<util::CatalogAdapter>& adapter);
+
protected:
- // Templated Adapter to handle Query requests
- atmos::query::QueryAdapter<DatabaseHandler> m_queryAdapter;
- // Templated Adapter to handle Publisher requests
- atmos::publish::PublishAdapter<DatabaseHandler> m_publishAdapter;
+
+ /**
+ * Helper function that configures the catalog according to the general section
+ */
+ void
+ onConfig(const util::ConfigSection& configSection,
+ bool isDryRun,
+ const std::string& fileName);
+
+ /**
+ * Helper function that subscribes to the general section for the config file
+ */
+ void
+ initializeCatalog();
+
+ /**
+ * Helper function that launches the adapters configuration processing functions
+ */
+ void
+ initializeAdapters();
+
+private:
+ const std::shared_ptr<ndn::Face> m_face;
+ const std::shared_ptr<ndn::KeyChain> m_keyChain;
+ const std::string m_configFile;
+ ndn::Name m_prefix;
+
+ // Adapters that added by users
+ std::vector<std::unique_ptr<util::CatalogAdapter>> m_adapters;
}; // class Catalog
-template <typename DatabaseHandler>
-Catalog<DatabaseHandler>::Catalog(std::shared_ptr<ndn::Face> aFace,
- std::shared_ptr<ndn::KeyChain> aKeyChain,
- std::shared_ptr<DatabaseHandler> aDatabaseHandler,
- const ndn::Name& aPrefix)
-
- : m_queryAdapter(aFace, aKeyChain, aDatabaseHandler, aPrefix)
- , m_publishAdapter(aFace, aKeyChain, aDatabaseHandler, aPrefix)
-{
- // empty
-}
-
-template <typename DatabaseHandler>
-Catalog<DatabaseHandler>::~Catalog()
-{
- // empty
-}
} // namespace catalog
} // namespace atmos
diff --git a/catalog/src/main.cpp b/catalog/src/main.cpp
index 3f199ee..4c85d8c 100644
--- a/catalog/src/main.cpp
+++ b/catalog/src/main.cpp
@@ -14,37 +14,66 @@
*
* You should have received a copy of the GNU General Public License
* along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
-**/
+ **/
+#include "config.hpp"
#include "catalog/catalog.hpp"
-#include "util/mysql-util.hpp"
-
-#include <ChronoSync/socket.hpp>
-
-#include <ndn-cxx/data.hpp>
-#include <ndn-cxx/face.hpp>
-#include <ndn-cxx/interest.hpp>
-#include <ndn-cxx/name.hpp>
-#include <ndn-cxx/security/key-chain.hpp>
-
-#include "mysql/mysql.h"
+#include "query/query-adapter.hpp"
+#include "publish/publish-adapter.hpp"
#include <memory>
+#include <getopt.h>
+#include <ndn-cxx/face.hpp>
-int main()
+
+void
+usage()
{
- std::shared_ptr<chronosync::Socket> socket; // use ChronoSync
+ std::cout << "\n Usage:\n atmos-catalog "
+ "[-h] [-f config file] "
+ " [-f config file] - set the configuration file\n"
+ " [-h] - print help and exit\n"
+ "\n";
+}
+
+int
+main(int argc, char** argv)
+{
+ int option;
+ std::string configFile(DEFAULT_CONFIG_FILE);
+
+ while ((option = getopt(argc, argv, "f:h")) != -1) {
+ switch (option) {
+ case 'f':
+ configFile.assign(optarg);
+ break;
+ case 'h':
+ default:
+ usage();
+ return 0;
+ }
+ }
+
+ argc -= optind;
+ argv += optind;
+ if (argc != 0) {
+ usage();
+ return 1;
+ }
+
std::shared_ptr<ndn::Face> face(new ndn::Face());
std::shared_ptr<ndn::KeyChain> keyChain(new ndn::KeyChain());
- // This should be unique to each instance
- ndn::Name aName("/catalog/myUniqueName");
+ std::unique_ptr<atmos::util::CatalogAdapter>
+ queryAdapter(new atmos::query::QueryAdapter<MYSQL>(face, keyChain));
+ std::unique_ptr<atmos::util::CatalogAdapter>
+ publishAdapter(new atmos::publish::PublishAdapter<MYSQL>(face, keyChain));
- atmos::util::ConnectionDetails mysqlID("atmos-den.es.net", "testuser", "test623", "testdb");
- std::shared_ptr<MYSQL> conn;
- conn = atmos::util::MySQLConnectionSetup(mysqlID);
+ atmos::catalog::Catalog catalogInstance(face, keyChain, configFile);
+ catalogInstance.addAdapter(queryAdapter);
+ catalogInstance.addAdapter(publishAdapter);
- atmos::catalog::Catalog<MYSQL> catalog(face, keyChain, conn, aName);
+ catalogInstance.initialize();
face->processEvents();
return 0;
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index c6b322b..9d29434 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -31,50 +31,60 @@
#include <ndn-cxx/interest-filter.hpp>
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/security/key-chain.hpp>
-
+#include <ndn-cxx/security/validator.hpp>
#include "mysql/mysql.h"
#include <memory>
#include <string>
+#include <vector>
+#include <unordered_map>
namespace atmos {
namespace publish {
-
/**
* PublishAdapter handles the Publish usecases for the catalog
*/
template <typename DatabaseHandler>
-class PublishAdapter : public atmos::util::CatalogAdapter<DatabaseHandler> {
+class PublishAdapter : public atmos::util::CatalogAdapter {
public:
/**
* Constructor
*
- * @param face: Face that will be used for NDN communications
- * @param keyChain: KeyChain to sign query responses and evaluate the incoming publish
- * and ChronoSync requests against
- * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
- * @oaram prefix: Name that will define the prefix to all publish requests
- * that will be routed to this specific Catalog Instance
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain that will be used for data signing
*/
- PublishAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
+ PublishAdapter(const std::shared_ptr<ndn::Face>& face,
+ const std::shared_ptr<ndn::KeyChain>& keyChain);
-
- /**
- * Destructor
- */
virtual
~PublishAdapter();
+ /**
+ * Helper function that subscribe to a publish section for the config file
+ */
+ void
+ setConfigFile(util::ConfigFile& config,
+ const ndn::Name& prefix);
+
protected:
/**
+ * Helper function that configures piblishAdapter instance according to publish section
+ * in config file
+ */
+ void
+ onConfig(const util::ConfigSection& section,
+ bool isDryDun,
+ const std::string& fileName,
+ const ndn::Name& prefix);
+
+ /**
* Initial "please publish this" Interests
*
* @param filter: InterestFilter that caused this Interest to be routed
* @param interest: Interest that needs to be handled
*/
virtual void
- onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+ onPublishInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
/**
* Data containing the actual thing we need to publish
@@ -83,53 +93,191 @@
* @param data: Data that needs to be handled
*/
virtual void
- onData(const ndn::Interest& interest, const ndn::Data& data);
+ onPublishedData(const ndn::Interest& interest, const ndn::Data& data);
- // @todo: Should we do anything special with the timeouts for the publish requests?
- // If so, overwrite onTimeout()
+ /**
+ * Helper function to set the DatabaseHandler
+ */
+ void
+ setDatabaseHandler(const util::ConnectionDetails& databaseId);
+ /**
+ * Helper function that sets filters to make the adapter work
+ */
+ void
+ setFilters();
+
+protected:
+ typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
+ // Prefix for ChronoSync
+ ndn::Name m_syncPrefix;
+ // Handle to the Catalog's database
+ std::shared_ptr<DatabaseHandler> m_databaseHandler;
+ std::shared_ptr<ndn::Validator> m_validaor;
+ RegisteredPrefixList m_registeredPrefixList;
};
-template <typename DatabaseHandler>
-PublishAdapter<DatabaseHandler>::PublishAdapter(std::shared_ptr<ndn::Face> face,
- std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<DatabaseHandler> databaseHandler,
- const ndn::Name& prefix)
- : atmos::util::CatalogAdapter<DatabaseHandler>(face, keyChain, databaseHandler,
- ndn::Name(prefix).append("/publish"))
-{
- face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("/publish")),
- bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onInterest,
- this, _1, _2),
- bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
- this, _1),
- bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
- this, _1, _2));
- std::shared_ptr<ndn::Interest> request(std::make_shared<ndn::Interest>(ndn::Name(prefix).append("/publish")));
- atmos::util::CatalogAdapter<DatabaseHandler>::m_face->expressInterest(*request,
- bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onData,
- this, _1, _2),
- bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onTimeout,
- this, _1));
+template <typename DatabaseHandler>
+PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
+ const std::shared_ptr<ndn::KeyChain>& keyChain)
+ : util::CatalogAdapter(face, keyChain)
+{
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::setFilters()
+{
+ ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
+ m_registeredPrefixList[publishPrefix] = m_face->setInterestFilter(publishPrefix,
+ bind(&publish::PublishAdapter<DatabaseHandler>::onPublishInterest,
+ this, _1, _2),
+ bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
+ this, _1),
+ bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
+ this, _1, _2));
}
template <typename DatabaseHandler>
PublishAdapter<DatabaseHandler>::~PublishAdapter()
{
- // empty
+ for (const auto& itr : m_registeredPrefixList) {
+ if (static_cast<bool>(itr.second))
+ m_face->unsetInterestFilter(itr.second);
+ }
}
template <typename DatabaseHandler>
void
-PublishAdapter<DatabaseHandler>::onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
+PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
+ const ndn::Name& prefix)
+{
+ config.addSectionHandler("publishAdapter",
+ bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
+ _1, _2, _3, prefix));
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
+ bool isDryRun,
+ const std::string& filename,
+ const ndn::Name& prefix)
+{
+ using namespace util;
+ if (isDryRun) {
+ return;
+ }
+
+ std::string signingId, dbServer, dbName, dbUser, dbPasswd;
+ std::string syncPrefix("ndn:/ndn-atmos/broadcast/chronosync");
+
+ for (auto item = section.begin();
+ item != section.end();
+ ++ item)
+ {
+ if (item->first == "signingId") {
+ signingId.assign(item->second.get_value<std::string>());
+ if (signingId.empty()) {
+ throw Error("Invalid value for \"signingId\""
+ " in \"publish\" section");
+ }
+ }
+
+ // @todo: parse the published_file_security section
+
+ else if (item->first == "database") {
+ const util::ConfigSection& databaseSection = item->second;
+ for (auto subItem = databaseSection.begin();
+ subItem != databaseSection.end();
+ ++ subItem) {
+ if (subItem->first == "dbServer") {
+ dbServer.assign(subItem->second.get_value<std::string>());
+ if (dbServer.empty()){
+ throw Error("Invalid value for \"dbServer\""
+ " in \"publish\" section");
+ }
+ }
+ if (subItem->first == "dbName") {
+ dbName.assign(subItem->second.get_value<std::string>());
+ if (dbName.empty()){
+ throw Error("Invalid value for \"dbName\""
+ " in \"publish\" section");
+ }
+ }
+ if (subItem->first == "dbUser") {
+ dbUser.assign(subItem->second.get_value<std::string>());
+ if (dbUser.empty()){
+ throw Error("Invalid value for \"dbUser\""
+ " in \"publish\" section");
+ }
+ }
+ if (subItem->first == "dbPasswd") {
+ dbPasswd.assign(subItem->second.get_value<std::string>());
+ if (dbPasswd.empty()){
+ throw Error("Invalid value for \"dbPasswd\""
+ " in \"publish\" section");
+ }
+ }
+ }
+ }
+ else if (item->first == "sync") {
+ const util::ConfigSection& synSection = item->second;
+ for (auto subItem = synSection.begin();
+ subItem != synSection.end();
+ ++ subItem) {
+ if (subItem->first == "prefix") {
+ syncPrefix.clear();
+ syncPrefix.assign(subItem->second.get_value<std::string>());
+ if (syncPrefix.empty()){
+ throw Error("Invalid value for \"prefix\""
+ " in \"publish\\sync\" section");
+ }
+ }
+ // todo: parse the sync_security section
+ }
+ }
+ }
+
+ m_prefix = prefix;
+ m_signingId = ndn::Name(signingId);
+ m_syncPrefix.clear();
+ m_syncPrefix.append(syncPrefix);
+ util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
+
+ setDatabaseHandler(mysqlId);
+ setFilters();
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+{
+ //empty
+}
+
+template <>
+void
+PublishAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+{
+ std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
+
+ m_databaseHandler = conn;
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onPublishInterest(const ndn::InterestFilter& filter,
+ const ndn::Interest& interest)
{
// @todo: Request the data for publish
}
template <typename DatabaseHandler>
void
-PublishAdapter<DatabaseHandler>::onData(const ndn::Interest& interest, const ndn::Data& data)
+PublishAdapter<DatabaseHandler>::onPublishedData(const ndn::Interest& interest,
+ const ndn::Data& data)
{
// @todo handle publishing the data
}
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index bada073..107e6c4 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -21,10 +21,10 @@
#include "util/catalog-adapter.hpp"
#include "util/mysql-util.hpp"
+#include "util/config-file.hpp"
#include <thread>
-
#include <json/reader.h>
#include <json/value.h>
#include <json/writer.h>
@@ -42,6 +42,7 @@
#include "mysql/mysql.h"
#include <map>
+#include <unordered_map>
#include <memory>
#include <mutex>
#include <sstream>
@@ -49,35 +50,44 @@
namespace atmos {
namespace query {
-
static const size_t MAX_SEGMENT_SIZE = ndn::MAX_NDN_PACKET_SIZE >> 1;
/**
* QueryAdapter handles the Query usecases for the catalog
*/
template <typename DatabaseHandler>
-class QueryAdapter : public atmos::util::CatalogAdapter<DatabaseHandler> {
+class QueryAdapter : public atmos::util::CatalogAdapter {
public:
/**
* Constructor
*
- * @param face: Face that will be used for NDN communications
- * @param keyChain: KeyChain to sign query responses and evaluate the incoming query
- * and ChronoSync requests against
- * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
- * @param prefix: Name that will define the prefix to all queries requests that will be
- * routed to this specific Catalog Instance
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain that will be used for data signing
*/
- QueryAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
+ QueryAdapter(const std::shared_ptr<ndn::Face>& face,
+ const std::shared_ptr<ndn::KeyChain>& keyChain);
- /**
- * Destructor
- */
virtual
~QueryAdapter();
/**
+ * Helper function to specify section handler
+ */
+ void
+ setConfigFile(util::ConfigFile& config,
+ const ndn::Name& prefix);
+
+protected:
+ /**
+ * Helper function for configuration parsing
+ */
+ void
+ onConfig(const util::ConfigSection& section,
+ bool isDryDun,
+ const std::string& fileName,
+ const ndn::Name& prefix);
+
+ /**
* Handles incoming query requests by stripping the filter off the Interest to get the
* actual request out. This removes the need for a 2-step Interest-Data retrieval.
*
@@ -96,67 +106,82 @@
virtual void
onQueryResultsInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
-private:
/**
- * Helper function that generates query results
+ * Helper function that makes query-results data
*
- * @param face: Face that will be used for NDN communications
- * @param keyChain: KeyChain to sign query responses and evaluate the incoming query
- * and ChronoSync requests against
- * @param interest: Interest that needs to be handled
- * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
- */
- void
- query(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<const ndn::Interest> interest,
- std::shared_ptr<DatabaseHandler> databaseHandler);
-
- /**
- * Helper function that publishes JSON
- *
- * @param face: Face that will send the Data out on
- * @param keyChain: KeyChain to sign the Data we're creating
- * @param segmentPrefix: Name that identifies the Prefix for the Data
- * @param value: Json::Value to be sent in the Data
- * @param segmentNo: uint64_t the segment for this Data
- * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the last entry
+ * @param segmentPrefix: Name that identifies the Prefix for the Data
+ * @param value: Json::Value to be sent in the Data
+ * @param segmentNo: uint64_t the segment for this Data
+ * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the
+ * last entry
* @param isAutocomplete: bool to indicate whether this is an autocomplete message
*/
- void
- publishJson(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
- const ndn::Name& segmentPrefix, const Json::Value& value,
- uint64_t segmentNo, bool isFinalBlock, bool isAutocomplete);
+ std::shared_ptr<ndn::Data>
+ makeReplyData(const ndn::Name& segmentPrefix,
+ const Json::Value& value,
+ uint64_t segmentNo,
+ bool isFinalBlock,
+ bool isAutocomplete);
/**
- * Helper function that publishes char*
+ * Helper function that generates query results from a Json query carried in the Interest
*
- * @param face: Face that will send the Data out on
- * @param keyChain: KeyChain to sign the Data we're creating
- * @param segmentPrefix: Name that identifies the Prefix for the Data
- * @param payload: char* to be sent in the Data
- * @param payloadLength: size_t to indicate how long payload is
- * @param segmentNo: uint64_t the segment for this Data
- * @param isFinalBlock: bool to indicate whether this needs to be flagged in the Data as the last entry
+ * @param interest: Interest that needs to be handled
*/
void
- publishSegment(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
- const ndn::Name& segmentPrefix, const char* payload, size_t payloadLength,
- uint64_t segmentNo, bool isFinalBlock);
+ runJsonQuery(std::shared_ptr<const ndn::Interest> interest);
/**
- * Helper function that generates query results from a Json query
+ * Helper function that makes ACK data
*
- * @param face: Face that will be used for NDN communications
- * @param jsonQuery: String containing the JSON query
- * @param keyChain: KeyChain to sign query responses and evaluate the incoming query
- * and ChronoSync requests against
- * @param interest: Interest that needs to be handled
- * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
+ * @param interest: Intersts that needs to be handled
+ * @param version: Version that needs to be in the data name
+ */
+ std::shared_ptr<ndn::Data>
+ makeAckData(std::shared_ptr<const ndn::Interest> interest,
+ const ndn::Name::Component& version);
+
+ /**
+ * Helper function that generates the sqlQuery string and autocomplete flag
+ * @param sqlQuery: stringstream to save the sqlQuery string
+ * @param jsonValue: Json value that contains the query information
+ * @param autocomplete: Flag to indicate if the json contains autocomplete flag
*/
void
- runJsonQuery(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<const ndn::Interest> interest, const std::string& jsonQuery,
- std::shared_ptr<DatabaseHandler> databaseHandler);
+ json2Sql(std::stringstream& sqlQuery,
+ Json::Value& jsonValue,
+ bool& autocomplete);
+
+ /**
+ * Helper function that signs the data
+ */
+ void
+ signData(ndn::Data& data);
+
+ /**
+ * Helper function that publishes query-results data segments
+ */
+ virtual void
+ prepareSegments(const ndn::Name& segmentPrefix,
+ const std::string& sqlString,
+ bool autocomplete);
+
+ /**
+ * Helper function to set the DatabaseHandler
+ */
+ void
+ setDatabaseHandler(const util::ConnectionDetails& databaseId);
+
+ /**
+ * Helper function that set filters to make the adapter work
+ */
+ void
+ setFilters();
+
+protected:
+ typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
+ // Handle to the Catalog's database
+ std::shared_ptr<DatabaseHandler> m_databaseHandler;
// mutex to control critical sections
std::mutex m_mutex;
@@ -166,55 +191,162 @@
ndn::util::InMemoryStorageLru m_cache;
// @}
+ RegisteredPrefixList m_registeredPrefixList;
};
-
template <typename DatabaseHandler>
-QueryAdapter<DatabaseHandler>::QueryAdapter(std::shared_ptr<ndn::Face> face,
- std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<DatabaseHandler> databaseHandler,
- const ndn::Name& prefix)
- : atmos::util::CatalogAdapter<DatabaseHandler>(face, keyChain, databaseHandler, prefix)
- , m_cache(100000000)
+QueryAdapter<DatabaseHandler>::QueryAdapter(const std::shared_ptr<ndn::Face>& face,
+ const std::shared_ptr<ndn::KeyChain>& keyChain)
+ : util::CatalogAdapter(face, keyChain)
+ , m_cache(250000)
{
- atmos::util::CatalogAdapter<DatabaseHandler>::m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("query")),
- bind(&atmos::query::QueryAdapter<DatabaseHandler>::onQueryInterest,
- this, _1, _2),
- bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
- this, _1),
- bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
- this, _1, _2));
-
- atmos::util::CatalogAdapter<DatabaseHandler>::m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("query-results")),
- bind(&atmos::query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
- this, _1, _2),
- bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
- this, _1),
- bind(&atmos::query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
- this, _1, _2));
-}
-
-template <typename DatabaseHandler>
-QueryAdapter<DatabaseHandler>::~QueryAdapter(){
- // empty
}
template <typename DatabaseHandler>
void
-QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
+QueryAdapter<DatabaseHandler>::setFilters()
{
- // strictly enforce query initialization namespace. Name should be our local prefix + "query" + parameters
+ ndn::Name queryPrefix = ndn::Name(m_prefix).append("query");
+ m_registeredPrefixList[queryPrefix] = m_face->setInterestFilter(ndn::InterestFilter(queryPrefix),
+ bind(&query::QueryAdapter<DatabaseHandler>::onQueryInterest,
+ this, _1, _2),
+ bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
+ this, _1),
+ bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
+ this, _1, _2));
+
+ ndn::Name resultPrefix = ndn::Name(m_prefix).append("query-results");
+ m_registeredPrefixList[resultPrefix] = m_face->setInterestFilter(ndn::InterestFilter(ndn::Name(m_prefix).append("query-results")),
+ bind(&query::QueryAdapter<DatabaseHandler>::onQueryResultsInterest,
+ this, _1, _2),
+ bind(&query::QueryAdapter<DatabaseHandler>::onRegisterSuccess,
+ this, _1),
+ bind(&query::QueryAdapter<DatabaseHandler>::onRegisterFailure,
+ this, _1, _2));
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
+ const ndn::Name& prefix)
+{
+ config.addSectionHandler("queryAdapter", bind(&QueryAdapter<DatabaseHandler>::onConfig, this,
+ _1, _2, _3, prefix));
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
+ bool isDryRun,
+ const std::string& filename,
+ const ndn::Name& prefix)
+{
+ using namespace util;
+ if (isDryRun) {
+ return;
+ }
+ std::string signingId, dbServer, dbName, dbUser, dbPasswd;
+ for (auto item = section.begin();
+ item != section.end();
+ ++ item)
+ {
+ if (item->first == "signingId") {
+ signingId.assign(item->second.get_value<std::string>());
+ if (signingId.empty()) {
+ throw Error("Empty value for \"signingId\""
+ " in \"query\" section");
+ }
+ }
+ if (item->first == "database") {
+ const util::ConfigSection& dataSection = item->second;
+ for (auto subItem = dataSection.begin();
+ subItem != dataSection.end();
+ ++ subItem)
+ {
+ if (subItem->first == "dbServer") {
+ dbServer.assign(subItem->second.get_value<std::string>());
+ if (dbServer.empty()){
+ throw Error("Invalid value for \"dbServer\""
+ " in \"query\" section");
+ }
+ }
+ if (subItem->first == "dbName") {
+ dbName.assign(subItem->second.get_value<std::string>());
+ if (dbName.empty()){
+ throw Error("Invalid value for \"dbName\""
+ " in \"query\" section");
+ }
+ }
+ if (subItem->first == "dbUser") {
+ dbUser.assign(subItem->second.get_value<std::string>());
+ if (dbUser.empty()){
+ throw Error("Invalid value for \"dbUser\""
+ " in \"query\" section");
+ }
+ }
+ if (subItem->first == "dbPasswd") {
+ dbPasswd.assign(subItem->second.get_value<std::string>());
+ if (dbPasswd.empty()){
+ throw Error("Invalid value for \"dbPasswd\""
+ " in \"query\" section");
+ }
+ }
+ }
+ }
+ }
+
+ m_prefix = prefix;
+ m_signingId = ndn::Name(signingId);
+ util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
+
+ setDatabaseHandler(mysqlId);
+ setFilters();
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+{
+ //empty
+}
+
+template <>
+void
+QueryAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+{
+ std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
+
+ m_databaseHandler = conn;
+}
+
+template <typename DatabaseHandler>
+QueryAdapter<DatabaseHandler>::~QueryAdapter()
+{
+ for (const auto& itr : m_registeredPrefixList) {
+ if (static_cast<bool>(itr.second))
+ m_face->unsetInterestFilter(itr.second);
+ }
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::onQueryInterest(const ndn::InterestFilter& filter,
+ const ndn::Interest& interest)
+{
+ // strictly enforce query initialization namespace.
+ // Name should be our local prefix + "query" + parameters
if (interest.getName().size() != filter.getPrefix().size() + 1) {
// @todo: return a nack
return;
}
-
std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
-
- std::thread queryThread(&QueryAdapter<DatabaseHandler>::query, this,
- atmos::util::CatalogAdapter<DatabaseHandler>::m_face,
- atmos::util::CatalogAdapter<DatabaseHandler>::m_keyChain, interestPtr,
- atmos::util::CatalogAdapter<DatabaseHandler>::m_databaseHandler);
+ #ifndef NDEBUG
+ std::cout << "query interest : " << interestPtr->getName() << std::endl;
+ #endif
+ // @todo: use thread pool
+ std::thread queryThread(&QueryAdapter<DatabaseHandler>::runJsonQuery,
+ this,
+ interestPtr);
queryThread.join();
}
@@ -227,109 +359,100 @@
// CS so we just ignore any retrieval Interests that hit us for
// now. In the future, this should check some form of
// InMemoryStorage.
+ #ifndef NDEBUG
+ std::cout << "query results interest : " << interest.toUri() << std::endl;
+ #endif
auto data = m_cache.find(interest.getName());
if (data) {
- atmos::util::CatalogAdapter<DatabaseHandler>::m_face->put(*data);
- } else {
- // regenerate query
- const std::string jsonQuery(reinterpret_cast<const char*>(interest.getName()[atmos::util::CatalogAdapter<DatabaseHandler>::m_prefix.size()+1].value()));
-
- std::shared_ptr<const ndn::Interest> interestPtr = interest.shared_from_this();
- std::thread queryRegenThread(&QueryAdapter<DatabaseHandler>::runJsonQuery, this,
- atmos::util::CatalogAdapter<DatabaseHandler>::m_face,
- atmos::util::CatalogAdapter<DatabaseHandler>::m_keyChain, interestPtr,
- jsonQuery,
- atmos::util::CatalogAdapter<DatabaseHandler>::m_databaseHandler);
- queryRegenThread.join();
+ m_face->put(*data);
}
}
template <typename DatabaseHandler>
void
-QueryAdapter<DatabaseHandler>::publishJson(std::shared_ptr<ndn::Face> face,
- std::shared_ptr<ndn::KeyChain> keyChain,
- const ndn::Name& segmentPrefix,
- const Json::Value& value,
- uint64_t segmentNo, bool isFinalBlock,
- bool isAutocomplete)
+QueryAdapter<DatabaseHandler>::signData(ndn::Data& data)
{
- Json::Value entry;
- Json::FastWriter fastWriter;
- if (isAutocomplete) {
- entry["next"] = value;
- } else {
- entry["results"] = value;
+ if (m_signingId.empty())
+ m_keyChain->sign(data);
+ else {
+ ndn::Name keyName = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
+ ndn::Name certName = m_keyChain->getDefaultCertificateNameForKey(keyName);
+ m_keyChain->sign(data, certName);
}
- const std::string jsonMessage = fastWriter.write(entry);
- publishSegment(face, keyChain, segmentPrefix, jsonMessage.c_str(), jsonMessage.size() + 1,
- segmentNo, isFinalBlock);
}
template <typename DatabaseHandler>
-void
-QueryAdapter<DatabaseHandler>::publishSegment(std::shared_ptr<ndn::Face> face,
- std::shared_ptr<ndn::KeyChain> keyChain,
- const ndn::Name& segmentPrefix,
- const char* payload, size_t payloadLength,
- uint64_t segmentNo, bool isFinalBlock)
+std::shared_ptr<ndn::Data>
+QueryAdapter<DatabaseHandler>::makeAckData(std::shared_ptr<const ndn::Interest> interest,
+ const ndn::Name::Component& version)
{
- ndn::Name segmentName(segmentPrefix);
- if (isFinalBlock) {
- segmentName.append("END");
- } else {
- segmentName.appendSegment(segmentNo);
- }
+ // JSON parsed ok, so we can acknowledge successful receipt of the query
+ ndn::Name ackName(interest->getName());
+ ackName.append(version);
+ ackName.append("OK");
- std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
- data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
- data->setFreshnessPeriod(ndn::time::milliseconds(10000));
-
- if (isFinalBlock) {
- data->setFinalBlockId(segmentName[-1]);
- }
- keyChain->sign(*data);
- //face->put(*data);
-
- m_mutex.lock();
- m_cache.insert(*data);
- m_mutex.unlock();
+ std::shared_ptr<ndn::Data> ack = std::make_shared<ndn::Data>(ackName);
+ signData(*ack);
+ #ifndef NDEBUG
+ std::cout << "makeAckData : " << ackName << std::endl;
+ #endif
+ return ack;
}
template <typename DatabaseHandler>
void
-QueryAdapter<DatabaseHandler>::query(std::shared_ptr<ndn::Face> face,
- std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<const ndn::Interest> interest,
- std::shared_ptr<DatabaseHandler> databaseHandler)
+QueryAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlQuery,
+ Json::Value& jsonValue,
+ bool& autocomplete)
+{
+ // 3) Convert the JSON Query into a MySQL one
+ sqlQuery << "SELECT name FROM cmip5";
+ bool input = false;
+ for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
+ {
+ Json::Value key = iter.key();
+ Json::Value value = (*iter);
+
+ if (input) {
+ sqlQuery << " AND";
+ } else {
+ sqlQuery << " WHERE";
+ }
+
+ // Auto-complete case
+ if (key.asString().compare("?") == 0) {
+ sqlQuery << " name REGEXP '^" << value.asString() << "'";
+ autocomplete = true;
+ }
+ // Component case
+ else {
+ sqlQuery << " " << key.asString() << "='" << value.asString() << "'";
+ }
+ input = true;
+ }
+
+ if (!input) { // Force it to be the empty set
+ sqlQuery << " limit 0";
+ }
+ sqlQuery << ";";
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<const ndn::Interest> interest)
{
// 1) Strip the prefix off the ndn::Interest's ndn::Name
// +1 to grab JSON component after "query" component
- const std::string jsonQuery(reinterpret_cast<const char*>(interest->getName()[atmos::util::CatalogAdapter<DatabaseHandler>::m_prefix.size()+1].value()));
- if (jsonQuery.length() > 0) {
- runJsonQuery(face, keyChain, interest, jsonQuery, databaseHandler);
- } // else NACK?
-}
-template <typename DatabaseHandler>
-void
-QueryAdapter<DatabaseHandler>::runJsonQuery(std::shared_ptr<ndn::Face> face,
- std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<const ndn::Interest> interest,
- const std::string& jsonQuery,
- std::shared_ptr<DatabaseHandler> databaseHandler)
-{
- // @todo: we should return a NACK as we have no database
-}
+ ndn::Name::Component jsonStr = interest->getName()[m_prefix.size()+1];
+ // This one cannot parse the JsonQuery correctly, and should be moved to runJsonQuery
+ const std::string jsonQuery(reinterpret_cast<const char*>(jsonStr.value()), jsonStr.value_size());
-
-template <>
-void
-QueryAdapter<MYSQL>::runJsonQuery(std::shared_ptr<ndn::Face> face,
- std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<const ndn::Interest> interest,
- const std::string& jsonQuery,
- std::shared_ptr<MYSQL> databaseHandler)
-{
+ if (jsonQuery.length() <= 0) {
+ // send Nack?
+ return;
+ }
+ // ------------------
// For efficiency, do a double check. Once without the lock, then with it.
if (m_activeQueryToFirstResponse.find(jsonQuery) != m_activeQueryToFirstResponse.end()) {
m_mutex.lock();
@@ -338,7 +461,7 @@
// An unusual race-condition case, which requires things like PIT aggregation to be off.
auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
if (iter != m_activeQueryToFirstResponse.end()) {
- face->put(*(iter->second));
+ m_face->put(*(iter->second));
m_mutex.unlock(); //escape lock
return;
}
@@ -349,93 +472,147 @@
// 2) From the remainder of the ndn::Interest's ndn::Name, get the JSON out
Json::Value parsedFromString;
Json::Reader reader;
- if (reader.parse(jsonQuery, parsedFromString)) {
- const ndn::name::Component version = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
-
- // JSON parsed ok, so we can acknowledge successful receipt of the query
- ndn::Name ackName(interest->getName());
- ackName.append(version);
- ackName.append("OK");
-
- std::shared_ptr<ndn::Data> ack(std::make_shared<ndn::Data>(ackName));
-
- m_mutex.lock();
- { // !!! BEGIN CRITICAL SECTION !!!
- // An unusual race-condition case, which requires things like PIT aggregation to be off.
- auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
- if (iter != m_activeQueryToFirstResponse.end()) {
- face->put(*(iter->second));
- m_mutex.unlock(); // escape lock
- return;
- }
- // This is where things are expensive so we save them for the lock
- keyChain->sign(*ack);
- face->put(*ack);
- m_activeQueryToFirstResponse.insert(std::pair<std::string,
- std::shared_ptr<ndn::Data>>(jsonQuery, ack));
- } // !!! END CRITICAL SECTION !!!
- m_mutex.unlock();
-
- // 3) Convert the JSON Query into a MySQL one
- bool autocomplete = false;
- std::stringstream mysqlQuery;
- mysqlQuery << "SELECT name FROM cmip5";
- bool input = false;
- for (Json::Value::iterator iter = parsedFromString.begin(); iter != parsedFromString.end(); ++iter)
- {
- Json::Value key = iter.key();
- Json::Value value = (*iter);
-
- if (input) {
- mysqlQuery << " AND";
- } else {
- mysqlQuery << " WHERE";
- }
-
- // Auto-complete case
- if (key.asString().compare("?") == 0) {
- mysqlQuery << " name REGEXP '^" << value.asString() << "'";
- autocomplete = true;
- }
- // Component case
- else {
- mysqlQuery << " " << key.asString() << "='" << value.asString() << "'";
- }
- input = true;
- }
-
- if (!input) { // Force it to be the empty set
- mysqlQuery << " limit 0";
- }
- mysqlQuery << ";";
-
- // 4) Run the Query
- // We're assuming that databaseHandler has already been connected to the database
- std::shared_ptr<MYSQL_RES> results = atmos::util::PerformQuery(databaseHandler, mysqlQuery.str());
-
- MYSQL_ROW row;
- ndn::Name segmentPrefix(atmos::util::CatalogAdapter<MYSQL>::m_prefix);
- segmentPrefix.append("query-results");
- segmentPrefix.append(version);
-
- size_t usedBytes = 0;
- const size_t PAYLOAD_LIMIT = 7000;
- uint64_t segmentNo = 0;
- Json::Value array;
- while ((row = mysql_fetch_row(results.get())))
- {
- size_t size = strlen(row[0]) + 1;
- if (usedBytes + size > PAYLOAD_LIMIT) {
- publishJson(face, keyChain, segmentPrefix, array, segmentNo, false, autocomplete);
- array.clear();
- usedBytes = 0;
- segmentNo++;
- }
- array.append(row[0]);
- usedBytes += size;
- }
- publishJson(face, keyChain, segmentPrefix, array, segmentNo, true, autocomplete);
+ if (!reader.parse(jsonQuery, parsedFromString)) {
+ // @todo: send NACK?
+ std::cout << "cannot parse the JsonQuery" << std::endl;
+ return;
}
+
+ const ndn::name::Component version
+ = ndn::name::Component::fromVersion(ndn::time::toUnixTimestamp(
+ ndn::time::system_clock::now()).count());
+
+ std::shared_ptr<ndn::Data> ack = makeAckData(interest, version);
+
+ m_mutex.lock();
+ { // !!! BEGIN CRITICAL SECTION !!!
+ // An unusual race-condition case, which requires things like PIT aggregation to be off.
+ auto iter = m_activeQueryToFirstResponse.find(jsonQuery);
+ if (iter != m_activeQueryToFirstResponse.end()) {
+ m_face->put(*(iter->second));
+ m_mutex.unlock(); // escape lock
+ return;
+ }
+ // This is where things are expensive so we save them for the lock
+ m_activeQueryToFirstResponse.insert(std::pair<std::string,
+ std::shared_ptr<ndn::Data>>(jsonQuery, ack));
+ m_face->put(*ack);
+ } // !!! END CRITICAL SECTION !!!
+ m_mutex.unlock();
+
+ // 3) Convert the JSON Query into a MySQL one
+ bool autocomplete = false;
+ std::stringstream sqlQuery;
+ json2Sql(sqlQuery, parsedFromString, autocomplete);
+
+ // 4) Run the Query
+ ndn::Name segmentPrefix(m_prefix);
+ segmentPrefix.append("query-results");
+ segmentPrefix.append(version);
+
+ prepareSegments(segmentPrefix, sqlQuery.str(), autocomplete);
+}
+
+template <typename DatabaseHandler>
+void
+QueryAdapter<DatabaseHandler>::prepareSegments(const ndn::Name& segmentPrefix,
+ const std::string& sqlString,
+ bool autocomplete)
+{
+ // empty
+}
+
+// prepareSegments specilization function
+template<>
+void
+QueryAdapter<MYSQL>::prepareSegments(const ndn::Name& segmentPrefix,
+ const std::string& sqlString,
+ bool autocomplete)
+{
+#ifndef NDEBUG
+ std::cout << "sqlString in prepareSegments : " << sqlString << std::endl;
+#endif
+ // 4) Run the Query
+ std::shared_ptr<MYSQL_RES> results
+ = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString);
+
+ if (!results) {
+#ifndef NDEBUG
+ std::cout << "null MYSQL_RES for query : " << sqlString << std::endl;
+#endif
+ // @todo: throw runtime error or log the error message?
+ return;
+ }
+
+#ifndef NDEBUG
+ std::cout << "Query results for \""
+ << sqlString
+ << "\" contain "
+ << mysql_num_rows(results.get())
+ << " rows" << std::endl;
+#endif
+
+ MYSQL_ROW row;
+ size_t usedBytes = 0;
+ const size_t PAYLOAD_LIMIT = 7000;
+ uint64_t segmentNo = 0;
+ Json::Value array;
+ while ((row = mysql_fetch_row(results.get())))
+ {
+ size_t size = strlen(row[0]) + 1;
+ if (usedBytes + size > PAYLOAD_LIMIT) {
+ std::shared_ptr<ndn::Data> data
+ = makeReplyData(segmentPrefix, array, segmentNo, false, autocomplete);
+ m_mutex.lock();
+ m_cache.insert(*data);
+ m_mutex.unlock();
+ array.clear();
+ usedBytes = 0;
+ segmentNo++;
+ }
+ array.append(row[0]);
+ usedBytes += size;
+ }
+ std::shared_ptr<ndn::Data> data
+ = makeReplyData(segmentPrefix, array, segmentNo, true, autocomplete);
+ m_mutex.lock();
+ m_cache.insert(*data);
+ m_mutex.unlock();
+}
+
+template <typename DatabaseHandler>
+std::shared_ptr<ndn::Data>
+QueryAdapter<DatabaseHandler>::makeReplyData(const ndn::Name& segmentPrefix,
+ const Json::Value& value,
+ uint64_t segmentNo,
+ bool isFinalBlock,
+ bool isAutocomplete)
+{
+ Json::Value entry;
+ Json::FastWriter fastWriter;
+ if (isAutocomplete) {
+ entry["next"] = value;
+ } else {
+ entry["results"] = value;
+ }
+ const std::string jsonMessage = fastWriter.write(entry);
+ const char* payload = jsonMessage.c_str();
+ size_t payloadLength = jsonMessage.size() + 1;
+ ndn::Name segmentName(segmentPrefix);
+ segmentName.appendSegment(segmentNo);
+
+ std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
+ data->setContent(reinterpret_cast<const uint8_t*>(payload), payloadLength);
+ data->setFreshnessPeriod(ndn::time::milliseconds(10000));
+
+ if (isFinalBlock) {
+ data->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
+ }
+#ifndef NDEBUG
+ std::cout << "makeReplyData : " << segmentName << std::endl;
+#endif
+ signData(*data);
+ return data;
}
} // namespace query
diff --git a/catalog/src/util/catalog-adapter.cpp b/catalog/src/util/catalog-adapter.cpp
new file mode 100644
index 0000000..61404ad
--- /dev/null
+++ b/catalog/src/util/catalog-adapter.cpp
@@ -0,0 +1,57 @@
+/** NDN-Atmos: Cataloging Service for distributed data originally developed
+ * for atmospheric science data
+ * Copyright (C) 2015 Colorado State University
+ *
+ * NDN-Atmos is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * NDN-Atmos is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
+**/
+
+#include "catalog-adapter.hpp"
+
+namespace atmos {
+namespace util {
+
+CatalogAdapter::CatalogAdapter(const std::shared_ptr<ndn::Face>& face,
+ const std::shared_ptr<ndn::KeyChain>& keyChain)
+ : m_face(face)
+ , m_keyChain(keyChain)
+{
+ // empty
+}
+
+CatalogAdapter::~CatalogAdapter()
+{
+ // empty
+}
+
+void
+CatalogAdapter::onRegisterSuccess(const ndn::Name& prefix)
+{
+ // std::cout << "Successfully registered " << prefix << std::endl;
+}
+
+void
+CatalogAdapter::onRegisterFailure(const ndn::Name& prefix, const std::string& reason)
+{
+ throw Error("Failed to register prefix " + prefix.toUri() + " : " + reason);
+}
+
+void
+CatalogAdapter::onTimeout(const ndn::Interest& interest)
+{
+ // At this point, probably should do a retry
+}
+
+} // namespace util
+} // namespace atmos
+
diff --git a/catalog/src/util/catalog-adapter.hpp b/catalog/src/util/catalog-adapter.hpp
index 52f21d2..2b03358 100644
--- a/catalog/src/util/catalog-adapter.hpp
+++ b/catalog/src/util/catalog-adapter.hpp
@@ -24,14 +24,14 @@
#include <ndn-cxx/interest.hpp>
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/encoding/block.hpp>
#include <memory>
#include <string>
-
-#include <ndn-cxx/encoding/block.hpp>
-
#include <iostream>
+#include "util/config-file.hpp"
+
namespace atmos {
namespace util {
@@ -42,39 +42,50 @@
* Both QueryAdapter and PublisherAdapter use this as a template to allow consistancy between
* their designs and flow-control
*/
-template <typename DatabaseHandler>
class CatalogAdapter {
public:
- /**
- * Constructor
- * @param face: Face that will be used for NDN communications
- * @param keyChain: KeyChain to sign query responses and evaluate the incoming publish
- * and ChronoSync requests against
- * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
- * @oaram prefix: Name that will define the prefix to all queries and publish requests
- * that will be routed to this specific Catalog Instance
- */
- CatalogAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
+ class Error : public std::runtime_error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : std::runtime_error(what)
+ {
+ }
+ };
/**
- * Destructor
+ * Constructor
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain that will be used for data signing
*/
+ CatalogAdapter(const std::shared_ptr<ndn::Face>& face,
+ const std::shared_ptr<ndn::KeyChain>& keyChain);
+
virtual
~CatalogAdapter();
-protected:
- // @{ (onData and onTimeout) and/or onInterest should be overwritten at a minimum
-
-
/**
- * Data that is routed to this class based on the Interest
- *
- * @param interest: Interest that caused this Data to be routed
- * @param data: Data that needs to be handled
+ * Helper function that sets the configuration section handler
+ * @param config: ConfigFile object to set the handler
+ * @param prefix: Catalog prefix
*/
virtual void
- onData(const ndn::Interest& interest, const ndn::Data& data);
+ setConfigFile(util::ConfigFile& config,
+ const ndn::Name& prefix) = 0;
+
+protected:
+
+ /**
+ * Callback function that handles the section parsing jobs
+ */
+ virtual void
+ onConfig(const util::ConfigSection& section,
+ bool isDryDun,
+ const std::string& fileName,
+ const ndn::Name& prefix) = 0;
+
+ // @{ (onData and onTimeout) and/or onInterest should be overwritten at a minimum
/**
* Timeout from a Data request
@@ -84,17 +95,6 @@
virtual void
onTimeout(const ndn::Interest& interest);
-
- /**
- * Interest that is routed to this class based on the InterestFilter
- *
- * @param filter: InterestFilter that caused this Interest to be routed
- * @param interest: Interest that needs to be handled
- */
- virtual void
- onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
- // @}
-
/**
* Callback that should/can be used to evaluate that the Interest Filter has been correctly set up
*
@@ -112,71 +112,16 @@
virtual void
onRegisterFailure(const ndn::Name& prefix, const std::string& reason);
-
+protected:
// Face to communicate with
- std::shared_ptr<ndn::Face> m_face;
- // KeyChain used for security
- std::shared_ptr<ndn::KeyChain> m_keyChain;
- // Handle to the Catalog's database
- std::shared_ptr<DatabaseHandler> m_databaseHandler;
- // Prefix for our namespace
+ const std::shared_ptr<ndn::Face> m_face;
+ // KeyChain used for data signing
+ const std::shared_ptr<ndn::KeyChain> m_keyChain;
ndn::Name m_prefix;
+ // Name for the signing key
+ ndn::Name m_signingId;
}; // class CatalogAdapter
-template <typename DatabaseHandler>
-CatalogAdapter<DatabaseHandler>::CatalogAdapter(std::shared_ptr<ndn::Face> face,
- std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<DatabaseHandler> databaseHandler,
- const ndn::Name& prefix)
- : m_face(face), m_keyChain(keyChain), m_databaseHandler(databaseHandler), m_prefix(prefix)
-{
- // empty
-}
-
-template <typename DatabaseHandler>
-CatalogAdapter<DatabaseHandler>::~CatalogAdapter()
-{
- // empty
-}
-
-template <typename DatabaseHandler>
-void
-CatalogAdapter<DatabaseHandler>::onRegisterSuccess(const ndn::Name& prefix)
-{
- // std::cout << "Successfully registered " << prefix << std::endl;
-}
-
-template <typename DatabaseHandler>
-void
-CatalogAdapter<DatabaseHandler>::onRegisterFailure(const ndn::Name& prefix, const std::string& reason)
-{
- // std::cout << "Failed to register prefix " << prefix << ": " << reason << std::endl;
-}
-
-
-template <typename DatabaseHandler>
-void
-CatalogAdapter<DatabaseHandler>::onData(const ndn::Interest& interest, const ndn::Data& data)
-{
- // At this point we need to get the ndn::Block out of data.getContent()
-}
-
-template <typename DatabaseHandler>
-void
-CatalogAdapter<DatabaseHandler>::onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
-{
- // At this point we need to use the filter to either:
- // a) Request the Data for the Interest, or
- // b) Use the Filter to ID where in the Interest the Interest's "Content" is, and grab that out
-}
-
-
-template <typename DatabaseHandler>
-void
-CatalogAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
-{
- // At this point, probably should do a retry
-}
} // namespace util
} // namespace atmos
diff --git a/catalog/src/util/config-file.cpp b/catalog/src/util/config-file.cpp
new file mode 100644
index 0000000..0b335f9
--- /dev/null
+++ b/catalog/src/util/config-file.cpp
@@ -0,0 +1,140 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014 Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "config-file.hpp"
+
+#include <boost/property_tree/info_parser.hpp>
+#include <fstream>
+namespace atmos {
+namespace util {
+
+void
+ConfigFile::throwErrorOnUnknownSection(const std::string& filename,
+ const std::string& sectionName,
+ const ConfigSection& section,
+ bool isDryRun)
+{
+ std::string msg = "Error processing configuration file ";
+ msg += filename;
+ msg += ": no module subscribed for section \"" + sectionName + "\"";
+
+ throw ConfigFile::Error(msg);
+}
+
+void
+ConfigFile::ignoreUnknownSection(const std::string& filename,
+ const std::string& sectionName,
+ const ConfigSection& section,
+ bool isDryRun)
+{
+ // do nothing
+}
+
+ConfigFile::ConfigFile(UnknownConfigSectionHandler unknownSectionCallback)
+ : m_unknownSectionCallback(unknownSectionCallback)
+{
+}
+
+void
+ConfigFile::addSectionHandler(const std::string& sectionName,
+ ConfigSectionHandler subscriber)
+{
+ m_subscriptions[sectionName] = subscriber;
+}
+
+void
+ConfigFile::parse(const std::string& filename, bool isDryRun)
+{
+ std::ifstream inputFile;
+ inputFile.open(filename.c_str());
+ if (!inputFile.good() || !inputFile.is_open())
+ {
+ std::string msg = "Failed to read configuration file: ";
+ msg += filename;
+ throw Error(msg);
+ }
+ parse(inputFile, isDryRun, filename);
+ inputFile.close();
+}
+
+void
+ConfigFile::parse(const std::string& input, bool isDryRun, const std::string& filename)
+{
+ std::istringstream inputStream(input);
+ parse(inputStream, isDryRun, filename);
+}
+
+
+void
+ConfigFile::parse(std::istream& input, bool isDryRun, const std::string& filename)
+{
+ try
+ {
+ boost::property_tree::read_info(input, m_global);
+ }
+ catch (const boost::property_tree::info_parser_error& error)
+ {
+ std::stringstream msg;
+ msg << "Failed to parse configuration file";
+ msg << " " << filename;
+ msg << " " << error.message() << " line " << error.line();
+ throw Error(msg.str());
+ }
+
+ process(isDryRun, filename);
+}
+
+void
+ConfigFile::process(bool isDryRun, const std::string& filename)
+{
+ BOOST_ASSERT(!filename.empty());
+
+ if (m_global.begin() == m_global.end())
+ {
+ std::string msg = "Error processing configuration file: ";
+ msg += filename;
+ msg += " no data";
+ throw Error(msg);
+ }
+
+ for (ConfigSection::const_iterator i = m_global.begin(); i != m_global.end(); ++i)
+ {
+ const std::string& sectionName = i->first;
+ const ConfigSection& section = i->second;
+
+ SubscriptionTable::iterator subscriberIt = m_subscriptions.find(sectionName);
+ if (subscriberIt != m_subscriptions.end())
+ {
+ ConfigSectionHandler subscriber = subscriberIt->second;
+ subscriber(section, isDryRun, filename);
+ }
+ else
+ {
+ m_unknownSectionCallback(filename, sectionName, section, isDryRun);
+ }
+ }
+}
+
+} // util
+} // atmos
diff --git a/catalog/src/util/config-file.hpp b/catalog/src/util/config-file.hpp
new file mode 100644
index 0000000..6ce292a
--- /dev/null
+++ b/catalog/src/util/config-file.hpp
@@ -0,0 +1,128 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014 Regents of the University of California,
+ * Arizona Board of Regents,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University,
+ * Washington University in St. Louis,
+ * Beijing Institute of Technology
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#ifndef ATMOS_CATALOG_CONFIG_FILE_HPP
+#define ATMOS_CATALOG_CONFIG_FILE_HPP
+
+#include <boost/property_tree/ptree.hpp>
+#include <map>
+
+namespace atmos {
+namespace util{
+
+typedef boost::property_tree::ptree ConfigSection;
+
+/// \brief callback for config file sections
+typedef std::function<void(const ConfigSection& /*section*/,
+ bool /*isDryRun*/,
+ const std::string& /*filename*/)> ConfigSectionHandler;
+
+/// \brief callback for config file sections without a subscribed handler
+typedef std::function<void(const std::string& /*filename*/,
+ const std::string& /*sectionName*/,
+ const ConfigSection& /*section*/,
+ bool /*isDryRun*/)> UnknownConfigSectionHandler;
+
+class ConfigFile : boost::noncopyable
+{
+public:
+
+ class Error : public std::runtime_error
+ {
+ public:
+ explicit
+ Error(const std::string& what)
+ : std::runtime_error(what)
+ {
+
+ }
+ };
+
+ ConfigFile(UnknownConfigSectionHandler unknownSectionCallback = throwErrorOnUnknownSection);
+
+ static void
+ throwErrorOnUnknownSection(const std::string& filename,
+ const std::string& sectionName,
+ const ConfigSection& section,
+ bool isDryRun);
+
+ static void
+ ignoreUnknownSection(const std::string& filename,
+ const std::string& sectionName,
+ const ConfigSection& section,
+ bool isDryRun);
+
+ /// \brief setup notification of configuration file sections
+ void
+ addSectionHandler(const std::string& sectionName,
+ ConfigSectionHandler subscriber);
+
+
+ /**
+ * \param filename file to parse
+ * \param isDryRun true if performing a dry run of configuration, false otherwise
+ * \throws ConfigFile::Error if file not found
+ * \throws ConfigFile::Error if parse error
+ */
+ void
+ parse(const std::string& filename, bool isDryRun);
+
+ /**
+ * \param input configuration (as a string) to parse
+ * \param isDryRun true if performing a dry run of configuration, false otherwise
+ * \param filename optional convenience argument to provide more detailed error messages
+ * \throws ConfigFile::Error if file not found
+ * \throws ConfigFile::Error if parse error
+ */
+ void
+ parse(const std::string& input, bool isDryRun, const std::string& filename);
+
+ /**
+ * \param input stream to parse
+ * \param isDryRun true if performing a dry run of configuration, false otherwise
+ * \param filename optional convenience argument to provide more detailed error messages
+ * \throws ConfigFile::Error if parse error
+ */
+ void
+ parse(std::istream& input, bool isDryRun, const std::string& filename);
+
+private:
+
+ void
+ process(bool isDryRun, const std::string& filename);
+
+private:
+ UnknownConfigSectionHandler m_unknownSectionCallback;
+
+ typedef std::map<std::string, ConfigSectionHandler> SubscriptionTable;
+
+ SubscriptionTable m_subscriptions;
+
+ ConfigSection m_global;
+};
+} // namespace util
+} // namespace atmos
+
+
+#endif // ATMOS_CATALOG_CONFIG_FILE_HPP
diff --git a/catalog/src/util/mysql-util.cpp b/catalog/src/util/mysql-util.cpp
index 62225ba..fb58665 100644
--- a/catalog/src/util/mysql-util.cpp
+++ b/catalog/src/util/mysql-util.cpp
@@ -17,8 +17,8 @@
**/
#include "util/mysql-util.hpp"
-
-#include "mysql/errmsg.h"
+#include <mysql/errmsg.h>
+#include <stdexcept>
namespace atmos {
namespace util {
@@ -32,18 +32,18 @@
std::shared_ptr<MYSQL>
-MySQLConnectionSetup(ConnectionDetails& details) {
+MySQLConnectionSetup(const ConnectionDetails& details) {
MYSQL* conn = mysql_init(NULL);
- mysql_real_connect(conn, details.server.c_str(), details.user.c_str(),
- details.password.c_str(), details.database.c_str(), 0, NULL, 0);
- std::shared_ptr<MYSQL> connection(conn);
+ if(!mysql_real_connect(conn, details.server.c_str(), details.user.c_str(),
+ details.password.c_str(), details.database.c_str(), 0, NULL, 0)) {
+ throw std::runtime_error(mysql_error(conn));
+ }
+ std::shared_ptr<MYSQL> connection(conn, &mysql_close);
return connection;
}
std::shared_ptr<MYSQL_RES>
-PerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query) {
- std::shared_ptr<MYSQL_RES> result(NULL);
-
+MySQLPerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query) {
switch (mysql_query(connection.get(), sql_query.c_str()))
{
case 0:
@@ -51,7 +51,7 @@
MYSQL_RES* resultPtr = mysql_store_result(connection.get());
if (resultPtr != NULL)
{
- result.reset(resultPtr);
+ return std::shared_ptr<MYSQL_RES>(resultPtr, &mysql_free_result);
}
break;
}
@@ -63,7 +63,7 @@
default:
break;
}
- return result;
+ return nullptr;
}
} // namespace util
diff --git a/catalog/src/util/mysql-util.hpp b/catalog/src/util/mysql-util.hpp
index da3c0b5..ee45f97 100644
--- a/catalog/src/util/mysql-util.hpp
+++ b/catalog/src/util/mysql-util.hpp
@@ -38,10 +38,10 @@
};
std::shared_ptr<MYSQL>
-MySQLConnectionSetup(ConnectionDetails& details);
+MySQLConnectionSetup(const ConnectionDetails& details);
std::shared_ptr<MYSQL_RES>
-PerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query);
+MySQLPerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query);
} // namespace util
} // namespace atmos