catalog: Implement Sync Update Executor
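
The PublishAdapter now replies to publish Interests with an ACK, fetches the
published dataset segment by segment, validates it, and applies the JSON
payload to the cmip5 table as INSERT/DELETE statements. Accepted changes are
republished through a ChronoSync socket; remote sync updates are fetched in
processSyncUpdate, and per-session sequence numbers are stored in a new
chronosync_update_info table so that updates are not reprocessed after a
restart. MySQLPerformQuery now takes the operation type and reports success
and an error message, and QueryAdapter gains a complete-search SQL generator
for the "??" query.
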
Change-Id: I88a0c6935d7b8094dfb587e83270a607e753ea31
refs: #2611
diff --git a/catalog/src/publish/publish-adapter.hpp b/catalog/src/publish/publish-adapter.hpp
index 9fb03db..b954bc8 100644
--- a/catalog/src/publish/publish-adapter.hpp
+++ b/catalog/src/publish/publish-adapter.hpp
@@ -34,14 +34,23 @@
#include <ndn-cxx/security/key-chain.hpp>
#include <ndn-cxx/security/validator-config.hpp>
+#include <ChronoSync/socket.hpp>
#include <memory>
#include <string>
#include <vector>
#include <unordered_map>
+#include <mutex>
namespace atmos {
namespace publish {
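+// number of retransmissions when fetching ChronoSync update data times out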
+#define RETRY_WHEN_TIMEOUT 2
+// TODO: need to use the configured nameFields
+std::array<std::string, 12> atmosTableColumns = {{"sha256", "name", "activity", "product",
+ "organization", "model", "experiment",
+ "frequency", "modeling_realm",
+ "variable_name", "ensemble", "time"}};
+
/**
* PublishAdapter handles the Publish usecases for the catalog
*/
@@ -88,6 +97,9 @@
virtual void
onPublishInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
+ virtual void
+ onTimeout(const ndn::Interest& interest);
+
/**
* Data containing the actual thing we need to publish
*
@@ -98,10 +110,10 @@
onPublishedData(const ndn::Interest& interest, const ndn::Data& data);
/**
- * Helper function to set the DatabaseHandler
+ * Helper function to initialize the DatabaseHandler
*/
void
- setDatabaseHandler(const util::ConnectionDetails& databaseId);
+ initializeDatabase(const util::ConnectionDetails& databaseId);
/**
* Helper function that sets filters to make the adapter work
@@ -119,6 +131,88 @@
bool
validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data);
+
+ /**
+ * Helper function that processes the sync update
+ *
+ * @param updates: vector that contains all the missing data information
+ */
+ void
+ processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
+
+ /**
+ * Helper function that processes the update data
+ *
+ * @param data: shared pointer for the fetched update data
+ */
+ void
+ processUpdateData(const ndn::shared_ptr<const ndn::Data>& data);
+
+ /**
+ * Helper function that adds data to or removes data from the database
+ *
+ * @param sql: SQL string that performs the add or remove operation
+ * @param op: enum value indicating the database operation; can be REMOVE or ADD
+ */
+ virtual void
+ operateDatabase(const std::string& sql,
+ util::DatabaseOperation op);
+
+ /**
+ * Helper function that parses jsonValue to generate the SQL string; the return value
+ * indicates whether it succeeded
+ *
+ * @param sqlString: stringstream to save the SQL string
+ * @param jsonValue: Json value that contains the update information
+ * @param op: enum value indicating the database operation; can be REMOVE or ADD
+ */
+ bool
+ json2Sql(std::stringstream& sqlString,
+ Json::Value& jsonValue,
+ util::DatabaseOperation op);
+
+ /**
+ * Helper function to generate the SQL string based on the file name; the return value
+ * indicates whether it succeeded
+ *
+ * @param sqlString: stringstream to save the SQL string
+ * @param fileName: ndn uri string for a file name
+ */
+ bool
+ name2Fields(std::stringstream& sqlString,
+ std::string& fileName);
+
+ /**
+ * Check the local database for the latest sequence number for a ChronoSync update
+ *
+ * @param update: the MissingDataInfo object
+ */
+ chronosync::SeqNo
+ getLatestSeqNo(const chronosync::MissingDataInfo& update);
+
+ /**
+ * Update the local database with the update message
+ *
+ * @param update: the MissingDataInfo object
+ */
+ void
+ renewUpdateInformation(const chronosync::MissingDataInfo& update);
+
+ /**
+ * Insert the update message into the local database
+ *
+ * @param update: the MissingDataInfo object
+ */
+ void
+ addUpdateInformation(const chronosync::MissingDataInfo& update);
+
+ void
+ onFetchUpdateDataTimeout(const ndn::Interest& interest);
+
+ void
+ onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
+ const std::string& failureInfo);
+
protected:
typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
// Prefix for ChronoSync
@@ -127,6 +221,13 @@
std::shared_ptr<DatabaseHandler> m_databaseHandler;
std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
RegisteredPrefixList m_registeredPrefixList;
+ std::unique_ptr<chronosync::Socket> m_socket; // SyncSocket
+ // mutex to control critical sections
+ std::mutex m_mutex;
+ std::vector<std::string> m_tableColumns;
+ // TODO: create thread for each request, and the variables below should be within the thread
+ bool m_mustBeFresh;
+ bool m_isFinished;
};
@@ -134,6 +235,8 @@
PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
const std::shared_ptr<ndn::KeyChain>& keyChain)
: util::CatalogAdapter(face, keyChain)
+ , m_mustBeFresh(true)
+ , m_isFinished(false)
{
}
@@ -142,14 +245,21 @@
PublishAdapter<DatabaseHandler>::setFilters()
{
ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
- m_registeredPrefixList[publishPrefix] =
- m_face->setInterestFilter(publishPrefix,
- bind(&PublishAdapter<DatabaseHandler>::onPublishInterest,
- this, _1, _2),
- bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
- this, _1),
- bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
- this, _1, _2));
+ m_registeredPrefixList[publishPrefix] =
+ m_face->setInterestFilter(publishPrefix,
+ bind(&PublishAdapter<DatabaseHandler>::onPublishInterest,
+ this, _1, _2),
+ bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
+ this, _1),
+ bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
+ this, _1, _2));
+
+ ndn::Name catalogSync = ndn::Name(m_prefix).append("sync");
+ m_socket.reset(new chronosync::Socket(m_syncPrefix,
+ catalogSync,
+ *m_face,
+ bind(&PublishAdapter<DatabaseHandler>::processSyncUpdate,
+ this, _1)));
}
template <typename DatabaseHandler>
@@ -264,24 +374,69 @@
m_syncPrefix.append(syncPrefix);
util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
- setDatabaseHandler(mysqlId);
+ initializeDatabase(mysqlId);
setFilters();
}
template <typename DatabaseHandler>
void
-PublishAdapter<DatabaseHandler>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+PublishAdapter<DatabaseHandler>::initializeDatabase(const util::ConnectionDetails& databaseId)
{
//empty
}
template <>
void
-PublishAdapter<MYSQL>::setDatabaseHandler(const util::ConnectionDetails& databaseId)
+PublishAdapter<MYSQL>::initializeDatabase(const util::ConnectionDetails& databaseId)
{
std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
m_databaseHandler = conn;
+
+ if (m_databaseHandler != nullptr) {
+ std::string errMsg;
+ bool success = false;
+ // Ignore errors (when the tables already exist, errors are expected)
+ std::string createSyncTable =
+ "CREATE TABLE `chronosync_update_info` (\
+ `id` int(11) NOT NULL AUTO_INCREMENT, \
+ `session_name` varchar(1000) NOT NULL, \
+ `seq_num` int(11) NOT NULL, \
+ PRIMARY KEY (`id`), \
+ UNIQUE KEY `id_UNIQUE` (`id`) \
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
+
+ MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE, success, errMsg);
+ if (!success)
+ std::cout << errMsg << std::endl;
+
+ std::string createCmip5Table =
+ "CREATE TABLE `cmip5` ( \
+ `id` int(100) NOT NULL AUTO_INCREMENT, \
+ `sha256` varchar(64) NOT NULL, \
+ `name` varchar(1000) NOT NULL, \
+ `activity` varchar(100) NOT NULL, \
+ `product` varchar(100) NOT NULL, \
+ `organization` varchar(100) NOT NULL, \
+ `model` varchar(100) NOT NULL, \
+ `experiment` varchar(100) NOT NULL, \
+ `frequency` varchar(100) NOT NULL, \
+ `modeling_realm` varchar(100) NOT NULL, \
+ `variable_name` varchar(100) NOT NULL, \
+ `ensemble` varchar(100) NOT NULL, \
+ `time` varchar(100) NOT NULL, \
+ PRIMARY KEY (`id`), \
+ UNIQUE KEY `sha256` (`sha256`) \
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
+
+ success = false;
+ MySQLPerformQuery(m_databaseHandler, createCmip5Table, util::CREATE, success, errMsg);
+ if (!success)
+ std::cout << errMsg << std::endl;
+ }
+ else {
+ throw Error("cannot connect to the Database");
+ }
}
template <typename DatabaseHandler>
@@ -289,7 +444,39 @@
PublishAdapter<DatabaseHandler>::onPublishInterest(const ndn::InterestFilter& filter,
const ndn::Interest& interest)
{
- // @todo: Request the data for publish
+ // Example Interest : /cmip5/publish/<uri>/<nonce>
+ std::cout << "Publish interest : " << interest.getName().toUri() << std::endl;
+
+ //send back ACK
+ char buf[4] = "ACK";
+ std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(interest.getName());
+ data->setFreshnessPeriod(ndn::time::milliseconds(10)); // 10 msec
+ data->setContent(reinterpret_cast<const uint8_t*>(buf), strlen(buf));
+ m_keyChain->sign(*data);
+ m_face->put(*data);
+ std::cout << "Sent ACK for " << interest.getName() << std::endl;
+
+
+ //TODO: if already in catalog, what do we do?
+ //ask for content
+ ndn::Name interestStr = interest.getName().getSubName(m_prefix.size()+1);
+ size_t nextSegment = 0;
+ std::shared_ptr<ndn::Interest> retrieveInterest =
+ std::make_shared<ndn::Interest>(interestStr.appendSegment(nextSegment));
+ retrieveInterest->setInterestLifetime(ndn::time::milliseconds(4000));
+ retrieveInterest->setMustBeFresh(m_mustBeFresh);
+ m_face->expressInterest(*retrieveInterest,
+ bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
+ this,_1, _2),
+ bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout, this, _1));
+ std::cout << "Expressing Interest for: " << retrieveInterest->toUri() << std::endl;
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
+{
+ std::cout << "interest " << interest.getName() << " timed out";
}
template <typename DatabaseHandler>
@@ -297,7 +484,363 @@
PublishAdapter<DatabaseHandler>::onPublishedData(const ndn::Interest& interest,
const ndn::Data& data)
{
- // @todo handle data publication
+ std::cout << "received Data " << data.getName() << std::endl;
+ if (data.getContent().empty()) {
+ return;
+ }
+
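+ // validate the publication payload, apply it to the local database, republish it
+ // via ChronoSync, and keep fetching segments until the final block is reached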
+ std::shared_ptr<ndn::Data> dataPtr = std::make_shared<ndn::Data>(data);
+ // validate published data payload, if failed, return
+ if (!validatePublicationChanges(dataPtr)) {
+ std::cout << "data validation failed : " << dataPtr->getName() << std::endl;
+#ifndef NDEBUG
+ const std::string payload(reinterpret_cast<const char*>(dataPtr->getContent().value()),
+ dataPtr->getContent().value_size());
+ std::cout << payload << std::endl;
+#endif
+ return;
+ }
+
+ // todo: return value to indicate if the insertion succeeds
+ processUpdateData(dataPtr);
+
+ // ideally, data should not be stale?
+ m_socket->publishData(data.getContent(), ndn::time::seconds(3600));
+
+ // check whether this is the final block
+ const ndn::name::Component& finalBlockId = data.getMetaInfo().getFinalBlockId();
+ if (finalBlockId == data.getName()[-1]) {
+ m_isFinished = true;
+ }
+ //else, get the next segment
+ if (!m_isFinished) {
+ ndn::Name nextInterestName = data.getName().getPrefix(-1);
+ uint64_t incomingSegment = data.getName()[-1].toSegment();
+ std::cout << " Next Interest Name " << nextInterestName << " Segment " << incomingSegment
+ << std::endl;
+ std::shared_ptr<ndn::Interest> nextInterest =
+ std::make_shared<ndn::Interest>(nextInterestName.appendSegment(incomingSegment + 1));
+ nextInterest->setInterestLifetime(ndn::time::milliseconds(4000));
+ nextInterest->setMustBeFresh(m_mustBeFresh);
+ m_face->expressInterest(*nextInterest,
+ bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
+ this,_1, _2),
+ bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout,
+ this, _1));
+ }
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::processUpdateData(const ndn::shared_ptr<const ndn::Data>& data)
+{
+ const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
+ data->getContent().value_size());
+
+ if (payload.length() <= 0) {
+ return;
+ }
+
+ // the data payload must be JSON format
+ // http://redmine.named-data.net/projects/ndn-atmos/wiki/Sync
+ Json::Value parsedFromPayload;
+ Json::Reader jsonReader;
+ if (!jsonReader.parse(payload, parsedFromPayload)) {
+ // todo: logging events
+ std::cout << "fail to parse the update data" << std::endl;
+ return;
+ }
+ else {
+ std::cout << "received Json format payload : "
+ << parsedFromPayload.toStyledString() << std::endl;
+ }
+ std::stringstream ss;
+ if (json2Sql(ss, parsedFromPayload, util::ADD)) {
+ std::cout << "sql string to insert data : " << ss.str() << std::endl;
+ // todo: before use, check if the connection is not NULL
+ // we may need to use a lock here to ensure thread safety
+ operateDatabase(ss.str(), util::ADD);
+ }
+
+ ss.str("");
+ ss.clear();
+ if (json2Sql(ss, parsedFromPayload, util::REMOVE)) {
+ std::cout << "sql string to remove data: " << ss.str() << std::endl;
+ operateDatabase(ss.str(), util::REMOVE);
+ }
+}
+
+template <typename DatabaseHandler>
+chronosync::SeqNo
+PublishAdapter<DatabaseHandler>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
+{
+ // empty
+ return 0;
+}
+
+template <>
+chronosync::SeqNo
+PublishAdapter<MYSQL>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
+{
+ std::string sql = "select seq_num from chronosync_update_info where session_name = '"
+ + update.session.toUri() + "';";
+ std::cout << "get latest seqNo : " << sql << std::endl;
+ std::string errMsg;
+ bool success = false;
+ std::shared_ptr<MYSQL_RES> results
+ = atmos::util::MySQLPerformQuery(m_databaseHandler, sql, util::QUERY, success, errMsg);
+ if (!success) {
+ std::cout << errMsg << std::endl;
+ return 0; //database connection error?
+ }
+ else if (results != nullptr){
+ MYSQL_ROW row;
+ if (mysql_num_rows(results.get()) == 0)
+ return 0;
+
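+ // at most one row per session is kept (see addUpdateInformation/renewUpdateInformation),
+ // so the first row holds the latest sequence number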
+ while ((row = mysql_fetch_row(results.get())))
+ {
+ chronosync::SeqNo seqNo = std::stoull(row[0]);
+ return seqNo;
+ }
+ }
+ return 0;
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
+{
+ //empty
+}
+
+template <>
+void
+PublishAdapter<MYSQL>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
+{
+ std::string sql = "update chronosync_update_info set seq_num = "
+ + boost::lexical_cast<std::string>(update.high)
+ + " where session_name = '" + update.session.toUri() + "';";
+ std::cout << "renew update Info : " << sql << std::endl;
+ std::string errMsg;
+ bool success = false;
+ m_mutex.lock();
+ util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
+ m_mutex.unlock();
+ if (!success)
+ std::cout << errMsg << std::endl;
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::addUpdateInformation(const chronosync::MissingDataInfo& update)
+{
+ //empty
+}
+
+template <>
+void
+PublishAdapter<MYSQL>::addUpdateInformation(const chronosync::MissingDataInfo& update)
+{
+ std::string sql = "INSERT INTO chronosync_update_info (session_name, seq_num) VALUES ('"
+ + update.session.toUri() + "', " + boost::lexical_cast<std::string>(update.high)
+ + ");";
+
+ std::cout << "add update Info : " << sql << std::endl;
+ std::string errMsg;
+ bool success = false;
+ m_mutex.lock();
+ util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
+ m_mutex.unlock();
+ if (!success)
+ std::cout << errMsg << std::endl;
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
+{
+ // todo: record event, and use recovery Interest to fetch the whole table
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
+ const std::string& failureInfo)
+{
+ std::cout << "failed to validate Data" << data->getName() << " : " << failureInfo << std::endl;
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::processSyncUpdate(const std::vector<chronosync::MissingDataInfo>&
+ updates)
+{
+ if (updates.empty()) {
+ return;
+ }
+
+ // multiple updates from different catalogs are possible
+ for (size_t i = 0; i < updates.size(); ++ i) {
+ // check if the session is in local DB
+ // if yes, only fetch packets whose seq number is bigger than the one in the DB
+ // if no, directly fetch Data
+ chronosync::SeqNo localSeqNo = getLatestSeqNo(updates[i]);
+ bool update = false;
+
+ for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++ seq) {
+ if (seq > localSeqNo) {
+ m_socket->fetchData(updates[i].session, seq,
+ bind(&PublishAdapter<DatabaseHandler>::processUpdateData,this, _1),
+ bind(&PublishAdapter<DatabaseHandler>::onUpdateValidationFailed,
+ this, _1, _2),
+ bind(&PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout,
+ this, _1),
+ RETRY_WHEN_TIMEOUT);
+ std::cout << "send Interest for [" << updates[i].session << ":" << seq << "]" << std::endl;
+ update = true;
+ }
+ }
+ // update the session name and sequence number in the local DB to mark them as processed,
+ // so that when this node reboots it will not redo the work
+ if (update) {
+ if (localSeqNo > 0)
+ renewUpdateInformation(updates[i]);
+ else
+ addUpdateInformation(updates[i]);
+ }
+ }
+}
+
+template <typename DatabaseHandler>
+void
+PublishAdapter<DatabaseHandler>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
+{
+ // empty
+}
+
+template <>
+void
+PublishAdapter<MYSQL>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
+{
+ std::string errMsg;
+ bool success = false;
+ m_mutex.lock();
+ atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
+ m_mutex.unlock();
+ if (!success)
+ std::cout << errMsg << std::endl;
+}
+
+template<typename DatabaseHandler>
+bool
+PublishAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlString,
+ Json::Value& jsonValue,
+ util::DatabaseOperation op)
+{
+ if (jsonValue.type() != Json::objectValue) {
+ std::cout << jsonValue.toStyledString() << " is not a JSON object" << std::endl;
+ return false;
+ }
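+ // the payload is expected to be of the form
+ // {"add": ["<file name>", ...], "remove": ["<file name>", ...]}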
+ if (op == util::ADD) {
+ size_t updateNumber = jsonValue["add"].size();
+ if (updateNumber <= 0)
+ return false;
+
+ sqlString << "INSERT INTO cmip5 (";
+ for (size_t i = 0; i < atmosTableColumns.size(); ++ i) {
+ if (i != 0)
+ sqlString << ", ";
+ sqlString << atmosTableColumns[i];
+ }
+ sqlString << ") VALUES";
+
+ for (size_t i = 0; i < updateNumber; ++ i) { //parse each file name
+ if (i > 0)
+ sqlString << ",";
+ // the cast may overflow
+ Json::Value item = jsonValue["add"][static_cast<int>(i)];
+ if (!item.isConvertibleTo(Json::stringValue)) {
+ std::cout << "malformed JsonQuery string : " << item.toStyledString() << std::endl;
+ return false;
+ }
+ std::string fileName(item.asString());
+ // use the SHA-256 digest for now; this may be removed later
+ ndn::util::Digest<CryptoPP::SHA256> digest;
+ digest.update(reinterpret_cast<const uint8_t*>(fileName.data()), fileName.length());
+
+ sqlString << "('" << digest.toString() << "','" << fileName << "'";
+
+ // parse the ndn name to get each value for each field
+ if (!name2Fields(sqlString, fileName))
+ return false;
+ sqlString << ")";
+ }
+ sqlString << ";";
+ }
+ else if (op == util::REMOVE) {
+ // remove files from db
+ size_t updateNumber = jsonValue["remove"].size();
+ if (updateNumber <= 0)
+ return false;
+
+ sqlString << "delete from cmip5 where name in (";
+ for (size_t i = 0; i < updateNumber; ++ i) {
+ if (i > 0)
+ sqlString << ",";
+ // the cast may overflow
+ Json::Value item = jsonValue["remove"][static_cast<int>(i)];
+ if (!item.isConvertibleTo(Json::stringValue)) {
+ std::cout << "malformed JsonQuery string : " << item.toStyledString() << std::endl;
+ return false;
+ }
+ std::string fileName(item.asString());
+
+ sqlString << "'" << fileName << "'";
+ }
+ sqlString << ");";
+ }
+ return true;
+}
+
+template<typename DatabaseHandler>
+bool
+PublishAdapter<DatabaseHandler>::name2Fields(std::stringstream& sqlString,
+ std::string& fileName)
+{
+ size_t start = 0;
+ size_t pos = 0;
+ size_t count = 0;
+ std::string token;
+ std::string delimiter = "/";
+ // fileName must start with either "ndn:/" or "/"
+ std::string nameWithNdn("ndn:/");
+ std::string nameWithSlash("/");
+ if (fileName.find(nameWithNdn) == 0) {
+ start = nameWithNdn.size();
+ }
+ else if (fileName.find(nameWithSlash) == 0) {
+ start = nameWithSlash.size();
+ }
+ else
+ return false;
+
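+ // e.g. "/1/2/3/4/5/6/7/8/9/10" yields ",'1','2','3','4','5','6','7','8','9','10'",
+ // one value for each of the 10 metadata columns after sha256 and name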
+ while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
+ count ++;
+ token = fileName.substr(start, pos - start);
+ if (count >= atmosTableColumns.size() - 2) { // exclude the sha256 and name
+ return false; //fileName contains more than 10 fields
+ }
+ sqlString << ",'" << token << "'";
+ start = pos + 1;
+ }
+
+ // there must be 10 fields in total (counting the final one after the last '/')
+ if (count != atmosTableColumns.size() - 3 || std::string::npos == start)
+ return false;
+ token = fileName.substr(start);
+ sqlString << ",'" << token << "'";
+ return true;
}
template<typename DatabaseHandler>
diff --git a/catalog/src/query/query-adapter.hpp b/catalog/src/query/query-adapter.hpp
index 2adb161..ccfefb6 100644
--- a/catalog/src/query/query-adapter.hpp
+++ b/catalog/src/query/query-adapter.hpp
@@ -198,6 +198,10 @@
json2AutocompletionSql(std::stringstream& sqlQuery,
Json::Value& jsonValue);
+ bool
+ json2CompleteSearchSql(std::stringstream& sqlQuery,
+ Json::Value& jsonValue);
+
protected:
typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
// Handle to the Catalog's database
@@ -439,9 +443,9 @@
nack->setFinalBlockId(ndn::Name::Component::fromSegment(segmentNo));
signData(*nack);
-
+#ifndef NDEBUG
std::cout << "make NACK : " << ndn::Name(dataPrefix).appendSegment(segmentNo) << std::endl;
-
+#endif
m_mutex.lock();
m_cache.insert(*nack);
m_mutex.unlock();
@@ -563,7 +567,86 @@
// 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
// return true
bool more = false;
- sqlQuery << "SELECT " << m_nameFields[count] << " FROM cmip5";
+ sqlQuery << "SELECT DISTINCT " << m_nameFields[count] << " FROM cmip5";
+ for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
+ it != typedComponents.end(); ++it) {
+ if (more)
+ sqlQuery << " AND";
+ else
+ sqlQuery << " WHERE";
+
+ sqlQuery << " " << it->first << "='" << it->second << "'";
+
+ more = true;
+ }
+ sqlQuery << ";";
+ return true;
+}
+
+template <typename DatabaseHandler>
+bool
+QueryAdapter<DatabaseHandler>::json2CompleteSearchSql(std::stringstream& sqlQuery,
+ Json::Value& jsonValue)
+{
+#ifndef NDEBUG
+ std::cout << "jsonValue in json2CompleteSearchSql: " << jsonValue.toStyledString() << std::endl;
+#endif
+ if (jsonValue.type() != Json::objectValue) {
+ std::cout << jsonValue.toStyledString() << " is not a JSON object" << std::endl;
+ return false;
+ }
+
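+ // the "??" key carries a '/'-delimited search path, e.g. {"??": "/Activity/Product/"};
+ // each component is matched against the corresponding name field
+ // and the matching file names (not the next field) are returned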
+ std::string typedString;
+ // get the string in the jsonValue
+ for (Json::Value::iterator iter = jsonValue.begin(); iter != jsonValue.end(); ++iter)
+ {
+ Json::Value key = iter.key();
+ Json::Value value = (*iter);
+
+ if (key == Json::nullValue || value == Json::nullValue) {
+ std::cout << "null key or value in JsonValue: " << jsonValue.toStyledString() << std::endl;
+ return false;
+ }
+
+ // cannot convert to string
+ if (!key.isConvertibleTo(Json::stringValue) || !value.isConvertibleTo(Json::stringValue)) {
+ std::cout << "malformed JsonQuery string : " << jsonValue.toStyledString() << std::endl;
+ return false;
+ }
+
+ if (key.asString().compare("??") == 0) {
+ typedString.assign(value.asString());
+ // the search string must begin and end with '/', since the front end sends
+ // '/'-delimited path components
+ if (typedString.at(typedString.length() - 1) != '/' || typedString.find("/") != 0)
+ return false;
+ break;
+ }
+ }
+
+ // 1. parse the typedString to map each typed component to its field name
+ size_t pos = 0;
+ size_t start = 1; // start from the 1st char, which is not '/'
+ size_t count = 0; // index of the current name field
+ std::string token;
+ std::string delimiter = "/";
+ std::map<std::string, std::string> typedComponents;
+ while ((pos = typedString.find(delimiter, start)) != std::string::npos) {
+ token = typedString.substr(start, pos - start);
+ if (count >= m_nameFields.size() - 1) {
+ return false;
+ }
+
+ // add column name and value (token) into map
+ typedComponents.insert(std::pair<std::string, std::string>(m_nameFields[count], token));
+ count ++;
+ start = pos + 1;
+ }
+
+ // 2. generate the sql string (append what appears in the typed string, like activity='xxx'),
+ // return true
+ bool more = false;
+ sqlQuery << "SELECT name FROM cmip5";
for (std::map<std::string, std::string>::iterator it = typedComponents.begin();
it != typedComponents.end(); ++it) {
if (more)
@@ -668,6 +751,12 @@
return;
}
}
+ else if (parsedFromString.get("??", tmp) != tmp) {
+ if (!json2CompleteSearchSql(sqlQuery, parsedFromString)) {
+ sendNack(segmentPrefix);
+ return;
+ }
+ }
else {
if (!json2Sql(sqlQuery, parsedFromString)) {
sendNack(segmentPrefix);
@@ -695,11 +784,16 @@
const std::string& sqlString,
bool autocomplete)
{
+#ifndef NDEBUG
std::cout << "prepareSegments() executes sql : " << sqlString << std::endl;
-
+#endif
+ std::string errMsg;
+ bool success = false;
// 4) Run the Query
std::shared_ptr<MYSQL_RES> results
- = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString);
+ = atmos::util::MySQLPerformQuery(m_databaseHandler, sqlString, util::QUERY, success, errMsg);
+ if (!success)
+ std::cout << errMsg << std::endl;
if (!results) {
std::cout << "null MYSQL_RES for query : " << sqlString << std::endl;
@@ -717,27 +811,29 @@
<< " rows" << std::endl;
MYSQL_ROW row;
- size_t usedBytes = 0;
uint64_t segmentNo = 0;
- Json::Value array;
+ Json::Value tmp;
+ Json::Value resultJson;
+ Json::FastWriter fastWriter;
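+ // pack rows into a JSON array, starting a new segment when the
+ // serialized size would exceed PAYLOAD_LIMIT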
while ((row = mysql_fetch_row(results.get())))
{
- size_t size = strlen(row[0]) + 1;
- if (usedBytes + size > PAYLOAD_LIMIT) {
+ tmp.append(row[0]);
+ const std::string tmpString = fastWriter.write(tmp);
+ if (tmpString.length() > PAYLOAD_LIMIT) {
std::shared_ptr<ndn::Data> data
- = makeReplyData(segmentPrefix, array, segmentNo, false, autocomplete, resultCount);
+ = makeReplyData(segmentPrefix, resultJson, segmentNo, false, autocomplete, resultCount);
m_mutex.lock();
m_cache.insert(*data);
m_mutex.unlock();
- array.clear();
- usedBytes = 0;
- segmentNo++;
+ tmp.clear();
+ resultJson.clear();
+ segmentNo ++;
}
- array.append(row[0]);
- usedBytes += size;
+ resultJson.append(row[0]);
}
+
std::shared_ptr<ndn::Data> data
- = makeReplyData(segmentPrefix, array, segmentNo, true, autocomplete, resultCount);
+ = makeReplyData(segmentPrefix, resultJson, segmentNo, true, autocomplete, resultCount);
m_mutex.lock();
m_cache.insert(*data);
m_mutex.unlock();
diff --git a/catalog/src/util/mysql-util.cpp b/catalog/src/util/mysql-util.cpp
index fb58665..9df8b8e 100644
--- a/catalog/src/util/mysql-util.cpp
+++ b/catalog/src/util/mysql-util.cpp
@@ -19,6 +19,7 @@
#include "util/mysql-util.hpp"
#include <mysql/errmsg.h>
#include <stdexcept>
+#include <iostream>
namespace atmos {
namespace util {
@@ -32,10 +33,13 @@
std::shared_ptr<MYSQL>
-MySQLConnectionSetup(const ConnectionDetails& details) {
+MySQLConnectionSetup(const ConnectionDetails& details)
+{
MYSQL* conn = mysql_init(NULL);
+ my_bool reconnect = 1;
+ mysql_options(conn, MYSQL_OPT_RECONNECT, &reconnect);
if(!mysql_real_connect(conn, details.server.c_str(), details.user.c_str(),
- details.password.c_str(), details.database.c_str(), 0, NULL, 0)) {
+ details.password.c_str(), details.database.c_str(), 0, NULL, 0)) {
throw std::runtime_error(mysql_error(conn));
}
std::shared_ptr<MYSQL> connection(conn, &mysql_close);
@@ -43,25 +47,37 @@
}
std::shared_ptr<MYSQL_RES>
-MySQLPerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query) {
+MySQLPerformQuery(std::shared_ptr<MYSQL> connection,
+ const std::string& sql_query,
+ DatabaseOperation op,
+ bool& success,
+ std::string& errMsg)
+{
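+ // 'success' and 'errMsg' report the outcome of the query;
+ // a result set is returned only for QUERY operations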
switch (mysql_query(connection.get(), sql_query.c_str()))
{
- case 0:
- {
+ case 0:
+ {
+ success = true;
+ if (op == QUERY) {
MYSQL_RES* resultPtr = mysql_store_result(connection.get());
if (resultPtr != NULL)
{
return std::shared_ptr<MYSQL_RES>(resultPtr, &mysql_free_result);
}
- break;
}
- // Various error cases
- case CR_COMMANDS_OUT_OF_SYNC:
- case CR_SERVER_GONE_ERROR:
- case CR_SERVER_LOST:
- case CR_UNKNOWN_ERROR:
- default:
- break;
+ // for ADD/REMOVE we do not need the results; we may log the event later
+ break;
+ }
+ // Various error cases
+ case CR_COMMANDS_OUT_OF_SYNC:
+ case CR_SERVER_GONE_ERROR:
+ case CR_SERVER_LOST:
+ case CR_UNKNOWN_ERROR:
+ default:
+ {
+ success = false;
+ errMsg.assign(mysql_error(connection.get()));
+ break;
+ }
}
return nullptr;
}
diff --git a/catalog/src/util/mysql-util.hpp b/catalog/src/util/mysql-util.hpp
index ee45f97..75e5997 100644
--- a/catalog/src/util/mysql-util.hpp
+++ b/catalog/src/util/mysql-util.hpp
@@ -26,6 +26,7 @@
namespace atmos {
namespace util {
+enum DatabaseOperation {CREATE, UPDATE, ADD, REMOVE, QUERY};
struct ConnectionDetails {
public:
std::string server;
@@ -41,7 +42,11 @@
MySQLConnectionSetup(const ConnectionDetails& details);
std::shared_ptr<MYSQL_RES>
-MySQLPerformQuery(std::shared_ptr<MYSQL> connection, const std::string& sql_query);
+MySQLPerformQuery(std::shared_ptr<MYSQL> connection,
+ const std::string& sql_query,
+ DatabaseOperation op,
+ bool& success,
+ std::string& errMsg);
} // namespace util
} // namespace atmos
diff --git a/catalog/tests/unit-tests/publish/test-publish-adapter.cpp b/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
index 7d6a894..90c6f3f 100644
--- a/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
+++ b/catalog/tests/unit-tests/publish/test-publish-adapter.cpp
@@ -45,6 +45,11 @@
{
}
+ void setTableFields(const std::vector<std::string>& tableFields)
+ {
+ m_tableColumns = tableFields;
+ }
+
const ndn::Name
getPrefix()
{
@@ -71,6 +76,21 @@
}
bool
+ testJson2Sql(std::stringstream& sqlString,
+ Json::Value& jsonValue,
+ util::DatabaseOperation operation)
+ {
+ return json2Sql(sqlString, jsonValue, operation);
+ }
+
+ bool
+ testName2Fields(std::stringstream& sqlString,
+ std::string& fileName)
+ {
+ return name2Fields(sqlString, fileName);
+ }
+
+ bool
testValidatePublicationChanges(const std::shared_ptr<const ndn::Data>& data)
{
return validatePublicationChanges(data);
@@ -86,6 +106,21 @@
, publishAdapterTest1(face, keyChain)
, publishAdapterTest2(face, keyChain)
{
+ std::string cx("sha256"), c0("name"), c1("activity"), c2("product"), c3("organization");
+ std::string c4("model"), c5("experiment"), c6("frequency"), c7("modeling_realm");
+ std::string c8("variable_name"), c9("ensemble"), c10("time");
+ tableFields.push_back(cx);
+ tableFields.push_back(c0);
+ tableFields.push_back(c1);
+ tableFields.push_back(c2);
+ tableFields.push_back(c3);
+ tableFields.push_back(c4);
+ tableFields.push_back(c5);
+ tableFields.push_back(c6);
+ tableFields.push_back(c7);
+ tableFields.push_back(c8);
+ tableFields.push_back(c9);
+ tableFields.push_back(c10);
}
virtual
@@ -116,6 +151,7 @@
catch (boost::property_tree::info_parser_error &e) {
std::cout << "Failed to read config file " << e.what() << std::endl;
}
+ publishAdapterTest1.setTableFields(tableFields);
publishAdapterTest1.configAdapter(section, ndn::Name("/test"));
}
@@ -142,6 +178,7 @@
catch (boost::property_tree::info_parser_error &e) {
std::cout << "Failed to read config file " << e.what() << std::endl;;
}
+ publishAdapterTest2.setTableFields(tableFields);
publishAdapterTest2.configAdapter(section, ndn::Name("/test"));
}
@@ -150,6 +187,7 @@
std::shared_ptr<ndn::KeyChain> keyChain;
PublishAdapterTest publishAdapterTest1;
PublishAdapterTest publishAdapterTest2;
+ std::vector<std::string> tableFields;
};
BOOST_FIXTURE_TEST_SUITE(PublishAdapterTestSuite, PublishAdapterFixture)
@@ -175,6 +213,76 @@
ndn::Name("ndn:/ndn-atmos/broadcast/chronosync"));
}
+ BOOST_AUTO_TEST_CASE(PublishAdapterName2FieldsNormalTest)
+ {
+ std::string testFileName1 = "/1/2/3/4/5/6/7/8/9/10";
+ std::stringstream ss;
+ std::string expectString1 = ",'1','2','3','4','5','6','7','8','9','10'";
+ BOOST_CHECK_EQUAL(publishAdapterTest1.testName2Fields(ss, testFileName1), true);
+ BOOST_CHECK_EQUAL(ss.str(), expectString1);
+
+ ss.str("");
+ ss.clear();
+ std::string testFileName2 = "ndn:/1/2/3/4/5/6/777/8/99999/10";
+ std::string expectString2 = ",'1','2','3','4','5','6','777','8','99999','10'";
+ BOOST_CHECK_EQUAL(publishAdapterTest1.testName2Fields(ss, testFileName2), true);
+ BOOST_CHECK_EQUAL(ss.str(), expectString2);
+ }
+
+ BOOST_AUTO_TEST_CASE(PublishAdapterName2FieldsFailureTest)
+ {
+ std::string testFileName1 = "/1/2/3/4/5/6/7/8/9/10/11";//too much components
+ std::stringstream ss;
+ BOOST_CHECK_EQUAL(publishAdapterTest1.testName2Fields(ss, testFileName1), false);
+
+ ss.str("");
+ ss.clear();
+ std::string testFileName2 = "1234567890";
+ BOOST_CHECK_EQUAL(publishAdapterTest1.testName2Fields(ss, testFileName2), false);
+
+ ss.str("");
+ ss.clear();
+ std::string testFileName3 = "ndn:/1/2/3/4/5"; //too little components
+ BOOST_CHECK_EQUAL(publishAdapterTest1.testName2Fields(ss, testFileName3), false);
+ }
+
+ BOOST_AUTO_TEST_CASE(PublishAdapterSqlStringNormalTest)
+ {
+ Json::Value testJson;
+ testJson["add"][0] = "/1/2/3/4/5/6/7/8/9/10";
+ testJson["add"][1] = "ndn:/a/b/c/d/eee/f/gg/h/iiii/j";
+ testJson["remove"][0] = "ndn:/1/2/3/4/5/6/7/8/9/10";
+ testJson["remove"][1] = "/a/b/c/d";
+ testJson["remove"][2] = "/test/for/remove";
+
+ std::stringstream ss;
+ std::string expectRes1 = "INSERT INTO cmip5 (sha256, name, activity, product, organization, \
+model, experiment, frequency, modeling_realm, variable_name, ensemble, time) VALUES(\
+'3738C9C0E0297DE7FE0EE538030597442DEEFF0F2C88778404D7B6E4BAD589F6','/1/2/3/4/5/6/7/8/9/10',\
+'1','2','3','4','5','6','7','8','9','10'),\
+('F93128EE9B7769105C6BDF6AA0FAA8CB4ED429395DDBC2CDDBFBA05F35B320FB','ndn:/a/b/c/d/eee/f/gg/h/iiii/j'\
+,'a','b','c','d','eee','f','gg','h','iiii','j');";
+ BOOST_CHECK_EQUAL(publishAdapterTest1.testJson2Sql(ss, testJson, util::ADD), true);
+ BOOST_CHECK_EQUAL(ss.str(), expectRes1);
+
+ ss.str("");
+ ss.clear();
+ std::string expectRes2 = "delete from cmip5 where name in ('ndn:/1/2/3/4/5/6/7/8/9/10',\
+'/a/b/c/d','/test/for/remove');";
+ BOOST_CHECK_EQUAL(publishAdapterTest1.testJson2Sql(ss, testJson, util::REMOVE), true);
+ BOOST_CHECK_EQUAL(ss.str(), expectRes2);
+ }
+
+ BOOST_AUTO_TEST_CASE(PublishAdapterSqlStringFailureTest)
+ {
+ Json::Value testJson;
+ testJson["add"][0] = "/1/2/3/4/5/6/7/8/9/10";
+ testJson["add"][1] = "/a/b/c/d/eee/f/gg/h/iiii/j/kkk"; //too much components
+ std::stringstream ss;
+ bool res = publishAdapterTest1.testJson2Sql(ss, testJson, util::REMOVE);
+ BOOST_CHECK(res == false);
+ }
+
BOOST_AUTO_TEST_CASE(PublishAdapterValidateDataTestSuccess)
{
ndn::Name dataName("/test/publisher/12345"); // data name must be prefix+nonce
diff --git a/catalog/tests/unit-tests/query/test-query-adapter.cpp b/catalog/tests/unit-tests/query/test-query-adapter.cpp
index 3be4544..fdf5140 100644
--- a/catalog/tests/unit-tests/query/test-query-adapter.cpp
+++ b/catalog/tests/unit-tests/query/test-query-adapter.cpp
@@ -397,7 +397,8 @@
1, false, false, 2);
BOOST_CHECK_EQUAL(data->getName().toUri(), "/atmos/test/prefix/%00%01");
BOOST_CHECK_EQUAL(data->getFinalBlockId(), ndn::Name::Component(""));
- const std::string jsonRes(reinterpret_cast<const char*>(data->getContent().value()));
+ const std::string jsonRes(reinterpret_cast<const char*>(data->getContent().value()),
+ data->getContent().value_size());
Json::Value parsedFromString;
Json::Reader reader;
BOOST_CHECK_EQUAL(reader.parse(jsonRes, parsedFromString), true);
@@ -418,7 +419,8 @@
BOOST_CHECK_EQUAL(data->getName().toUri(), "/atmos/test/prefix/%00%02");
BOOST_CHECK_EQUAL(data->getFinalBlockId(), ndn::Name::Component::fromSegment(2));
- const std::string jsonRes(reinterpret_cast<const char*>(data->getContent().value()));
+ const std::string jsonRes(reinterpret_cast<const char*>(data->getContent().value()),
+ data->getContent().value_size());
Json::Value parsedFromString;
Json::Reader reader;
BOOST_CHECK_EQUAL(reader.parse(jsonRes, parsedFromString), true);
@@ -456,7 +458,8 @@
BOOST_CHECK(replyData);
if (replyData){
BOOST_CHECK_EQUAL(replyData->getName().getPrefix(2), ndn::Name("/test/query-results"));
- const std::string jsonRes(reinterpret_cast<const char*>(replyData->getContent().value()));
+ const std::string jsonRes(reinterpret_cast<const char*>(replyData->getContent().value()),
+ replyData->getContent().value_size());
Json::Value parsedFromString;
Json::Reader reader;
BOOST_CHECK_EQUAL(reader.parse(jsonRes, parsedFromString), true);
@@ -476,22 +479,23 @@
Json::Value testJson;
testJson["?"] = "/";
BOOST_CHECK_EQUAL(true, queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson));
- BOOST_CHECK_EQUAL("SELECT activity FROM cmip5;", ss.str());
+ BOOST_CHECK_EQUAL("SELECT DISTINCT activity FROM cmip5;", ss.str());
ss.str("");
ss.clear();
testJson.clear();
testJson["?"] = "/Activity/";
BOOST_CHECK_EQUAL(true, queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson));
- BOOST_CHECK_EQUAL("SELECT product FROM cmip5 WHERE activity='Activity';", ss.str());
+ BOOST_CHECK_EQUAL("SELECT DISTINCT product FROM cmip5 WHERE activity='Activity';", ss.str());
ss.str("");
ss.clear();
testJson.clear();
testJson["?"] = "/Activity/Product/Organization/Model/Experiment/";
BOOST_CHECK_EQUAL(true, queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson));
- BOOST_CHECK_EQUAL("SELECT frequency FROM cmip5 WHERE activity='Activity' AND experiment=\
-'Experiment' AND model='Model' AND organization='Organization' AND product='Product';", ss.str());
+ BOOST_CHECK_EQUAL("SELECT DISTINCT frequency FROM cmip5 WHERE activity='Activity' AND \
+experiment='Experiment' AND model='Model' AND organization='Organization' AND product='Product';",
+ ss.str());
ss.str("");
ss.clear();
@@ -499,9 +503,10 @@
testJson["?"] = "/Activity/Product/Organization/Model/Experiment/Frequency/Modeling/\
Variable/Ensemble/";
BOOST_CHECK_EQUAL(true, queryAdapterTest2.json2AutocompletionSqlTest(ss, testJson));
- BOOST_CHECK_EQUAL("SELECT time FROM cmip5 WHERE activity='Activity' AND ensemble='Ensemble' AND\
- experiment='Experiment' AND frequency='Frequency' AND model='Model' AND modeling_realm='Modeling' \
-AND organization='Organization' AND product='Product' AND variable_name='Variable';",ss.str());
+ BOOST_CHECK_EQUAL("SELECT DISTINCT time FROM cmip5 WHERE activity='Activity' AND ensemble=\
+'Ensemble' AND experiment='Experiment' AND frequency='Frequency' AND model='Model' AND \
+modeling_realm='Modeling' AND organization='Organization' AND product='Product' AND variable_name=\
+'Variable';",ss.str());
}
BOOST_AUTO_TEST_CASE(QueryAdapterAutocompletionSqlFailTest)