blob: 1d6f91f52464e253084227f5c253a37a17cdfb73 [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
20#define ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060024#include <mysql/mysql.h>
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <json/reader.h>
27#include <json/value.h>
28#include <json/writer.h>
29
30#include <ndn-cxx/face.hpp>
31#include <ndn-cxx/interest.hpp>
32#include <ndn-cxx/interest-filter.hpp>
33#include <ndn-cxx/name.hpp>
34#include <ndn-cxx/security/key-chain.hpp>
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060035#include <ndn-cxx/security/validator-config.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060036#include <ndn-cxx/util/string-helper.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060037
Chengyu Fan46398212015-08-11 11:23:13 -060038#include <ChronoSync/socket.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060039#include <memory>
40#include <string>
Chengyu Fanb25835b2015-04-28 17:09:35 -060041#include <vector>
42#include <unordered_map>
Chengyu Fan46398212015-08-11 11:23:13 -060043#include <mutex>
Alison Craig2a4d5282015-04-10 12:00:02 -060044
45namespace atmos {
46namespace publish {
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060047
Chengyu Fan46398212015-08-11 11:23:13 -060048#define RETRY_WHEN_TIMEOUT 2
49// TODO: need to use the configured nameFields
50std::array<std::string, 12> atmosTableColumns = {{"sha256", "name", "activity", "product",
51 "organization", "model", "experiment",
52 "frequency", "modeling_realm",
53 "variable_name", "ensemble", "time"}};
54
Alison Craig2a4d5282015-04-10 12:00:02 -060055/**
56 * PublishAdapter handles the Publish usecases for the catalog
57 */
58template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060059class PublishAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060060public:
61 /**
62 * Constructor
63 *
Chengyu Fanb25835b2015-04-28 17:09:35 -060064 * @param face: Face that will be used for NDN communications
65 * @param keyChain: KeyChain that will be used for data signing
Alison Craig2a4d5282015-04-10 12:00:02 -060066 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060067 PublishAdapter(const std::shared_ptr<ndn::Face>& face,
68 const std::shared_ptr<ndn::KeyChain>& keyChain);
Alison Craig2a4d5282015-04-10 12:00:02 -060069
Alison Craig2a4d5282015-04-10 12:00:02 -060070 virtual
71 ~PublishAdapter();
72
Chengyu Fanb25835b2015-04-28 17:09:35 -060073 /**
74 * Helper function that subscribe to a publish section for the config file
75 */
76 void
77 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060078 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060079 const std::vector<std::string>& nameFields,
80 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060081
Alison Craig2a4d5282015-04-10 12:00:02 -060082protected:
83 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060084 * Helper function that configures piblishAdapter instance according to publish section
85 * in config file
86 */
87 void
88 onConfig(const util::ConfigSection& section,
89 bool isDryDun,
90 const std::string& fileName,
91 const ndn::Name& prefix);
92
93 /**
Alison Craig2a4d5282015-04-10 12:00:02 -060094 * Initial "please publish this" Interests
95 *
96 * @param filter: InterestFilter that caused this Interest to be routed
97 * @param interest: Interest that needs to be handled
98 */
99 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600100 onPublishInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600101
Chengyu Fan46398212015-08-11 11:23:13 -0600102 virtual void
103 onTimeout(const ndn::Interest& interest);
104
Alison Craig2a4d5282015-04-10 12:00:02 -0600105 /**
106 * Data containing the actual thing we need to publish
107 *
108 * @param interest: Interest that caused this Data to be routed
109 * @param data: Data that needs to be handled
110 */
111 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600112 onPublishedData(const ndn::Interest& interest, const ndn::Data& data);
Alison Craig2a4d5282015-04-10 12:00:02 -0600113
Chengyu Fanb25835b2015-04-28 17:09:35 -0600114 /**
Chengyu Fan46398212015-08-11 11:23:13 -0600115 * Helper function to initialize the DatabaseHandler
Chengyu Fanb25835b2015-04-28 17:09:35 -0600116 */
117 void
Chengyu Fan46398212015-08-11 11:23:13 -0600118 initializeDatabase(const util::ConnectionDetails& databaseId);
Alison Craig2a4d5282015-04-10 12:00:02 -0600119
Chengyu Fanb25835b2015-04-28 17:09:35 -0600120 /**
121 * Helper function that sets filters to make the adapter work
122 */
123 void
124 setFilters();
125
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600126 void
127 setCatalogId();
128
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600129 /**
130 * Function to validate publication changes against the trust model, which is, all file
131 * names must be under the publisher's prefix. This function should be called by a callback
132 * function invoked by validator
133 *
134 * @param data: received data from the publisher
135 */
136 bool
137 validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data);
138
Chengyu Fan46398212015-08-11 11:23:13 -0600139
140 /**
141 * Helper function that processes the sync update
142 *
143 * @param updates: vector that contains all the missing data information
144 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600145 void
146 processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
Chengyu Fan46398212015-08-11 11:23:13 -0600147
148 /**
149 * Helper function that processes the update data
150 *
151 * @param data: shared pointer for the fetched update data
152 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600153 void
154 processUpdateData(const ndn::shared_ptr<const ndn::Data>& data);
Chengyu Fan46398212015-08-11 11:23:13 -0600155
156 /**
157 * Helper function that add data to or remove data from database
158 *
159 * @param sql: sql string to do the add or remove jobs
160 * @param op: enum value indicates the database operation, could be REMOVE, ADD
161 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600162 virtual void
163 operateDatabase(const std::string& sql,
164 util::DatabaseOperation op);
Chengyu Fan46398212015-08-11 11:23:13 -0600165
166 /**
167 * Helper function that parses jsonValue to generate sql string, return value indicates
168 * if it is successfully
169 *
170 * @param sqlString: streamstream to save the sql string
171 * @param jsonValue: Json value that contains the update information
172 * @param op: enum value indicates the database operation, could be REMOVE, ADD
173 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600174 bool
175 json2Sql(std::stringstream& sqlString,
176 Json::Value& jsonValue,
177 util::DatabaseOperation op);
Chengyu Fan46398212015-08-11 11:23:13 -0600178
179 /**
180 * Helper function to generate sql string based on file name, return value indicates
181 * if it is successfully
182 *
183 * @param sqlString: streamstream to save the sql string
184 * @param fileName: ndn uri string for a file name
185 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600186 bool
187 name2Fields(std::stringstream& sqlstring,
188 std::string& fileName);
Chengyu Fan46398212015-08-11 11:23:13 -0600189
190 /**
191 * Check the local database for the latest sequence number for a ChronoSync update
192 *
193 * @param update: the MissingDataInfo object
194 */
195 chronosync::SeqNo
196 getLatestSeqNo(const chronosync::MissingDataInfo& update);
197
198 /**
199 * Update the local database with the update message
200 *
201 * @param update: the MissingDataInfo object
202 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600203 void
204 renewUpdateInformation(const chronosync::MissingDataInfo& update);
Chengyu Fan46398212015-08-11 11:23:13 -0600205
206 /**
207 * Insert the update message into the local database
208 *
209 * @param update: the MissingDataInfo object
210 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600211 void
212 addUpdateInformation(const chronosync::MissingDataInfo& update);
Chengyu Fan46398212015-08-11 11:23:13 -0600213
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600214 void
215 onFetchUpdateDataTimeout(const ndn::Interest& interest);
Chengyu Fan46398212015-08-11 11:23:13 -0600216
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600217 void
218 onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
219 const std::string& failureInfo);
Chengyu Fan46398212015-08-11 11:23:13 -0600220
Chengyu Fanb25835b2015-04-28 17:09:35 -0600221protected:
222 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
223 // Prefix for ChronoSync
224 ndn::Name m_syncPrefix;
225 // Handle to the Catalog's database
226 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600227 std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600228 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan46398212015-08-11 11:23:13 -0600229 std::unique_ptr<chronosync::Socket> m_socket; // SyncSocket
230 // mutex to control critical sections
231 std::mutex m_mutex;
232 std::vector<std::string> m_tableColumns;
233 // TODO: create thread for each request, and the variables below should be within the thread
234 bool m_mustBeFresh;
235 bool m_isFinished;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600236 ndn::Name m_catalogId;
Alison Craig2a4d5282015-04-10 12:00:02 -0600237};
238
Alison Craig2a4d5282015-04-10 12:00:02 -0600239
Chengyu Fanb25835b2015-04-28 17:09:35 -0600240template <typename DatabaseHandler>
241PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
242 const std::shared_ptr<ndn::KeyChain>& keyChain)
243 : util::CatalogAdapter(face, keyChain)
Chengyu Fan46398212015-08-11 11:23:13 -0600244 , m_mustBeFresh(true)
245 , m_isFinished(false)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600246 , m_catalogId("catalogIdPlaceHolder")
Chengyu Fanb25835b2015-04-28 17:09:35 -0600247{
248}
249
250template <typename DatabaseHandler>
251void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600252PublishAdapter<DatabaseHandler>::setCatalogId()
253{
254 // empty
255}
256
257template <>
258void
259PublishAdapter<MYSQL>::setCatalogId()
260{
261 // use public key digest as the catalog ID
262 ndn::Name keyId;
263 if (m_signingId.empty()) {
264 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
265 } else {
266 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
267 }
268
269 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
270 ndn::Block keyDigest = pKey->computeDigest();
271 m_catalogId.clear();
272 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
273}
274
275template <typename DatabaseHandler>
276void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600277PublishAdapter<DatabaseHandler>::setFilters()
278{
279 ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
Chengyu Fan46398212015-08-11 11:23:13 -0600280 m_registeredPrefixList[publishPrefix] =
281 m_face->setInterestFilter(publishPrefix,
282 bind(&PublishAdapter<DatabaseHandler>::onPublishInterest,
283 this, _1, _2),
284 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
285 this, _1),
286 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
287 this, _1, _2));
288
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600289 ndn::Name catalogSync = ndn::Name(m_prefix).append("sync").append(m_catalogId);
Chengyu Fan46398212015-08-11 11:23:13 -0600290 m_socket.reset(new chronosync::Socket(m_syncPrefix,
291 catalogSync,
292 *m_face,
293 bind(&PublishAdapter<DatabaseHandler>::processSyncUpdate,
294 this, _1)));
Alison Craig2a4d5282015-04-10 12:00:02 -0600295}
296
297template <typename DatabaseHandler>
298PublishAdapter<DatabaseHandler>::~PublishAdapter()
299{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600300 for (const auto& itr : m_registeredPrefixList) {
301 if (static_cast<bool>(itr.second))
302 m_face->unsetInterestFilter(itr.second);
303 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600304}
305
306template <typename DatabaseHandler>
307void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600308PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600309 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600310 const std::vector<std::string>& nameFields,
311 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600312{
Chengyu Fan92440162015-07-09 14:43:31 -0600313 m_nameFields = nameFields;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600314 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600315 config.addSectionHandler("publishAdapter",
316 bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
317 _1, _2, _3, prefix));
318}
319
320template <typename DatabaseHandler>
321void
322PublishAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
323 bool isDryRun,
324 const std::string& filename,
325 const ndn::Name& prefix)
326{
327 using namespace util;
328 if (isDryRun) {
329 return;
330 }
331
332 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
333 std::string syncPrefix("ndn:/ndn-atmos/broadcast/chronosync");
334
335 for (auto item = section.begin();
336 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600337 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600338 {
339 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600340 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600341 if (signingId.empty()) {
342 throw Error("Invalid value for \"signingId\""
343 " in \"publish\" section");
344 }
345 }
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600346 else if (item->first == "security") {
347 // when use, the validator must specify the callback func to handle the validated data
348 // it should be called when the Data packet that contains the published file names is received
349 m_publishValidator.reset(new ndn::ValidatorConfig(m_face.get()));
350 m_publishValidator->load(item->second, filename);
351 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600352 else if (item->first == "database") {
353 const util::ConfigSection& databaseSection = item->second;
354 for (auto subItem = databaseSection.begin();
355 subItem != databaseSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600356 ++subItem) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600357 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600358 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600359 if (dbServer.empty()){
360 throw Error("Invalid value for \"dbServer\""
361 " in \"publish\" section");
362 }
363 }
364 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600365 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600366 if (dbName.empty()){
367 throw Error("Invalid value for \"dbName\""
368 " in \"publish\" section");
369 }
370 }
371 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600372 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600373 if (dbUser.empty()){
374 throw Error("Invalid value for \"dbUser\""
375 " in \"publish\" section");
376 }
377 }
378 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600379 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600380 if (dbPasswd.empty()){
381 throw Error("Invalid value for \"dbPasswd\""
382 " in \"publish\" section");
383 }
384 }
385 }
386 }
387 else if (item->first == "sync") {
388 const util::ConfigSection& synSection = item->second;
389 for (auto subItem = synSection.begin();
390 subItem != synSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600391 ++subItem) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600392 if (subItem->first == "prefix") {
393 syncPrefix.clear();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600394 syncPrefix = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600395 if (syncPrefix.empty()){
396 throw Error("Invalid value for \"prefix\""
397 " in \"publish\\sync\" section");
398 }
399 }
400 // todo: parse the sync_security section
401 }
402 }
403 }
404
405 m_prefix = prefix;
406 m_signingId = ndn::Name(signingId);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600407 setCatalogId();
408
Chengyu Fanb25835b2015-04-28 17:09:35 -0600409 m_syncPrefix.clear();
410 m_syncPrefix.append(syncPrefix);
411 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
412
Chengyu Fan46398212015-08-11 11:23:13 -0600413 initializeDatabase(mysqlId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600414 setFilters();
415}
416
417template <typename DatabaseHandler>
418void
Chengyu Fan46398212015-08-11 11:23:13 -0600419PublishAdapter<DatabaseHandler>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600420{
421 //empty
422}
423
424template <>
425void
Chengyu Fan46398212015-08-11 11:23:13 -0600426PublishAdapter<MYSQL>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600427{
428 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
429
430 m_databaseHandler = conn;
Chengyu Fan46398212015-08-11 11:23:13 -0600431
432 if (m_databaseHandler != nullptr) {
433 std::string errMsg;
434 bool success = false;
435 // Ignore errors (when database already exists, errors are expected)
436 std::string createSyncTable =
437 "CREATE TABLE `chronosync_update_info` (\
438 `id` int(11) NOT NULL AUTO_INCREMENT, \
439 `session_name` varchar(1000) NOT NULL, \
440 `seq_num` int(11) NOT NULL, \
441 PRIMARY KEY (`id`), \
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600442 UNIQUE KEY `id_UNIQUE` (`id`) \
Chengyu Fan46398212015-08-11 11:23:13 -0600443 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
444
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600445 MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE,
446 success, errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600447 if (!success)
448 std::cout << errMsg << std::endl;
449
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600450 // create SQL string for table creation, id, sha256, and name are columns that we need
451 std::stringstream ss;
452 ss << "CREATE TABLE `" << m_databaseTable << "` (\
453 `id` int(100) NOT NULL AUTO_INCREMENT, \
454 `sha256` varchar(64) NOT NULL, \
455 `name` varchar(1000) NOT NULL,";
456 for (size_t i = 0; i < m_nameFields.size(); i++) {
457 ss << "`" << m_nameFields[i] << "` varchar(100) NOT NULL, ";
458 }
459 ss << "PRIMARY KEY (`id`), UNIQUE KEY `sha256` (`sha256`)\
Chengyu Fan46398212015-08-11 11:23:13 -0600460 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
461
462 success = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600463 MySQLPerformQuery(m_databaseHandler, ss.str(), util::CREATE, success, errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600464 if (!success)
465 std::cout << errMsg << std::endl;
466 }
467 else {
468 throw Error("cannot connect to the Database");
469 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600470}
471
472template <typename DatabaseHandler>
473void
474PublishAdapter<DatabaseHandler>::onPublishInterest(const ndn::InterestFilter& filter,
475 const ndn::Interest& interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600476{
Chengyu Fan46398212015-08-11 11:23:13 -0600477 // Example Interest : /cmip5/publish/<uri>/<nonce>
478 std::cout << "Publish interest : " << interest.getName().toUri() << std::endl;
479
480 //send back ACK
481 char buf[4] = "ACK";
482 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(interest.getName());
483 data->setFreshnessPeriod(ndn::time::milliseconds(10)); // 10 msec
484 data->setContent(reinterpret_cast<const uint8_t*>(buf), strlen(buf));
485 m_keyChain->sign(*data);
486 m_face->put(*data);
487 std::cout << "Sent ACK for " << interest.getName() << std::endl;
488
489
490 //TODO: if already in catalog, what do we do?
491 //ask for content
492 ndn::Name interestStr = interest.getName().getSubName(m_prefix.size()+1);
493 size_t m_nextSegment = 0;
494 std::shared_ptr<ndn::Interest> retrieveInterest =
495 std::make_shared<ndn::Interest>(interestStr.appendSegment(m_nextSegment));
496 retrieveInterest->setInterestLifetime(ndn::time::milliseconds(4000));
497 retrieveInterest->setMustBeFresh(m_mustBeFresh);
498 m_face->expressInterest(*retrieveInterest,
499 bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
500 this,_1, _2),
501 bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout, this, _1));
502 std::cout << "Expressing Interest for: " << retrieveInterest->toUri() << std::endl;
503}
504
505template <typename DatabaseHandler>
506void
507PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
508{
509 std::cout << "interest " << interest.getName() << " timed out";
Alison Craig2a4d5282015-04-10 12:00:02 -0600510}
511
512template <typename DatabaseHandler>
513void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600514PublishAdapter<DatabaseHandler>::onPublishedData(const ndn::Interest& interest,
515 const ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600516{
Chengyu Fan46398212015-08-11 11:23:13 -0600517 std::cout << "received Data " << data.getName() << std::endl;
518 if (data.getContent().empty()) {
519 return;
520 }
521
522 std::shared_ptr<ndn::Data> dataPtr = std::make_shared<ndn::Data>(data);
523 // validate published data payload, if failed, return
524 if (!validatePublicationChanges(dataPtr)) {
525 std::cout << "data validation failed : " << dataPtr->getName() << std::endl;
526#ifndef NDEBUG
527 const std::string payload(reinterpret_cast<const char*>(dataPtr->getContent().value()),
528 dataPtr->getContent().value_size());
529 std::cout << payload << std::endl;
530#endif
531 return;
532 }
533
534 // todo: return value to indicate if the insertion succeeds
535 processUpdateData(dataPtr);
536
537 // ideally, data should not be stale?
538 m_socket->publishData(data.getContent(), ndn::time::seconds(3600));
539
540 // if this is not the final block, continue to fetch the next one
541 const ndn::name::Component& finalBlockId = data.getMetaInfo().getFinalBlockId();
542 if (finalBlockId == data.getName()[-1]) {
543 m_isFinished = true;
544 }
545 //else, get the next segment
546 if (!m_isFinished) {
547 ndn::Name nextInterestName = data.getName().getPrefix(-1);
548 uint64_t incomingSegment = data.getName()[-1].toSegment();
549 std::cout << " Next Interest Name " << nextInterestName << " Segment " << incomingSegment++
550 << std::endl;
551 std::shared_ptr<ndn::Interest> nextInterest =
552 std::make_shared<ndn::Interest>(nextInterestName.appendSegment(incomingSegment++));
553 nextInterest->setInterestLifetime(ndn::time::milliseconds(4000));
554 nextInterest->setMustBeFresh(m_mustBeFresh);
555 m_face->expressInterest(*nextInterest,
556 bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
557 this,_1, _2),
558 bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout,
559 this, _1));
560 }
561}
562
563template <typename DatabaseHandler>
564void
565PublishAdapter<DatabaseHandler>::processUpdateData(const ndn::shared_ptr<const ndn::Data>& data)
566{
567 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
568 data->getContent().value_size());
569
570 if (payload.length() <= 0) {
571 return;
572 }
573
574 // the data payload must be JSON format
575 // http://redmine.named-data.net/projects/ndn-atmos/wiki/Sync
576 Json::Value parsedFromPayload;
577 Json::Reader jsonReader;
578 if (!jsonReader.parse(payload, parsedFromPayload)) {
579 // todo: logging events
580 std::cout << "fail to parse the update data" << std::endl;
581 return;
582 }
583 else {
584 std::cout << "received Json format payload : "
585 << parsedFromPayload.toStyledString() << std::endl;
586 }
587 std::stringstream ss;
588 if (json2Sql(ss, parsedFromPayload, util::ADD)) {
589 std::cout << "sql string to insert data : " << ss.str() << std::endl;
590 // todo: before use, check if the connection is not NULL
591 // we may need to use lock here to ensure thread safe
592 operateDatabase(ss.str(), util::ADD);
593 }
594
595 ss.str("");
596 ss.clear();
597 if (json2Sql(ss, parsedFromPayload, util::REMOVE)) {
598 std::cout << "sql string to remove data: " << ss.str() << std::endl;
599 operateDatabase(ss.str(), util::REMOVE);
600 }
601}
602
603template <typename DatabaseHandler>
604chronosync::SeqNo
605PublishAdapter<DatabaseHandler>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
606{
607 // empty
608 return 0;
609}
610
611template <>
612chronosync::SeqNo
613PublishAdapter<MYSQL>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
614{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600615 std::string sql = "SELECT seq_num FROM chronosync_update_info WHERE session_name = '"
Chengyu Fan46398212015-08-11 11:23:13 -0600616 + update.session.toUri() + "';";
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600617#ifndef NDEBUG
Chengyu Fan46398212015-08-11 11:23:13 -0600618 std::cout << "get latest seqNo : " << sql << std::endl;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600619#endif
Chengyu Fan46398212015-08-11 11:23:13 -0600620 std::string errMsg;
621 bool success;
622 std::shared_ptr<MYSQL_RES> results
623 = atmos::util::MySQLPerformQuery(m_databaseHandler, sql, util::QUERY, success, errMsg);
624 if (!success) {
625 std::cout << errMsg << std::endl;
626 return 0; //database connection error?
627 }
628 else if (results != nullptr){
629 MYSQL_ROW row;
630 if (mysql_num_rows(results.get()) == 0)
631 return 0;
632
633 while ((row = mysql_fetch_row(results.get())))
634 {
635 chronosync::SeqNo seqNo = std::stoull(row[0]);
636 return seqNo;
637 }
638 }
639 return 0;
640}
641
642template <typename DatabaseHandler>
643void
644PublishAdapter<DatabaseHandler>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
645{
646 //empty
647}
648
649template <>
650void
651PublishAdapter<MYSQL>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
652{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600653 std::string sql = "UPDATE chronosync_update_info SET seq_num = "
Chengyu Fan46398212015-08-11 11:23:13 -0600654 + boost::lexical_cast<std::string>(update.high)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600655 + " WHERE session_name = '" + update.session.toUri() + "';";
Chengyu Fan46398212015-08-11 11:23:13 -0600656 std::cout << "renew update Info : " << sql << std::endl;
657 std::string errMsg;
658 bool success = false;
659 m_mutex.lock();
660 util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
661 m_mutex.unlock();
662 if (!success)
663 std::cout << errMsg << std::endl;
664}
665
666template <typename DatabaseHandler>
667void
668PublishAdapter<DatabaseHandler>::addUpdateInformation(const chronosync::MissingDataInfo& update)
669{
670 //empty
671}
672
673template <>
674void
675PublishAdapter<MYSQL>::addUpdateInformation(const chronosync::MissingDataInfo& update)
676{
677 std::string sql = "INSERT INTO chronosync_update_info (session_name, seq_num) VALUES ('"
678 + update.session.toUri() + "', " + boost::lexical_cast<std::string>(update.high)
679 + ");";
680
681 std::cout << "add update Info : " << sql << std::endl;
682 std::string errMsg;
683 bool success = false;
684 m_mutex.lock();
685 util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
686 m_mutex.unlock();
687 if (!success)
688 std::cout << errMsg << std::endl;
689}
690
691template <typename DatabaseHandler>
692void
693PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
694{
695 // todo: record event, and use recovery Interest to fetch the whole table
696}
697
698template <typename DatabaseHandler>
699void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600700PublishAdapter<DatabaseHandler>::onUpdateValidationFailed(const
701 std::shared_ptr<const ndn::Data>& data,
Chengyu Fan46398212015-08-11 11:23:13 -0600702 const std::string& failureInfo)
703{
704 std::cout << "failed to validate Data" << data->getName() << " : " << failureInfo << std::endl;
705}
706
707template <typename DatabaseHandler>
708void
709PublishAdapter<DatabaseHandler>::processSyncUpdate(const std::vector<chronosync::MissingDataInfo>&
710 updates)
711{
712 if (updates.empty()) {
713 return;
714 }
715
716 // multiple updates from different catalog are possible
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600717 for (size_t i = 0; i < updates.size(); ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600718 // check if the session is in local DB
719 // if yes, only fetch packets whose seq number is bigger than the one in the DB
720 // if no, directly fetch Data
721 chronosync::SeqNo localSeqNo = getLatestSeqNo(updates[i]);
722 bool update = false;
723
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600724 for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++seq) {
Chengyu Fan46398212015-08-11 11:23:13 -0600725 if (seq > localSeqNo) {
726 m_socket->fetchData(updates[i].session, seq,
727 bind(&PublishAdapter<DatabaseHandler>::processUpdateData,this, _1),
728 bind(&PublishAdapter<DatabaseHandler>::onUpdateValidationFailed,
729 this, _1, _2),
730 bind(&PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout,
731 this, _1),
732 RETRY_WHEN_TIMEOUT);
733 std::cout << "send Interest for [" << updates[i].session << ":" << seq << "]" << std::endl;
734 update = true;
735 }
736 }
737 // update the seq session name and seq number in local DB
738 // indicating they are processed. So latter when this node reboots again, won't redo it
739 if (update) {
740 if (localSeqNo > 0)
741 renewUpdateInformation(updates[i]);
742 else
743 addUpdateInformation(updates[i]);
744 }
745 }
746}
747
748template <typename DatabaseHandler>
749void
750PublishAdapter<DatabaseHandler>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
751{
752 // empty
753}
754
755template <>
756void
757PublishAdapter<MYSQL>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
758{
759 std::string errMsg;
760 bool success = false;
761 m_mutex.lock();
762 atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
763 m_mutex.unlock();
764 if (!success)
765 std::cout << errMsg << std::endl;
766}
767
768template<typename DatabaseHandler>
769bool
770PublishAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlString,
771 Json::Value& jsonValue,
772 util::DatabaseOperation op)
773{
774 if (jsonValue.type() != Json::objectValue) {
775 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
776 return false;
777 }
778 if (op == util::ADD) {
779 size_t updateNumber = jsonValue["add"].size();
780 if (updateNumber <= 0)
781 return false;
782
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600783 sqlString << "INSERT INTO " << m_databaseTable << " (";
784 for (size_t i = 0; i < atmosTableColumns.size(); ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600785 if (i != 0)
786 sqlString << ", ";
787 sqlString << atmosTableColumns[i];
788 }
789 sqlString << ") VALUES";
790
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600791 for (size_t i = 0; i < updateNumber; ++i) { //parse each file name
Chengyu Fan46398212015-08-11 11:23:13 -0600792 if (i > 0)
793 sqlString << ",";
794 // cast might be overflowed
795 Json::Value item = jsonValue["add"][static_cast<int>(i)];
796 if (!item.isConvertibleTo(Json::stringValue)) {
797 std::cout << "malformed JsonQuery string : " << item.toStyledString() << std::endl;
798 return false;
799 }
800 std::string fileName(item.asString());
801 // use digest sha256 for now, may be removed
802 ndn::util::Digest<CryptoPP::SHA256> digest;
803 digest.update(reinterpret_cast<const uint8_t*>(fileName.data()), fileName.length());
804
805 sqlString << "('" << digest.toString() << "','" << fileName << "'";
806
807 // parse the ndn name to get each value for each field
808 if (!name2Fields(sqlString, fileName))
809 return false;
810 sqlString << ")";
811 }
812 sqlString << ";";
813 }
814 else if (op == util::REMOVE) {
815 // remove files from db
816 size_t updateNumber = jsonValue["remove"].size();
817 if (updateNumber <= 0)
818 return false;
819
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600820 sqlString << "delete from " << m_databaseTable << " where name in (";
821 for (size_t i = 0; i < updateNumber; ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600822 if (i > 0)
823 sqlString << ",";
824 // cast might be overflowed
825 Json::Value item = jsonValue["remove"][static_cast<int>(i)];
826 if (!item.isConvertibleTo(Json::stringValue)) {
827 std::cout << "malformed JsonQuery string : " << item.toStyledString() << std::endl;
828 return false;
829 }
830 std::string fileName(item.asString());
831
832 sqlString << "'" << fileName << "'";
833 }
834 sqlString << ");";
835 }
836 return true;
837}
838
839template<typename DatabaseHandler>
840bool
841PublishAdapter<DatabaseHandler>::name2Fields(std::stringstream& sqlString,
842 std::string& fileName)
843{
844 size_t start = 0;
845 size_t pos = 0;
846 size_t count = 0;
847 std::string token;
848 std::string delimiter = "/";
849 // fileName must starts with either ndn:/ or /
850 std::string nameWithNdn("ndn:/");
851 std::string nameWithSlash("/");
852 if (fileName.find(nameWithNdn) == 0) {
853 start = nameWithNdn.size();
854 }
855 else if (fileName.find(nameWithSlash) == 0) {
856 start = nameWithSlash.size();
857 }
858 else
859 return false;
860
861 while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600862 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600863 token = fileName.substr(start, pos - start);
864 if (count >= atmosTableColumns.size() - 2) { // exclude the sha256 and name
865 return false; //fileName contains more than 10 fields
866 }
867 sqlString << ",'" << token << "'";
868 start = pos + 1;
869 }
870
871 // must be 10 fields in total (add the tail one)
872 if (count != atmosTableColumns.size() - 3 || std::string::npos == start)
873 return false;
874 token = fileName.substr(start, std::string::npos - start);
875 sqlString << ",'" << token << "'";
876 return true;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600877}
878
879template<typename DatabaseHandler>
880bool
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600881PublishAdapter<DatabaseHandler>::validatePublicationChanges(const
882 std::shared_ptr<const ndn::Data>& data)
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600883{
884 // The data name must be "/<publisher-prefix>/<nonce>"
885 // the prefix is the data name removes the last component
886 ndn::Name publisherPrefix = data->getName().getPrefix(-1);
887
888 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
889 data->getContent().value_size());
890 Json::Value parsedFromString;
891 Json::Reader reader;
892 if (!reader.parse(payload, parsedFromString)) {
893 // parse error, log events
894 std::cout << "Cannot parse the published data " << data->getName() << " into Json" << std::endl;
895 return false;
896 }
897
898 // validate added files...
899 for (size_t i = 0; i < parsedFromString["add"].size(); i++) {
900 if (!publisherPrefix.isPrefixOf(
901 ndn::Name(parsedFromString["add"][static_cast<int>(i)].asString())))
902 return false;
903 }
904
905 // validate removed files ...
906 for (size_t i = 0; i < parsedFromString["remove"].size(); i++) {
907 if (!publisherPrefix.isPrefixOf(
908 ndn::Name(parsedFromString["remove"][static_cast<int>(i)].asString())))
909 return false;
910 }
911 return true;
Alison Craig2a4d5282015-04-10 12:00:02 -0600912}
913
914} // namespace publish
915} // namespace atmos
916#endif //ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP