blob: c331bf7bef69203a50eb4dd18476e138a1bde5ef [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
20#define ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060024#include <mysql/mysql.h>
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <json/reader.h>
27#include <json/value.h>
28#include <json/writer.h>
29
30#include <ndn-cxx/face.hpp>
31#include <ndn-cxx/interest.hpp>
32#include <ndn-cxx/interest-filter.hpp>
33#include <ndn-cxx/name.hpp>
34#include <ndn-cxx/security/key-chain.hpp>
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060035#include <ndn-cxx/security/validator-config.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060036#include <ndn-cxx/util/string-helper.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060037
Chengyu Fan46398212015-08-11 11:23:13 -060038#include <ChronoSync/socket.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060039#include <memory>
40#include <string>
Chengyu Fanb25835b2015-04-28 17:09:35 -060041#include <vector>
42#include <unordered_map>
Chengyu Fan46398212015-08-11 11:23:13 -060043#include <mutex>
Alison Craig2a4d5282015-04-10 12:00:02 -060044
45namespace atmos {
46namespace publish {
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060047
// Passed as the last argument of the ChronoSync fetchData() call issued from
// processSyncUpdate().
#define RETRY_WHEN_TIMEOUT 2

// TODO: need to use the configured nameFields
// Column names listed in the INSERT statement built by json2Sql().
// Declared `static const` so each translation unit that includes this header
// gets its own internal-linkage copy; a plain (external-linkage) definition in
// a header produces duplicate-symbol errors once it is included twice.
static const std::array<std::string, 12> atmosTableColumns = {{"sha256", "name", "activity", "product",
                                                                "organization", "model", "experiment",
                                                                "frequency", "modeling_realm",
                                                                "variable_name", "ensemble", "time"}};
54
Alison Craig2a4d5282015-04-10 12:00:02 -060055/**
56 * PublishAdapter handles the Publish usecases for the catalog
57 */
58template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060059class PublishAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060060public:
61 /**
62 * Constructor
63 *
Chengyu Fanb25835b2015-04-28 17:09:35 -060064 * @param face: Face that will be used for NDN communications
65 * @param keyChain: KeyChain that will be used for data signing
Alison Craig2a4d5282015-04-10 12:00:02 -060066 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060067 PublishAdapter(const std::shared_ptr<ndn::Face>& face,
68 const std::shared_ptr<ndn::KeyChain>& keyChain);
Alison Craig2a4d5282015-04-10 12:00:02 -060069
Alison Craig2a4d5282015-04-10 12:00:02 -060070 virtual
71 ~PublishAdapter();
72
Chengyu Fanb25835b2015-04-28 17:09:35 -060073 /**
74 * Helper function that subscribe to a publish section for the config file
75 */
76 void
77 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060078 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060079 const std::vector<std::string>& nameFields,
80 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060081
Alison Craig2a4d5282015-04-10 12:00:02 -060082protected:
83 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060084 * Helper function that configures piblishAdapter instance according to publish section
85 * in config file
86 */
87 void
88 onConfig(const util::ConfigSection& section,
89 bool isDryDun,
90 const std::string& fileName,
91 const ndn::Name& prefix);
92
93 /**
Alison Craig2a4d5282015-04-10 12:00:02 -060094 * Initial "please publish this" Interests
95 *
96 * @param filter: InterestFilter that caused this Interest to be routed
97 * @param interest: Interest that needs to be handled
98 */
99 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600100 onPublishInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600101
Chengyu Fan46398212015-08-11 11:23:13 -0600102 virtual void
103 onTimeout(const ndn::Interest& interest);
104
Alison Craig2a4d5282015-04-10 12:00:02 -0600105 /**
106 * Data containing the actual thing we need to publish
107 *
108 * @param interest: Interest that caused this Data to be routed
109 * @param data: Data that needs to be handled
110 */
111 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600112 onPublishedData(const ndn::Interest& interest, const ndn::Data& data);
Alison Craig2a4d5282015-04-10 12:00:02 -0600113
Chengyu Fanb25835b2015-04-28 17:09:35 -0600114 /**
Chengyu Fan46398212015-08-11 11:23:13 -0600115 * Helper function to initialize the DatabaseHandler
Chengyu Fanb25835b2015-04-28 17:09:35 -0600116 */
117 void
Chengyu Fan46398212015-08-11 11:23:13 -0600118 initializeDatabase(const util::ConnectionDetails& databaseId);
Alison Craig2a4d5282015-04-10 12:00:02 -0600119
Chengyu Fanb25835b2015-04-28 17:09:35 -0600120 /**
121 * Helper function that sets filters to make the adapter work
122 */
123 void
124 setFilters();
125
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600126 void
127 setCatalogId();
128
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600129 /**
130 * Function to validate publication changes against the trust model, which is, all file
131 * names must be under the publisher's prefix. This function should be called by a callback
132 * function invoked by validator
133 *
134 * @param data: received data from the publisher
135 */
136 bool
137 validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data);
138
Chengyu Fan46398212015-08-11 11:23:13 -0600139
140 /**
141 * Helper function that processes the sync update
142 *
143 * @param updates: vector that contains all the missing data information
144 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600145 void
146 processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
Chengyu Fan46398212015-08-11 11:23:13 -0600147
148 /**
149 * Helper function that processes the update data
150 *
151 * @param data: shared pointer for the fetched update data
152 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600153 void
154 processUpdateData(const ndn::shared_ptr<const ndn::Data>& data);
Chengyu Fan46398212015-08-11 11:23:13 -0600155
156 /**
157 * Helper function that add data to or remove data from database
158 *
159 * @param sql: sql string to do the add or remove jobs
160 * @param op: enum value indicates the database operation, could be REMOVE, ADD
161 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600162 virtual void
163 operateDatabase(const std::string& sql,
164 util::DatabaseOperation op);
Chengyu Fan46398212015-08-11 11:23:13 -0600165
166 /**
167 * Helper function that parses jsonValue to generate sql string, return value indicates
168 * if it is successfully
169 *
170 * @param sqlString: streamstream to save the sql string
171 * @param jsonValue: Json value that contains the update information
172 * @param op: enum value indicates the database operation, could be REMOVE, ADD
173 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600174 bool
175 json2Sql(std::stringstream& sqlString,
176 Json::Value& jsonValue,
177 util::DatabaseOperation op);
Chengyu Fan46398212015-08-11 11:23:13 -0600178
179 /**
180 * Helper function to generate sql string based on file name, return value indicates
181 * if it is successfully
182 *
183 * @param sqlString: streamstream to save the sql string
184 * @param fileName: ndn uri string for a file name
185 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600186 bool
187 name2Fields(std::stringstream& sqlstring,
188 std::string& fileName);
Chengyu Fan46398212015-08-11 11:23:13 -0600189
190 /**
191 * Check the local database for the latest sequence number for a ChronoSync update
192 *
193 * @param update: the MissingDataInfo object
194 */
195 chronosync::SeqNo
196 getLatestSeqNo(const chronosync::MissingDataInfo& update);
197
198 /**
199 * Update the local database with the update message
200 *
201 * @param update: the MissingDataInfo object
202 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600203 void
204 renewUpdateInformation(const chronosync::MissingDataInfo& update);
Chengyu Fan46398212015-08-11 11:23:13 -0600205
206 /**
207 * Insert the update message into the local database
208 *
209 * @param update: the MissingDataInfo object
210 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600211 void
212 addUpdateInformation(const chronosync::MissingDataInfo& update);
Chengyu Fan46398212015-08-11 11:23:13 -0600213
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600214 void
215 onFetchUpdateDataTimeout(const ndn::Interest& interest);
Chengyu Fan46398212015-08-11 11:23:13 -0600216
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600217 void
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600218 onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
219 const std::string& failureInfo);
220
221 void
222 validatePublishedDataPaylod(const std::shared_ptr<const ndn::Data>& data);
Chengyu Fan46398212015-08-11 11:23:13 -0600223
Chengyu Fanb25835b2015-04-28 17:09:35 -0600224protected:
225 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
226 // Prefix for ChronoSync
227 ndn::Name m_syncPrefix;
228 // Handle to the Catalog's database
229 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600230 std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600231 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan46398212015-08-11 11:23:13 -0600232 std::unique_ptr<chronosync::Socket> m_socket; // SyncSocket
233 // mutex to control critical sections
234 std::mutex m_mutex;
235 std::vector<std::string> m_tableColumns;
236 // TODO: create thread for each request, and the variables below should be within the thread
237 bool m_mustBeFresh;
238 bool m_isFinished;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600239 ndn::Name m_catalogId;
Alison Craig2a4d5282015-04-10 12:00:02 -0600240};
241
Alison Craig2a4d5282015-04-10 12:00:02 -0600242
Chengyu Fanb25835b2015-04-28 17:09:35 -0600243template <typename DatabaseHandler>
244PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
245 const std::shared_ptr<ndn::KeyChain>& keyChain)
246 : util::CatalogAdapter(face, keyChain)
Chengyu Fan46398212015-08-11 11:23:13 -0600247 , m_mustBeFresh(true)
248 , m_isFinished(false)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600249 , m_catalogId("catalogIdPlaceHolder")
Chengyu Fanb25835b2015-04-28 17:09:35 -0600250{
251}
252
253template <typename DatabaseHandler>
254void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600255PublishAdapter<DatabaseHandler>::setCatalogId()
256{
257 // empty
258}
259
260template <>
261void
262PublishAdapter<MYSQL>::setCatalogId()
263{
264 // use public key digest as the catalog ID
265 ndn::Name keyId;
266 if (m_signingId.empty()) {
267 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
268 } else {
269 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
270 }
271
272 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
273 ndn::Block keyDigest = pKey->computeDigest();
274 m_catalogId.clear();
275 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
276}
277
278template <typename DatabaseHandler>
279void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600280PublishAdapter<DatabaseHandler>::setFilters()
281{
282 ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
Chengyu Fan46398212015-08-11 11:23:13 -0600283 m_registeredPrefixList[publishPrefix] =
284 m_face->setInterestFilter(publishPrefix,
285 bind(&PublishAdapter<DatabaseHandler>::onPublishInterest,
286 this, _1, _2),
287 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
288 this, _1),
289 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
290 this, _1, _2));
291
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600292 ndn::Name catalogSync = ndn::Name(m_prefix).append("sync").append(m_catalogId);
Chengyu Fan46398212015-08-11 11:23:13 -0600293 m_socket.reset(new chronosync::Socket(m_syncPrefix,
294 catalogSync,
295 *m_face,
296 bind(&PublishAdapter<DatabaseHandler>::processSyncUpdate,
297 this, _1)));
Alison Craig2a4d5282015-04-10 12:00:02 -0600298}
299
300template <typename DatabaseHandler>
301PublishAdapter<DatabaseHandler>::~PublishAdapter()
302{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600303 for (const auto& itr : m_registeredPrefixList) {
304 if (static_cast<bool>(itr.second))
305 m_face->unsetInterestFilter(itr.second);
306 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600307}
308
309template <typename DatabaseHandler>
310void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600311PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600312 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600313 const std::vector<std::string>& nameFields,
314 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600315{
Chengyu Fan92440162015-07-09 14:43:31 -0600316 m_nameFields = nameFields;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600317 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600318 config.addSectionHandler("publishAdapter",
319 bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
320 _1, _2, _3, prefix));
321}
322
323template <typename DatabaseHandler>
324void
325PublishAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
326 bool isDryRun,
327 const std::string& filename,
328 const ndn::Name& prefix)
329{
330 using namespace util;
331 if (isDryRun) {
332 return;
333 }
334
335 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
336 std::string syncPrefix("ndn:/ndn-atmos/broadcast/chronosync");
337
338 for (auto item = section.begin();
339 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600340 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600341 {
342 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600343 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600344 if (signingId.empty()) {
345 throw Error("Invalid value for \"signingId\""
346 " in \"publish\" section");
347 }
348 }
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600349 else if (item->first == "security") {
350 // when use, the validator must specify the callback func to handle the validated data
351 // it should be called when the Data packet that contains the published file names is received
352 m_publishValidator.reset(new ndn::ValidatorConfig(m_face.get()));
353 m_publishValidator->load(item->second, filename);
354 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600355 else if (item->first == "database") {
356 const util::ConfigSection& databaseSection = item->second;
357 for (auto subItem = databaseSection.begin();
358 subItem != databaseSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600359 ++subItem) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600360 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600361 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600362 if (dbServer.empty()){
363 throw Error("Invalid value for \"dbServer\""
364 " in \"publish\" section");
365 }
366 }
367 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600368 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600369 if (dbName.empty()){
370 throw Error("Invalid value for \"dbName\""
371 " in \"publish\" section");
372 }
373 }
374 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600375 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600376 if (dbUser.empty()){
377 throw Error("Invalid value for \"dbUser\""
378 " in \"publish\" section");
379 }
380 }
381 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600382 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600383 if (dbPasswd.empty()){
384 throw Error("Invalid value for \"dbPasswd\""
385 " in \"publish\" section");
386 }
387 }
388 }
389 }
390 else if (item->first == "sync") {
391 const util::ConfigSection& synSection = item->second;
392 for (auto subItem = synSection.begin();
393 subItem != synSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600394 ++subItem) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600395 if (subItem->first == "prefix") {
396 syncPrefix.clear();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600397 syncPrefix = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600398 if (syncPrefix.empty()){
399 throw Error("Invalid value for \"prefix\""
400 " in \"publish\\sync\" section");
401 }
402 }
403 // todo: parse the sync_security section
404 }
405 }
406 }
407
408 m_prefix = prefix;
409 m_signingId = ndn::Name(signingId);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600410 setCatalogId();
411
Chengyu Fanb25835b2015-04-28 17:09:35 -0600412 m_syncPrefix.clear();
413 m_syncPrefix.append(syncPrefix);
414 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
415
Chengyu Fan46398212015-08-11 11:23:13 -0600416 initializeDatabase(mysqlId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600417 setFilters();
418}
419
420template <typename DatabaseHandler>
421void
Chengyu Fan46398212015-08-11 11:23:13 -0600422PublishAdapter<DatabaseHandler>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600423{
424 //empty
425}
426
427template <>
428void
Chengyu Fan46398212015-08-11 11:23:13 -0600429PublishAdapter<MYSQL>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600430{
431 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
432
433 m_databaseHandler = conn;
Chengyu Fan46398212015-08-11 11:23:13 -0600434
435 if (m_databaseHandler != nullptr) {
436 std::string errMsg;
437 bool success = false;
438 // Ignore errors (when database already exists, errors are expected)
439 std::string createSyncTable =
440 "CREATE TABLE `chronosync_update_info` (\
441 `id` int(11) NOT NULL AUTO_INCREMENT, \
442 `session_name` varchar(1000) NOT NULL, \
443 `seq_num` int(11) NOT NULL, \
444 PRIMARY KEY (`id`), \
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600445 UNIQUE KEY `id_UNIQUE` (`id`) \
Chengyu Fan46398212015-08-11 11:23:13 -0600446 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
447
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600448 MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE,
449 success, errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600450 if (!success)
451 std::cout << errMsg << std::endl;
452
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600453 // create SQL string for table creation, id, sha256, and name are columns that we need
454 std::stringstream ss;
455 ss << "CREATE TABLE `" << m_databaseTable << "` (\
456 `id` int(100) NOT NULL AUTO_INCREMENT, \
457 `sha256` varchar(64) NOT NULL, \
458 `name` varchar(1000) NOT NULL,";
459 for (size_t i = 0; i < m_nameFields.size(); i++) {
460 ss << "`" << m_nameFields[i] << "` varchar(100) NOT NULL, ";
461 }
462 ss << "PRIMARY KEY (`id`), UNIQUE KEY `sha256` (`sha256`)\
Chengyu Fan46398212015-08-11 11:23:13 -0600463 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
464
465 success = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600466 MySQLPerformQuery(m_databaseHandler, ss.str(), util::CREATE, success, errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600467 if (!success)
468 std::cout << errMsg << std::endl;
469 }
470 else {
471 throw Error("cannot connect to the Database");
472 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600473}
474
475template <typename DatabaseHandler>
476void
477PublishAdapter<DatabaseHandler>::onPublishInterest(const ndn::InterestFilter& filter,
478 const ndn::Interest& interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600479{
Chengyu Fan46398212015-08-11 11:23:13 -0600480 // Example Interest : /cmip5/publish/<uri>/<nonce>
481 std::cout << "Publish interest : " << interest.getName().toUri() << std::endl;
482
483 //send back ACK
484 char buf[4] = "ACK";
485 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(interest.getName());
486 data->setFreshnessPeriod(ndn::time::milliseconds(10)); // 10 msec
487 data->setContent(reinterpret_cast<const uint8_t*>(buf), strlen(buf));
488 m_keyChain->sign(*data);
489 m_face->put(*data);
490 std::cout << "Sent ACK for " << interest.getName() << std::endl;
491
492
493 //TODO: if already in catalog, what do we do?
494 //ask for content
495 ndn::Name interestStr = interest.getName().getSubName(m_prefix.size()+1);
496 size_t m_nextSegment = 0;
497 std::shared_ptr<ndn::Interest> retrieveInterest =
498 std::make_shared<ndn::Interest>(interestStr.appendSegment(m_nextSegment));
499 retrieveInterest->setInterestLifetime(ndn::time::milliseconds(4000));
500 retrieveInterest->setMustBeFresh(m_mustBeFresh);
501 m_face->expressInterest(*retrieveInterest,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600502 bind(&PublishAdapter<DatabaseHandler>::onPublishedData,
Chengyu Fan46398212015-08-11 11:23:13 -0600503 this,_1, _2),
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600504 bind(&PublishAdapter<DatabaseHandler>::onTimeout, this, _1));
Chengyu Fan46398212015-08-11 11:23:13 -0600505 std::cout << "Expressing Interest for: " << retrieveInterest->toUri() << std::endl;
506}
507
508template <typename DatabaseHandler>
509void
510PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
511{
512 std::cout << "interest " << interest.getName() << " timed out";
Alison Craig2a4d5282015-04-10 12:00:02 -0600513}
514
515template <typename DatabaseHandler>
516void
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600517PublishAdapter<DatabaseHandler>::onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
518 const std::string& failureInfo)
519{
520 std::cout << "Validation failed: " << data->getName() << failureInfo << std::endl;
521}
522
523template <typename DatabaseHandler>
524void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600525PublishAdapter<DatabaseHandler>::onPublishedData(const ndn::Interest& interest,
526 const ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600527{
Chengyu Fan46398212015-08-11 11:23:13 -0600528 std::cout << "received Data " << data.getName() << std::endl;
529 if (data.getContent().empty()) {
530 return;
531 }
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600532 m_publishValidator->validate(data,
533 bind(&PublishAdapter<DatabaseHandler>::validatePublishedDataPaylod, this, _1),
534 bind(&PublishAdapter<DatabaseHandler>::onValidationFailed, this, _1, _2));
535}
Chengyu Fan46398212015-08-11 11:23:13 -0600536
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600537template <typename DatabaseHandler>
538void
539PublishAdapter<DatabaseHandler>::validatePublishedDataPaylod(const ndn::shared_ptr<const ndn::Data>& data)
540{
Chengyu Fan46398212015-08-11 11:23:13 -0600541 // validate published data payload, if failed, return
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600542 if (!validatePublicationChanges(data)) {
543 std::cout << "data validation failed : " << data->getName() << std::endl;
Chengyu Fan46398212015-08-11 11:23:13 -0600544#ifndef NDEBUG
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600545 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
546 data->getContent().value_size());
Chengyu Fan46398212015-08-11 11:23:13 -0600547 std::cout << payload << std::endl;
548#endif
549 return;
550 }
551
552 // todo: return value to indicate if the insertion succeeds
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600553 processUpdateData(data);
Chengyu Fan46398212015-08-11 11:23:13 -0600554
555 // ideally, data should not be stale?
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600556 m_socket->publishData(data->getContent(), ndn::time::seconds(3600));
Chengyu Fan46398212015-08-11 11:23:13 -0600557
558 // if this is not the final block, continue to fetch the next one
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600559 const ndn::name::Component& finalBlockId = data->getMetaInfo().getFinalBlockId();
560 if (finalBlockId == data->getName()[-1]) {
Chengyu Fan46398212015-08-11 11:23:13 -0600561 m_isFinished = true;
562 }
563 //else, get the next segment
564 if (!m_isFinished) {
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600565 ndn::Name nextInterestName = data->getName().getPrefix(-1);
566 uint64_t incomingSegment = data->getName()[-1].toSegment();
567 incomingSegment++;
568 std::cout << " Next Interest Name " << nextInterestName << " Segment " << incomingSegment
Chengyu Fan46398212015-08-11 11:23:13 -0600569 << std::endl;
570 std::shared_ptr<ndn::Interest> nextInterest =
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600571 std::make_shared<ndn::Interest>(nextInterestName.appendSegment(incomingSegment));
Chengyu Fan46398212015-08-11 11:23:13 -0600572 nextInterest->setInterestLifetime(ndn::time::milliseconds(4000));
573 nextInterest->setMustBeFresh(m_mustBeFresh);
574 m_face->expressInterest(*nextInterest,
575 bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
576 this,_1, _2),
577 bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout,
578 this, _1));
579 }
580}
581
582template <typename DatabaseHandler>
583void
584PublishAdapter<DatabaseHandler>::processUpdateData(const ndn::shared_ptr<const ndn::Data>& data)
585{
586 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
587 data->getContent().value_size());
588
589 if (payload.length() <= 0) {
590 return;
591 }
592
593 // the data payload must be JSON format
594 // http://redmine.named-data.net/projects/ndn-atmos/wiki/Sync
595 Json::Value parsedFromPayload;
596 Json::Reader jsonReader;
597 if (!jsonReader.parse(payload, parsedFromPayload)) {
598 // todo: logging events
599 std::cout << "fail to parse the update data" << std::endl;
600 return;
601 }
602 else {
603 std::cout << "received Json format payload : "
604 << parsedFromPayload.toStyledString() << std::endl;
605 }
606 std::stringstream ss;
607 if (json2Sql(ss, parsedFromPayload, util::ADD)) {
608 std::cout << "sql string to insert data : " << ss.str() << std::endl;
609 // todo: before use, check if the connection is not NULL
610 // we may need to use lock here to ensure thread safe
611 operateDatabase(ss.str(), util::ADD);
612 }
613
614 ss.str("");
615 ss.clear();
616 if (json2Sql(ss, parsedFromPayload, util::REMOVE)) {
617 std::cout << "sql string to remove data: " << ss.str() << std::endl;
618 operateDatabase(ss.str(), util::REMOVE);
619 }
620}
621
622template <typename DatabaseHandler>
623chronosync::SeqNo
624PublishAdapter<DatabaseHandler>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
625{
626 // empty
627 return 0;
628}
629
630template <>
631chronosync::SeqNo
632PublishAdapter<MYSQL>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
633{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600634 std::string sql = "SELECT seq_num FROM chronosync_update_info WHERE session_name = '"
Chengyu Fan46398212015-08-11 11:23:13 -0600635 + update.session.toUri() + "';";
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600636#ifndef NDEBUG
Chengyu Fan46398212015-08-11 11:23:13 -0600637 std::cout << "get latest seqNo : " << sql << std::endl;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600638#endif
Chengyu Fan46398212015-08-11 11:23:13 -0600639 std::string errMsg;
640 bool success;
641 std::shared_ptr<MYSQL_RES> results
642 = atmos::util::MySQLPerformQuery(m_databaseHandler, sql, util::QUERY, success, errMsg);
643 if (!success) {
644 std::cout << errMsg << std::endl;
645 return 0; //database connection error?
646 }
647 else if (results != nullptr){
648 MYSQL_ROW row;
649 if (mysql_num_rows(results.get()) == 0)
650 return 0;
651
652 while ((row = mysql_fetch_row(results.get())))
653 {
654 chronosync::SeqNo seqNo = std::stoull(row[0]);
655 return seqNo;
656 }
657 }
658 return 0;
659}
660
661template <typename DatabaseHandler>
662void
663PublishAdapter<DatabaseHandler>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
664{
665 //empty
666}
667
668template <>
669void
670PublishAdapter<MYSQL>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
671{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600672 std::string sql = "UPDATE chronosync_update_info SET seq_num = "
Chengyu Fan46398212015-08-11 11:23:13 -0600673 + boost::lexical_cast<std::string>(update.high)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600674 + " WHERE session_name = '" + update.session.toUri() + "';";
Chengyu Fan46398212015-08-11 11:23:13 -0600675 std::cout << "renew update Info : " << sql << std::endl;
676 std::string errMsg;
677 bool success = false;
678 m_mutex.lock();
679 util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
680 m_mutex.unlock();
681 if (!success)
682 std::cout << errMsg << std::endl;
683}
684
685template <typename DatabaseHandler>
686void
687PublishAdapter<DatabaseHandler>::addUpdateInformation(const chronosync::MissingDataInfo& update)
688{
689 //empty
690}
691
692template <>
693void
694PublishAdapter<MYSQL>::addUpdateInformation(const chronosync::MissingDataInfo& update)
695{
696 std::string sql = "INSERT INTO chronosync_update_info (session_name, seq_num) VALUES ('"
697 + update.session.toUri() + "', " + boost::lexical_cast<std::string>(update.high)
698 + ");";
699
700 std::cout << "add update Info : " << sql << std::endl;
701 std::string errMsg;
702 bool success = false;
703 m_mutex.lock();
704 util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
705 m_mutex.unlock();
706 if (!success)
707 std::cout << errMsg << std::endl;
708}
709
710template <typename DatabaseHandler>
711void
712PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
713{
714 // todo: record event, and use recovery Interest to fetch the whole table
715}
716
717template <typename DatabaseHandler>
718void
Chengyu Fan46398212015-08-11 11:23:13 -0600719PublishAdapter<DatabaseHandler>::processSyncUpdate(const std::vector<chronosync::MissingDataInfo>&
720 updates)
721{
722 if (updates.empty()) {
723 return;
724 }
725
726 // multiple updates from different catalog are possible
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600727 for (size_t i = 0; i < updates.size(); ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600728 // check if the session is in local DB
729 // if yes, only fetch packets whose seq number is bigger than the one in the DB
730 // if no, directly fetch Data
731 chronosync::SeqNo localSeqNo = getLatestSeqNo(updates[i]);
732 bool update = false;
733
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600734 for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++seq) {
Chengyu Fan46398212015-08-11 11:23:13 -0600735 if (seq > localSeqNo) {
736 m_socket->fetchData(updates[i].session, seq,
737 bind(&PublishAdapter<DatabaseHandler>::processUpdateData,this, _1),
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600738 bind(&PublishAdapter<DatabaseHandler>::onValidationFailed,
Chengyu Fan46398212015-08-11 11:23:13 -0600739 this, _1, _2),
740 bind(&PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout,
741 this, _1),
742 RETRY_WHEN_TIMEOUT);
743 std::cout << "send Interest for [" << updates[i].session << ":" << seq << "]" << std::endl;
744 update = true;
745 }
746 }
747 // update the seq session name and seq number in local DB
748 // indicating they are processed. So latter when this node reboots again, won't redo it
749 if (update) {
750 if (localSeqNo > 0)
751 renewUpdateInformation(updates[i]);
752 else
753 addUpdateInformation(updates[i]);
754 }
755 }
756}
757
758template <typename DatabaseHandler>
759void
760PublishAdapter<DatabaseHandler>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
761{
762 // empty
763}
764
765template <>
766void
767PublishAdapter<MYSQL>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
768{
769 std::string errMsg;
770 bool success = false;
771 m_mutex.lock();
772 atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
773 m_mutex.unlock();
774 if (!success)
775 std::cout << errMsg << std::endl;
776}
777
778template<typename DatabaseHandler>
779bool
780PublishAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlString,
781 Json::Value& jsonValue,
782 util::DatabaseOperation op)
783{
784 if (jsonValue.type() != Json::objectValue) {
785 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
786 return false;
787 }
788 if (op == util::ADD) {
789 size_t updateNumber = jsonValue["add"].size();
790 if (updateNumber <= 0)
791 return false;
792
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600793 sqlString << "INSERT INTO " << m_databaseTable << " (";
794 for (size_t i = 0; i < atmosTableColumns.size(); ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600795 if (i != 0)
796 sqlString << ", ";
797 sqlString << atmosTableColumns[i];
798 }
799 sqlString << ") VALUES";
800
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600801 for (size_t i = 0; i < updateNumber; ++i) { //parse each file name
Chengyu Fan46398212015-08-11 11:23:13 -0600802 if (i > 0)
803 sqlString << ",";
804 // cast might be overflowed
805 Json::Value item = jsonValue["add"][static_cast<int>(i)];
806 if (!item.isConvertibleTo(Json::stringValue)) {
807 std::cout << "malformed JsonQuery string : " << item.toStyledString() << std::endl;
808 return false;
809 }
810 std::string fileName(item.asString());
811 // use digest sha256 for now, may be removed
812 ndn::util::Digest<CryptoPP::SHA256> digest;
813 digest.update(reinterpret_cast<const uint8_t*>(fileName.data()), fileName.length());
814
815 sqlString << "('" << digest.toString() << "','" << fileName << "'";
816
817 // parse the ndn name to get each value for each field
818 if (!name2Fields(sqlString, fileName))
819 return false;
820 sqlString << ")";
821 }
822 sqlString << ";";
823 }
824 else if (op == util::REMOVE) {
825 // remove files from db
826 size_t updateNumber = jsonValue["remove"].size();
827 if (updateNumber <= 0)
828 return false;
829
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600830 sqlString << "delete from " << m_databaseTable << " where name in (";
831 for (size_t i = 0; i < updateNumber; ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600832 if (i > 0)
833 sqlString << ",";
834 // cast might be overflowed
835 Json::Value item = jsonValue["remove"][static_cast<int>(i)];
836 if (!item.isConvertibleTo(Json::stringValue)) {
837 std::cout << "malformed JsonQuery string : " << item.toStyledString() << std::endl;
838 return false;
839 }
840 std::string fileName(item.asString());
841
842 sqlString << "'" << fileName << "'";
843 }
844 sqlString << ");";
845 }
846 return true;
847}
848
849template<typename DatabaseHandler>
850bool
851PublishAdapter<DatabaseHandler>::name2Fields(std::stringstream& sqlString,
852 std::string& fileName)
853{
854 size_t start = 0;
855 size_t pos = 0;
856 size_t count = 0;
857 std::string token;
858 std::string delimiter = "/";
859 // fileName must starts with either ndn:/ or /
860 std::string nameWithNdn("ndn:/");
861 std::string nameWithSlash("/");
862 if (fileName.find(nameWithNdn) == 0) {
863 start = nameWithNdn.size();
864 }
865 else if (fileName.find(nameWithSlash) == 0) {
866 start = nameWithSlash.size();
867 }
868 else
869 return false;
870
871 while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600872 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600873 token = fileName.substr(start, pos - start);
874 if (count >= atmosTableColumns.size() - 2) { // exclude the sha256 and name
875 return false; //fileName contains more than 10 fields
876 }
877 sqlString << ",'" << token << "'";
878 start = pos + 1;
879 }
880
881 // must be 10 fields in total (add the tail one)
882 if (count != atmosTableColumns.size() - 3 || std::string::npos == start)
883 return false;
884 token = fileName.substr(start, std::string::npos - start);
885 sqlString << ",'" << token << "'";
886 return true;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600887}
888
889template<typename DatabaseHandler>
890bool
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600891PublishAdapter<DatabaseHandler>::validatePublicationChanges(const
892 std::shared_ptr<const ndn::Data>& data)
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600893{
894 // The data name must be "/<publisher-prefix>/<nonce>"
895 // the prefix is the data name removes the last component
896 ndn::Name publisherPrefix = data->getName().getPrefix(-1);
897
898 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
899 data->getContent().value_size());
900 Json::Value parsedFromString;
901 Json::Reader reader;
902 if (!reader.parse(payload, parsedFromString)) {
903 // parse error, log events
904 std::cout << "Cannot parse the published data " << data->getName() << " into Json" << std::endl;
905 return false;
906 }
907
908 // validate added files...
909 for (size_t i = 0; i < parsedFromString["add"].size(); i++) {
910 if (!publisherPrefix.isPrefixOf(
911 ndn::Name(parsedFromString["add"][static_cast<int>(i)].asString())))
912 return false;
913 }
914
915 // validate removed files ...
916 for (size_t i = 0; i < parsedFromString["remove"].size(); i++) {
917 if (!publisherPrefix.isPrefixOf(
918 ndn::Name(parsedFromString["remove"][static_cast<int>(i)].asString())))
919 return false;
920 }
921 return true;
Alison Craig2a4d5282015-04-10 12:00:02 -0600922}
923
924} // namespace publish
925} // namespace atmos
926#endif //ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP