catalog: implement catalog driver and facade
This commit also refactors the QueryAdapter's code and adds the corresponding
unit tests. Catalog-adapter and catalog are not templates, so their definitions
are moved to the corresponding cpp files.
refs: #2599, #2600
Change-Id: I2be492ec3c2538e865bfa7c09ac8cd49e2a9527d
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index c6b322b..9d29434 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -31,50 +31,60 @@
#include <ndn-cxx/interest-filter.hpp>
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/security/key-chain.hpp>
-
+#include <ndn-cxx/security/validator.hpp>
#include "mysql/mysql.h"
#include <memory>
#include <string>
+#include <vector>
+#include <unordered_map>
namespace atmos {
namespace publish {
-
/**
* PublishAdapter handles the Publish usecases for the catalog
*/
template <typename DatabaseHandler>
-class PublishAdapter : public atmos::util::CatalogAdapter<DatabaseHandler> {
+class PublishAdapter : public atmos::util::CatalogAdapter {
public:
/**
* Constructor
*
- * @param face: Face that will be used for NDN communications
- * @param keyChain: KeyChain to sign query responses and evaluate the incoming publish
- * and ChronoSync requests against
- * @param databaseHandler: <typename DatabaseHandler> to the database that stores our catalog
- * @oaram prefix: Name that will define the prefix to all publish requests
- * that will be routed to this specific Catalog Instance
+ * @param face: Face that will be used for NDN communications
+ * @param keyChain: KeyChain that will be used for data signing
*/
- PublishAdapter(std::shared_ptr<ndn::Face> face, std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<DatabaseHandler> databaseHandler, const ndn::Name& prefix);
+ PublishAdapter(const std::shared_ptr<ndn::Face>& face,
+ const std::shared_ptr<ndn::KeyChain>& keyChain);
-
- /**
- * Destructor
- */
virtual
~PublishAdapter();
+ /**
+ * Helper function that registers onConfig as the handler for the
+ * "publishAdapter" section of the config file
+ *
+ * @param config: ConfigFile the section handler is added to
+ * @param prefix: Name passed on to onConfig, which stores it as the adapter's prefix
+ */
+ void
+ setConfigFile(util::ConfigFile& config,
+ const ndn::Name& prefix);
+
protected:
/**
+ * Helper function that configures the PublishAdapter instance according to the
+ * publishAdapter section in the config file
+ *
+ * @param section: contents of the config section being handled
+ * @param isDryRun: when true, nothing is read or applied
+ * @param fileName: name of the config file (not used by the implementation)
+ * @param prefix: Name stored as the adapter's prefix
+ */
+ void
+ onConfig(const util::ConfigSection& section,
+ bool isDryRun,
+ const std::string& fileName,
+ const ndn::Name& prefix);
+
+ /**
* Initial "please publish this" Interests
*
* @param filter: InterestFilter that caused this Interest to be routed
* @param interest: Interest that needs to be handled
*/
virtual void
- onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+ onPublishInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
/**
* Data containing the actual thing we need to publish
@@ -83,53 +93,191 @@
* @param data: Data that needs to be handled
*/
virtual void
- onData(const ndn::Interest& interest, const ndn::Data& data);
+ onPublishedData(const ndn::Interest& interest, const ndn::Data& data);
- // @todo: Should we do anything special with the timeouts for the publish requests?
- // If so, overwrite onTimeout()
+ /**
+ * Helper function to set the DatabaseHandler
+ */
+ void
+ setDatabaseHandler(const util::ConnectionDetails& databaseId);
+ /**
+ * Helper function that sets filters to make the adapter work
+ */
+ void
+ setFilters();
+
+protected:
+ typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList; // prefix -> id returned by setInterestFilter
+ // Prefix for ChronoSync; onConfig fills it from the "sync" subsection or a built-in default
+ ndn::Name m_syncPrefix;
+ // Handle to the Catalog's database; assigned by setDatabaseHandler
+ std::shared_ptr<DatabaseHandler> m_databaseHandler;
+ std::shared_ptr<ndn::Validator> m_validaor; // NOTE(review): looks like a misspelling of m_validator; not referenced in the visible code
+ RegisteredPrefixList m_registeredPrefixList; // filled by setFilters, emptied of filters in the destructor
};
-template <typename DatabaseHandler>
-PublishAdapter<DatabaseHandler>::PublishAdapter(std::shared_ptr<ndn::Face> face,
- std::shared_ptr<ndn::KeyChain> keyChain,
- std::shared_ptr<DatabaseHandler> databaseHandler,
- const ndn::Name& prefix)
- : atmos::util::CatalogAdapter<DatabaseHandler>(face, keyChain, databaseHandler,
- ndn::Name(prefix).append("/publish"))
-{
- face->setInterestFilter(ndn::InterestFilter(ndn::Name(prefix).append("/publish")),
- bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onInterest,
- this, _1, _2),
- bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
- this, _1),
- bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
- this, _1, _2));
- std::shared_ptr<ndn::Interest> request(std::make_shared<ndn::Interest>(ndn::Name(prefix).append("/publish")));
- atmos::util::CatalogAdapter<DatabaseHandler>::m_face->expressInterest(*request,
- bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onData,
- this, _1, _2),
- bind(&atmos::publish::PublishAdapter<DatabaseHandler>::onTimeout,
- this, _1));
+template <typename DatabaseHandler>
+PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
+ const std::shared_ptr<ndn::KeyChain>& keyChain)
+ : util::CatalogAdapter(face, keyChain) // face and keyChain are handed straight to the base class
+{ // empty: prefix, database handler and interest filters are set later by onConfig
+}
+
+// Registers the interest filter for "<m_prefix>/publish" on the face and records
+// the returned registration id in m_registeredPrefixList, keyed by that name, so
+// the destructor can unset it again.
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::setFilters()
+{
+  typedef PublishAdapter<DatabaseHandler> Self;
+
+  ndn::Name filterName(m_prefix);
+  filterName.append("publish");
+
+  // Incoming Interests go to onPublishInterest; the registration outcome is
+  // reported through onRegisterSuccess / onRegisterFailure.
+  const ndn::RegisteredPrefixId* filterId =
+    m_face->setInterestFilter(filterName,
+                              bind(&Self::onPublishInterest, this, _1, _2),
+                              bind(&Self::onRegisterSuccess, this, _1),
+                              bind(&Self::onRegisterFailure, this, _1, _2));
+
+  m_registeredPrefixList[filterName] = filterId;
}
template <typename DatabaseHandler>
PublishAdapter<DatabaseHandler>::~PublishAdapter()
{
- // empty
+ for (const auto& itr : m_registeredPrefixList) { // every registration id recorded by setFilters()
+ if (static_cast<bool>(itr.second)) // skip entries whose registration id is null
+ m_face->unsetInterestFilter(itr.second);
+ }
}
template <typename DatabaseHandler>
void
-PublishAdapter<DatabaseHandler>::onInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest)
+PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
+ const ndn::Name& prefix)
+{ // registers onConfig as the handler for the "publishAdapter" section of config
+ config.addSectionHandler("publishAdapter",
+ bind(&PublishAdapter<DatabaseHandler>::onConfig, this, // NOTE(review): binds a raw `this`; presumably the adapter must outlive the handler - confirm
+ _1, _2, _3, prefix)); // prefix is supplied as onConfig's fourth argument
+}
+
+// Applies the "publishAdapter" section of the config file to this adapter.
+//
+// Reads the signing identity, the database connection settings ("database"
+// subsection) and the ChronoSync prefix ("sync" subsection), stores them in the
+// adapter, then sets up the database handler and the interest filters.
+// Keys that are absent keep their initial values (empty strings, or the
+// built-in ChronoSync prefix).
+//
+// @param section: contents of the config section being handled
+// @param isDryRun: when true, return without reading or applying anything
+// @param filename: name of the config file (not used here)
+// @param prefix: Name stored as the adapter's prefix
+// @throws Error when a recognised key carries an empty value
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
+                                          bool isDryRun,
+                                          const std::string& filename,
+                                          const ndn::Name& prefix)
+{
+  using namespace util;
+  if (isDryRun) {
+    return;
+  }
+
+  std::string signingId;
+  std::string dbServer;
+  std::string dbName;
+  std::string dbUser;
+  std::string dbPasswd;
+  // Default ChronoSync prefix, kept unless the "sync" subsection provides one
+  std::string syncPrefix("ndn:/ndn-atmos/broadcast/chronosync");
+
+  for (const auto& entry : section) {
+    const std::string& key = entry.first;
+
+    if (key == "signingId") {
+      signingId = entry.second.get_value<std::string>();
+      if (signingId.empty()) {
+        throw Error("Invalid value for \"signingId\" in \"publish\" section");
+      }
+    }
+    // @todo: parse the published_file_security section
+    else if (key == "database") {
+      for (const auto& dbEntry : entry.second) {
+        const std::string& dbKey = dbEntry.first;
+
+        if (dbKey == "dbServer") {
+          dbServer = dbEntry.second.get_value<std::string>();
+          if (dbServer.empty()) {
+            throw Error("Invalid value for \"dbServer\" in \"publish\" section");
+          }
+        }
+        else if (dbKey == "dbName") {
+          dbName = dbEntry.second.get_value<std::string>();
+          if (dbName.empty()) {
+            throw Error("Invalid value for \"dbName\" in \"publish\" section");
+          }
+        }
+        else if (dbKey == "dbUser") {
+          dbUser = dbEntry.second.get_value<std::string>();
+          if (dbUser.empty()) {
+            throw Error("Invalid value for \"dbUser\" in \"publish\" section");
+          }
+        }
+        else if (dbKey == "dbPasswd") {
+          dbPasswd = dbEntry.second.get_value<std::string>();
+          if (dbPasswd.empty()) {
+            throw Error("Invalid value for \"dbPasswd\" in \"publish\" section");
+          }
+        }
+      }
+    }
+    else if (key == "sync") {
+      for (const auto& syncEntry : entry.second) {
+        if (syncEntry.first == "prefix") {
+          syncPrefix = syncEntry.second.get_value<std::string>();
+          if (syncPrefix.empty()) {
+            throw Error("Invalid value for \"prefix\" in \"publish\\sync\" section");
+          }
+        }
+        // todo: parse the sync_security section
+      }
+    }
+  }
+
+  // Store the collected settings in the adapter
+  m_prefix = prefix;
+  m_signingId = ndn::Name(signingId);
+  // NOTE(review): append() receives the whole prefix string; confirm it is
+  // interpreted as a URI rather than added as one single name component.
+  m_syncPrefix.clear();
+  m_syncPrefix.append(syncPrefix);
+
+  // Connect to the database first, then start listening for Interests
+  const util::ConnectionDetails connection(dbServer, dbUser, dbPasswd, dbName);
+  setDatabaseHandler(connection);
+  setFilters();
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+{
+ // No-op for handler types without an explicit specialization: databaseId is ignored and m_databaseHandler is not assigned
+}
+
+// MYSQL specialization: obtains a connection for databaseId from
+// atmos::util::MySQLConnectionSetup and stores it as the database handler.
+//
+// Marked inline because an explicit specialization is an ordinary function, not
+// a template: defined in a header without inline, it would be emitted by every
+// translation unit that includes this file and break the one-definition rule.
+template <>
+inline void
+PublishAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+{
+ std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
+
+ m_databaseHandler = conn;
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onPublishInterest(const ndn::InterestFilter& filter,
+ const ndn::Interest& interest) // bound by setFilters as the Interest callback for "<prefix>/publish"; still a stub
{
// @todo: Request the data for publish
}
template <typename DatabaseHandler>
void
-PublishAdapter<DatabaseHandler>::onData(const ndn::Interest& interest, const ndn::Data& data)
+PublishAdapter<DatabaseHandler>::onPublishedData(const ndn::Interest& interest,
+ const ndn::Data& data) // still a stub; nothing in the visible code registers it as a callback yet
{
// @todo handle publishing the data
}