blob: b954bc81a8e2ad37fd593947a5d6e0a09ca573a1 [file] [log] [blame]
/** NDN-Atmos: Cataloging Service for distributed data originally developed
 * for atmospheric science data
 * Copyright (C) 2015 Colorado State University
 *
 * NDN-Atmos is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * NDN-Atmos is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
**/
18
19#ifndef ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
20#define ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
21
#include "util/catalog-adapter.hpp"
#include "util/mysql-util.hpp"
#include <mysql/mysql.h>

#include <json/reader.h>
#include <json/value.h>
#include <json/writer.h>

#include <ndn-cxx/face.hpp>
#include <ndn-cxx/interest.hpp>
#include <ndn-cxx/interest-filter.hpp>
#include <ndn-cxx/name.hpp>
#include <ndn-cxx/security/key-chain.hpp>
#include <ndn-cxx/security/validator-config.hpp>

#include <ChronoSync/socket.hpp>

#include <array>
#include <cstring>
#include <iostream>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>
Alison Craig2a4d5282015-04-10 12:00:02 -060043
44namespace atmos {
45namespace publish {
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060046
// Retry count handed to m_socket->fetchData() in processSyncUpdate().
#define RETRY_WHEN_TIMEOUT 2

// TODO: need to use the configured nameFields
// Column names of the `cmip5` table in the order used when building INSERT statements:
// the sha256 digest and the full name come first, followed by the ten name fields.
// Declared `static const` because this definition lives in a header: each translation unit
// gets its own internal-linkage copy instead of a conflicting external definition.
static const std::array<std::string, 12> atmosTableColumns = {{"sha256", "name", "activity",
                                                               "product", "organization", "model",
                                                               "experiment", "frequency",
                                                               "modeling_realm", "variable_name",
                                                               "ensemble", "time"}};
53
Alison Craig2a4d5282015-04-10 12:00:02 -060054/**
55 * PublishAdapter handles the Publish usecases for the catalog
56 */
57template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060058class PublishAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060059public:
60 /**
61 * Constructor
62 *
Chengyu Fanb25835b2015-04-28 17:09:35 -060063 * @param face: Face that will be used for NDN communications
64 * @param keyChain: KeyChain that will be used for data signing
Alison Craig2a4d5282015-04-10 12:00:02 -060065 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060066 PublishAdapter(const std::shared_ptr<ndn::Face>& face,
67 const std::shared_ptr<ndn::KeyChain>& keyChain);
Alison Craig2a4d5282015-04-10 12:00:02 -060068
Alison Craig2a4d5282015-04-10 12:00:02 -060069 virtual
70 ~PublishAdapter();
71
Chengyu Fanb25835b2015-04-28 17:09:35 -060072 /**
73 * Helper function that subscribe to a publish section for the config file
74 */
75 void
76 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060077 const ndn::Name& prefix,
78 const std::vector<std::string>& nameFields);
Chengyu Fanb25835b2015-04-28 17:09:35 -060079
Alison Craig2a4d5282015-04-10 12:00:02 -060080protected:
81 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060082 * Helper function that configures piblishAdapter instance according to publish section
83 * in config file
84 */
85 void
86 onConfig(const util::ConfigSection& section,
87 bool isDryDun,
88 const std::string& fileName,
89 const ndn::Name& prefix);
90
91 /**
Alison Craig2a4d5282015-04-10 12:00:02 -060092 * Initial "please publish this" Interests
93 *
94 * @param filter: InterestFilter that caused this Interest to be routed
95 * @param interest: Interest that needs to be handled
96 */
97 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -060098 onPublishInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
Alison Craig2a4d5282015-04-10 12:00:02 -060099
Chengyu Fan46398212015-08-11 11:23:13 -0600100 virtual void
101 onTimeout(const ndn::Interest& interest);
102
Alison Craig2a4d5282015-04-10 12:00:02 -0600103 /**
104 * Data containing the actual thing we need to publish
105 *
106 * @param interest: Interest that caused this Data to be routed
107 * @param data: Data that needs to be handled
108 */
109 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600110 onPublishedData(const ndn::Interest& interest, const ndn::Data& data);
Alison Craig2a4d5282015-04-10 12:00:02 -0600111
Chengyu Fanb25835b2015-04-28 17:09:35 -0600112 /**
Chengyu Fan46398212015-08-11 11:23:13 -0600113 * Helper function to initialize the DatabaseHandler
Chengyu Fanb25835b2015-04-28 17:09:35 -0600114 */
115 void
Chengyu Fan46398212015-08-11 11:23:13 -0600116 initializeDatabase(const util::ConnectionDetails& databaseId);
Alison Craig2a4d5282015-04-10 12:00:02 -0600117
Chengyu Fanb25835b2015-04-28 17:09:35 -0600118 /**
119 * Helper function that sets filters to make the adapter work
120 */
121 void
122 setFilters();
123
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600124 /**
125 * Function to validate publication changes against the trust model, which is, all file
126 * names must be under the publisher's prefix. This function should be called by a callback
127 * function invoked by validator
128 *
129 * @param data: received data from the publisher
130 */
131 bool
132 validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data);
133
Chengyu Fan46398212015-08-11 11:23:13 -0600134
135 /**
136 * Helper function that processes the sync update
137 *
138 * @param updates: vector that contains all the missing data information
139 */
140 void
141 processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
142
143 /**
144 * Helper function that processes the update data
145 *
146 * @param data: shared pointer for the fetched update data
147 */
148 void
149 processUpdateData(const ndn::shared_ptr<const ndn::Data>& data);
150
151 /**
152 * Helper function that add data to or remove data from database
153 *
154 * @param sql: sql string to do the add or remove jobs
155 * @param op: enum value indicates the database operation, could be REMOVE, ADD
156 */
157 virtual void
158 operateDatabase(const std::string& sql,
159 util::DatabaseOperation op);
160
161 /**
162 * Helper function that parses jsonValue to generate sql string, return value indicates
163 * if it is successfully
164 *
165 * @param sqlString: streamstream to save the sql string
166 * @param jsonValue: Json value that contains the update information
167 * @param op: enum value indicates the database operation, could be REMOVE, ADD
168 */
169 bool
170 json2Sql(std::stringstream& sqlString,
171 Json::Value& jsonValue,
172 util::DatabaseOperation op);
173
174 /**
175 * Helper function to generate sql string based on file name, return value indicates
176 * if it is successfully
177 *
178 * @param sqlString: streamstream to save the sql string
179 * @param fileName: ndn uri string for a file name
180 */
181 bool
182 name2Fields(std::stringstream& sqlstring,
183 std::string& fileName);
184
185 /**
186 * Check the local database for the latest sequence number for a ChronoSync update
187 *
188 * @param update: the MissingDataInfo object
189 */
190 chronosync::SeqNo
191 getLatestSeqNo(const chronosync::MissingDataInfo& update);
192
193 /**
194 * Update the local database with the update message
195 *
196 * @param update: the MissingDataInfo object
197 */
198 void
199 renewUpdateInformation(const chronosync::MissingDataInfo& update);
200
201 /**
202 * Insert the update message into the local database
203 *
204 * @param update: the MissingDataInfo object
205 */
206 void
207 addUpdateInformation(const chronosync::MissingDataInfo& update);
208
209 void
210 onFetchUpdateDataTimeout(const ndn::Interest& interest);
211
212 void
213 onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
214 const std::string& failureInfo);
215
Chengyu Fanb25835b2015-04-28 17:09:35 -0600216protected:
217 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
218 // Prefix for ChronoSync
219 ndn::Name m_syncPrefix;
220 // Handle to the Catalog's database
221 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600222 std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600223 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan46398212015-08-11 11:23:13 -0600224 std::unique_ptr<chronosync::Socket> m_socket; // SyncSocket
225 // mutex to control critical sections
226 std::mutex m_mutex;
227 std::vector<std::string> m_tableColumns;
228 // TODO: create thread for each request, and the variables below should be within the thread
229 bool m_mustBeFresh;
230 bool m_isFinished;
Alison Craig2a4d5282015-04-10 12:00:02 -0600231};
232
Alison Craig2a4d5282015-04-10 12:00:02 -0600233
Chengyu Fanb25835b2015-04-28 17:09:35 -0600234template <typename DatabaseHandler>
235PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
236 const std::shared_ptr<ndn::KeyChain>& keyChain)
237 : util::CatalogAdapter(face, keyChain)
Chengyu Fan46398212015-08-11 11:23:13 -0600238 , m_mustBeFresh(true)
239 , m_isFinished(false)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600240{
241}
242
243template <typename DatabaseHandler>
244void
245PublishAdapter<DatabaseHandler>::setFilters()
246{
247 ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
Chengyu Fan46398212015-08-11 11:23:13 -0600248 m_registeredPrefixList[publishPrefix] =
249 m_face->setInterestFilter(publishPrefix,
250 bind(&PublishAdapter<DatabaseHandler>::onPublishInterest,
251 this, _1, _2),
252 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
253 this, _1),
254 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
255 this, _1, _2));
256
257 ndn::Name catalogSync = ndn::Name(m_prefix).append("sync");
258 m_socket.reset(new chronosync::Socket(m_syncPrefix,
259 catalogSync,
260 *m_face,
261 bind(&PublishAdapter<DatabaseHandler>::processSyncUpdate,
262 this, _1)));
Alison Craig2a4d5282015-04-10 12:00:02 -0600263}
264
265template <typename DatabaseHandler>
266PublishAdapter<DatabaseHandler>::~PublishAdapter()
267{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600268 for (const auto& itr : m_registeredPrefixList) {
269 if (static_cast<bool>(itr.second))
270 m_face->unsetInterestFilter(itr.second);
271 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600272}
273
274template <typename DatabaseHandler>
275void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600276PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600277 const ndn::Name& prefix,
278 const std::vector<std::string>& nameFields)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600279{
Chengyu Fan92440162015-07-09 14:43:31 -0600280 m_nameFields = nameFields;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600281 config.addSectionHandler("publishAdapter",
282 bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
283 _1, _2, _3, prefix));
284}
285
286template <typename DatabaseHandler>
287void
288PublishAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
289 bool isDryRun,
290 const std::string& filename,
291 const ndn::Name& prefix)
292{
293 using namespace util;
294 if (isDryRun) {
295 return;
296 }
297
298 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
299 std::string syncPrefix("ndn:/ndn-atmos/broadcast/chronosync");
300
301 for (auto item = section.begin();
302 item != section.end();
303 ++ item)
304 {
305 if (item->first == "signingId") {
306 signingId.assign(item->second.get_value<std::string>());
307 if (signingId.empty()) {
308 throw Error("Invalid value for \"signingId\""
309 " in \"publish\" section");
310 }
311 }
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600312 else if (item->first == "security") {
313 // when use, the validator must specify the callback func to handle the validated data
314 // it should be called when the Data packet that contains the published file names is received
315 m_publishValidator.reset(new ndn::ValidatorConfig(m_face.get()));
316 m_publishValidator->load(item->second, filename);
317 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600318 else if (item->first == "database") {
319 const util::ConfigSection& databaseSection = item->second;
320 for (auto subItem = databaseSection.begin();
321 subItem != databaseSection.end();
322 ++ subItem) {
323 if (subItem->first == "dbServer") {
324 dbServer.assign(subItem->second.get_value<std::string>());
325 if (dbServer.empty()){
326 throw Error("Invalid value for \"dbServer\""
327 " in \"publish\" section");
328 }
329 }
330 if (subItem->first == "dbName") {
331 dbName.assign(subItem->second.get_value<std::string>());
332 if (dbName.empty()){
333 throw Error("Invalid value for \"dbName\""
334 " in \"publish\" section");
335 }
336 }
337 if (subItem->first == "dbUser") {
338 dbUser.assign(subItem->second.get_value<std::string>());
339 if (dbUser.empty()){
340 throw Error("Invalid value for \"dbUser\""
341 " in \"publish\" section");
342 }
343 }
344 if (subItem->first == "dbPasswd") {
345 dbPasswd.assign(subItem->second.get_value<std::string>());
346 if (dbPasswd.empty()){
347 throw Error("Invalid value for \"dbPasswd\""
348 " in \"publish\" section");
349 }
350 }
351 }
352 }
353 else if (item->first == "sync") {
354 const util::ConfigSection& synSection = item->second;
355 for (auto subItem = synSection.begin();
356 subItem != synSection.end();
357 ++ subItem) {
358 if (subItem->first == "prefix") {
359 syncPrefix.clear();
360 syncPrefix.assign(subItem->second.get_value<std::string>());
361 if (syncPrefix.empty()){
362 throw Error("Invalid value for \"prefix\""
363 " in \"publish\\sync\" section");
364 }
365 }
366 // todo: parse the sync_security section
367 }
368 }
369 }
370
371 m_prefix = prefix;
372 m_signingId = ndn::Name(signingId);
373 m_syncPrefix.clear();
374 m_syncPrefix.append(syncPrefix);
375 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
376
Chengyu Fan46398212015-08-11 11:23:13 -0600377 initializeDatabase(mysqlId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600378 setFilters();
379}
380
381template <typename DatabaseHandler>
382void
Chengyu Fan46398212015-08-11 11:23:13 -0600383PublishAdapter<DatabaseHandler>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600384{
385 //empty
386}
387
388template <>
389void
Chengyu Fan46398212015-08-11 11:23:13 -0600390PublishAdapter<MYSQL>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600391{
392 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
393
394 m_databaseHandler = conn;
Chengyu Fan46398212015-08-11 11:23:13 -0600395
396 if (m_databaseHandler != nullptr) {
397 std::string errMsg;
398 bool success = false;
399 // Ignore errors (when database already exists, errors are expected)
400 std::string createSyncTable =
401 "CREATE TABLE `chronosync_update_info` (\
402 `id` int(11) NOT NULL AUTO_INCREMENT, \
403 `session_name` varchar(1000) NOT NULL, \
404 `seq_num` int(11) NOT NULL, \
405 PRIMARY KEY (`id`), \
406 UNIQUE KEY `id_UNIQUE` (`id`) \
407 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
408
409 MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE, success, errMsg);
410 if (!success)
411 std::cout << errMsg << std::endl;
412
413 std::string createCmip5Table =
414 "CREATE TABLE `cmip5` ( \
415 `id` int(100) NOT NULL AUTO_INCREMENT, \
416 `sha256` varchar(64) NOT NULL, \
417 `name` varchar(1000) NOT NULL, \
418 `activity` varchar(100) NOT NULL, \
419 `product` varchar(100) NOT NULL, \
420 `organization` varchar(100) NOT NULL, \
421 `model` varchar(100) NOT NULL, \
422 `experiment` varchar(100) NOT NULL, \
423 `frequency` varchar(100) NOT NULL, \
424 `modeling_realm` varchar(100) NOT NULL, \
425 `variable_name` varchar(100) NOT NULL, \
426 `ensemble` varchar(100) NOT NULL, \
427 `time` varchar(100) NOT NULL, \
428 PRIMARY KEY (`id`), \
429 UNIQUE KEY `sha256` (`sha256`) \
430 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
431
432 success = false;
433 MySQLPerformQuery(m_databaseHandler, createCmip5Table, util::CREATE, success, errMsg);
434 if (!success)
435 std::cout << errMsg << std::endl;
436 }
437 else {
438 throw Error("cannot connect to the Database");
439 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600440}
441
442template <typename DatabaseHandler>
443void
444PublishAdapter<DatabaseHandler>::onPublishInterest(const ndn::InterestFilter& filter,
445 const ndn::Interest& interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600446{
Chengyu Fan46398212015-08-11 11:23:13 -0600447 // Example Interest : /cmip5/publish/<uri>/<nonce>
448 std::cout << "Publish interest : " << interest.getName().toUri() << std::endl;
449
450 //send back ACK
451 char buf[4] = "ACK";
452 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(interest.getName());
453 data->setFreshnessPeriod(ndn::time::milliseconds(10)); // 10 msec
454 data->setContent(reinterpret_cast<const uint8_t*>(buf), strlen(buf));
455 m_keyChain->sign(*data);
456 m_face->put(*data);
457 std::cout << "Sent ACK for " << interest.getName() << std::endl;
458
459
460 //TODO: if already in catalog, what do we do?
461 //ask for content
462 ndn::Name interestStr = interest.getName().getSubName(m_prefix.size()+1);
463 size_t m_nextSegment = 0;
464 std::shared_ptr<ndn::Interest> retrieveInterest =
465 std::make_shared<ndn::Interest>(interestStr.appendSegment(m_nextSegment));
466 retrieveInterest->setInterestLifetime(ndn::time::milliseconds(4000));
467 retrieveInterest->setMustBeFresh(m_mustBeFresh);
468 m_face->expressInterest(*retrieveInterest,
469 bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
470 this,_1, _2),
471 bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout, this, _1));
472 std::cout << "Expressing Interest for: " << retrieveInterest->toUri() << std::endl;
473}
474
475template <typename DatabaseHandler>
476void
477PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
478{
479 std::cout << "interest " << interest.getName() << " timed out";
Alison Craig2a4d5282015-04-10 12:00:02 -0600480}
481
482template <typename DatabaseHandler>
483void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600484PublishAdapter<DatabaseHandler>::onPublishedData(const ndn::Interest& interest,
485 const ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600486{
Chengyu Fan46398212015-08-11 11:23:13 -0600487 std::cout << "received Data " << data.getName() << std::endl;
488 if (data.getContent().empty()) {
489 return;
490 }
491
492 std::shared_ptr<ndn::Data> dataPtr = std::make_shared<ndn::Data>(data);
493 // validate published data payload, if failed, return
494 if (!validatePublicationChanges(dataPtr)) {
495 std::cout << "data validation failed : " << dataPtr->getName() << std::endl;
496#ifndef NDEBUG
497 const std::string payload(reinterpret_cast<const char*>(dataPtr->getContent().value()),
498 dataPtr->getContent().value_size());
499 std::cout << payload << std::endl;
500#endif
501 return;
502 }
503
504 // todo: return value to indicate if the insertion succeeds
505 processUpdateData(dataPtr);
506
507 // ideally, data should not be stale?
508 m_socket->publishData(data.getContent(), ndn::time::seconds(3600));
509
510 // if this is not the final block, continue to fetch the next one
511 const ndn::name::Component& finalBlockId = data.getMetaInfo().getFinalBlockId();
512 if (finalBlockId == data.getName()[-1]) {
513 m_isFinished = true;
514 }
515 //else, get the next segment
516 if (!m_isFinished) {
517 ndn::Name nextInterestName = data.getName().getPrefix(-1);
518 uint64_t incomingSegment = data.getName()[-1].toSegment();
519 std::cout << " Next Interest Name " << nextInterestName << " Segment " << incomingSegment++
520 << std::endl;
521 std::shared_ptr<ndn::Interest> nextInterest =
522 std::make_shared<ndn::Interest>(nextInterestName.appendSegment(incomingSegment++));
523 nextInterest->setInterestLifetime(ndn::time::milliseconds(4000));
524 nextInterest->setMustBeFresh(m_mustBeFresh);
525 m_face->expressInterest(*nextInterest,
526 bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
527 this,_1, _2),
528 bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout,
529 this, _1));
530 }
531}
532
533template <typename DatabaseHandler>
534void
535PublishAdapter<DatabaseHandler>::processUpdateData(const ndn::shared_ptr<const ndn::Data>& data)
536{
537 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
538 data->getContent().value_size());
539
540 if (payload.length() <= 0) {
541 return;
542 }
543
544 // the data payload must be JSON format
545 // http://redmine.named-data.net/projects/ndn-atmos/wiki/Sync
546 Json::Value parsedFromPayload;
547 Json::Reader jsonReader;
548 if (!jsonReader.parse(payload, parsedFromPayload)) {
549 // todo: logging events
550 std::cout << "fail to parse the update data" << std::endl;
551 return;
552 }
553 else {
554 std::cout << "received Json format payload : "
555 << parsedFromPayload.toStyledString() << std::endl;
556 }
557 std::stringstream ss;
558 if (json2Sql(ss, parsedFromPayload, util::ADD)) {
559 std::cout << "sql string to insert data : " << ss.str() << std::endl;
560 // todo: before use, check if the connection is not NULL
561 // we may need to use lock here to ensure thread safe
562 operateDatabase(ss.str(), util::ADD);
563 }
564
565 ss.str("");
566 ss.clear();
567 if (json2Sql(ss, parsedFromPayload, util::REMOVE)) {
568 std::cout << "sql string to remove data: " << ss.str() << std::endl;
569 operateDatabase(ss.str(), util::REMOVE);
570 }
571}
572
573template <typename DatabaseHandler>
574chronosync::SeqNo
575PublishAdapter<DatabaseHandler>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
576{
577 // empty
578 return 0;
579}
580
581template <>
582chronosync::SeqNo
583PublishAdapter<MYSQL>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
584{
585 std::string sql = "select seq_num from chronosync_update_info where session_name = '"
586 + update.session.toUri() + "';";
587 std::cout << "get latest seqNo : " << sql << std::endl;
588 std::string errMsg;
589 bool success;
590 std::shared_ptr<MYSQL_RES> results
591 = atmos::util::MySQLPerformQuery(m_databaseHandler, sql, util::QUERY, success, errMsg);
592 if (!success) {
593 std::cout << errMsg << std::endl;
594 return 0; //database connection error?
595 }
596 else if (results != nullptr){
597 MYSQL_ROW row;
598 if (mysql_num_rows(results.get()) == 0)
599 return 0;
600
601 while ((row = mysql_fetch_row(results.get())))
602 {
603 chronosync::SeqNo seqNo = std::stoull(row[0]);
604 return seqNo;
605 }
606 }
607 return 0;
608}
609
610template <typename DatabaseHandler>
611void
612PublishAdapter<DatabaseHandler>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
613{
614 //empty
615}
616
617template <>
618void
619PublishAdapter<MYSQL>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
620{
621 std::string sql = "update chronosync_update_info set seq_num = "
622 + boost::lexical_cast<std::string>(update.high)
623 + " where session_name = '" + update.session.toUri() + "';";
624 std::cout << "renew update Info : " << sql << std::endl;
625 std::string errMsg;
626 bool success = false;
627 m_mutex.lock();
628 util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
629 m_mutex.unlock();
630 if (!success)
631 std::cout << errMsg << std::endl;
632}
633
634template <typename DatabaseHandler>
635void
636PublishAdapter<DatabaseHandler>::addUpdateInformation(const chronosync::MissingDataInfo& update)
637{
638 //empty
639}
640
641template <>
642void
643PublishAdapter<MYSQL>::addUpdateInformation(const chronosync::MissingDataInfo& update)
644{
645 std::string sql = "INSERT INTO chronosync_update_info (session_name, seq_num) VALUES ('"
646 + update.session.toUri() + "', " + boost::lexical_cast<std::string>(update.high)
647 + ");";
648
649 std::cout << "add update Info : " << sql << std::endl;
650 std::string errMsg;
651 bool success = false;
652 m_mutex.lock();
653 util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
654 m_mutex.unlock();
655 if (!success)
656 std::cout << errMsg << std::endl;
657}
658
659template <typename DatabaseHandler>
660void
661PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
662{
663 // todo: record event, and use recovery Interest to fetch the whole table
664}
665
666template <typename DatabaseHandler>
667void
668PublishAdapter<DatabaseHandler>::onUpdateValidationFailed(const std::shared_ptr<const ndn::Data>& data,
669 const std::string& failureInfo)
670{
671 std::cout << "failed to validate Data" << data->getName() << " : " << failureInfo << std::endl;
672}
673
674template <typename DatabaseHandler>
675void
676PublishAdapter<DatabaseHandler>::processSyncUpdate(const std::vector<chronosync::MissingDataInfo>&
677 updates)
678{
679 if (updates.empty()) {
680 return;
681 }
682
683 // multiple updates from different catalog are possible
684 for (size_t i = 0; i < updates.size(); ++ i) {
685 // check if the session is in local DB
686 // if yes, only fetch packets whose seq number is bigger than the one in the DB
687 // if no, directly fetch Data
688 chronosync::SeqNo localSeqNo = getLatestSeqNo(updates[i]);
689 bool update = false;
690
691 for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++ seq) {
692 if (seq > localSeqNo) {
693 m_socket->fetchData(updates[i].session, seq,
694 bind(&PublishAdapter<DatabaseHandler>::processUpdateData,this, _1),
695 bind(&PublishAdapter<DatabaseHandler>::onUpdateValidationFailed,
696 this, _1, _2),
697 bind(&PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout,
698 this, _1),
699 RETRY_WHEN_TIMEOUT);
700 std::cout << "send Interest for [" << updates[i].session << ":" << seq << "]" << std::endl;
701 update = true;
702 }
703 }
704 // update the seq session name and seq number in local DB
705 // indicating they are processed. So latter when this node reboots again, won't redo it
706 if (update) {
707 if (localSeqNo > 0)
708 renewUpdateInformation(updates[i]);
709 else
710 addUpdateInformation(updates[i]);
711 }
712 }
713}
714
715template <typename DatabaseHandler>
716void
717PublishAdapter<DatabaseHandler>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
718{
719 // empty
720}
721
722template <>
723void
724PublishAdapter<MYSQL>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
725{
726 std::string errMsg;
727 bool success = false;
728 m_mutex.lock();
729 atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
730 m_mutex.unlock();
731 if (!success)
732 std::cout << errMsg << std::endl;
733}
734
735template<typename DatabaseHandler>
736bool
737PublishAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlString,
738 Json::Value& jsonValue,
739 util::DatabaseOperation op)
740{
741 if (jsonValue.type() != Json::objectValue) {
742 std::cout << jsonValue.toStyledString() << "is not json object" << std::endl;
743 return false;
744 }
745 if (op == util::ADD) {
746 size_t updateNumber = jsonValue["add"].size();
747 if (updateNumber <= 0)
748 return false;
749
750 sqlString << "INSERT INTO cmip5 (";
751 for (size_t i = 0; i < atmosTableColumns.size(); ++ i) {
752 if (i != 0)
753 sqlString << ", ";
754 sqlString << atmosTableColumns[i];
755 }
756 sqlString << ") VALUES";
757
758 for (size_t i = 0; i < updateNumber; ++ i) { //parse each file name
759 if (i > 0)
760 sqlString << ",";
761 // cast might be overflowed
762 Json::Value item = jsonValue["add"][static_cast<int>(i)];
763 if (!item.isConvertibleTo(Json::stringValue)) {
764 std::cout << "malformed JsonQuery string : " << item.toStyledString() << std::endl;
765 return false;
766 }
767 std::string fileName(item.asString());
768 // use digest sha256 for now, may be removed
769 ndn::util::Digest<CryptoPP::SHA256> digest;
770 digest.update(reinterpret_cast<const uint8_t*>(fileName.data()), fileName.length());
771
772 sqlString << "('" << digest.toString() << "','" << fileName << "'";
773
774 // parse the ndn name to get each value for each field
775 if (!name2Fields(sqlString, fileName))
776 return false;
777 sqlString << ")";
778 }
779 sqlString << ";";
780 }
781 else if (op == util::REMOVE) {
782 // remove files from db
783 size_t updateNumber = jsonValue["remove"].size();
784 if (updateNumber <= 0)
785 return false;
786
787 sqlString << "delete from cmip5 where name in (";
788 for (size_t i = 0; i < updateNumber; ++ i) {
789 if (i > 0)
790 sqlString << ",";
791 // cast might be overflowed
792 Json::Value item = jsonValue["remove"][static_cast<int>(i)];
793 if (!item.isConvertibleTo(Json::stringValue)) {
794 std::cout << "malformed JsonQuery string : " << item.toStyledString() << std::endl;
795 return false;
796 }
797 std::string fileName(item.asString());
798
799 sqlString << "'" << fileName << "'";
800 }
801 sqlString << ");";
802 }
803 return true;
804}
805
806template<typename DatabaseHandler>
807bool
808PublishAdapter<DatabaseHandler>::name2Fields(std::stringstream& sqlString,
809 std::string& fileName)
810{
811 size_t start = 0;
812 size_t pos = 0;
813 size_t count = 0;
814 std::string token;
815 std::string delimiter = "/";
816 // fileName must starts with either ndn:/ or /
817 std::string nameWithNdn("ndn:/");
818 std::string nameWithSlash("/");
819 if (fileName.find(nameWithNdn) == 0) {
820 start = nameWithNdn.size();
821 }
822 else if (fileName.find(nameWithSlash) == 0) {
823 start = nameWithSlash.size();
824 }
825 else
826 return false;
827
828 while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
829 count ++;
830 token = fileName.substr(start, pos - start);
831 if (count >= atmosTableColumns.size() - 2) { // exclude the sha256 and name
832 return false; //fileName contains more than 10 fields
833 }
834 sqlString << ",'" << token << "'";
835 start = pos + 1;
836 }
837
838 // must be 10 fields in total (add the tail one)
839 if (count != atmosTableColumns.size() - 3 || std::string::npos == start)
840 return false;
841 token = fileName.substr(start, std::string::npos - start);
842 sqlString << ",'" << token << "'";
843 return true;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600844}
845
846template<typename DatabaseHandler>
847bool
848PublishAdapter<DatabaseHandler>::validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data)
849{
850 // The data name must be "/<publisher-prefix>/<nonce>"
851 // the prefix is the data name removes the last component
852 ndn::Name publisherPrefix = data->getName().getPrefix(-1);
853
854 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
855 data->getContent().value_size());
856 Json::Value parsedFromString;
857 Json::Reader reader;
858 if (!reader.parse(payload, parsedFromString)) {
859 // parse error, log events
860 std::cout << "Cannot parse the published data " << data->getName() << " into Json" << std::endl;
861 return false;
862 }
863
864 // validate added files...
865 for (size_t i = 0; i < parsedFromString["add"].size(); i++) {
866 if (!publisherPrefix.isPrefixOf(
867 ndn::Name(parsedFromString["add"][static_cast<int>(i)].asString())))
868 return false;
869 }
870
871 // validate removed files ...
872 for (size_t i = 0; i < parsedFromString["remove"].size(); i++) {
873 if (!publisherPrefix.isPrefixOf(
874 ndn::Name(parsedFromString["remove"][static_cast<int>(i)].asString())))
875 return false;
876 }
877 return true;
Alison Craig2a4d5282015-04-10 12:00:02 -0600878}
879
880} // namespace publish
881} // namespace atmos
882#endif //ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP