blob: b7b8355feb00d32b0b40dd485f4ebc5bc429c5d5 [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
20#define ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060024#include <mysql/mysql.h>
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <json/reader.h>
27#include <json/value.h>
28#include <json/writer.h>
29
30#include <ndn-cxx/face.hpp>
31#include <ndn-cxx/interest.hpp>
32#include <ndn-cxx/interest-filter.hpp>
33#include <ndn-cxx/name.hpp>
34#include <ndn-cxx/security/key-chain.hpp>
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060035#include <ndn-cxx/security/validator-config.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060036#include <ndn-cxx/util/string-helper.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060037
Chengyu Fan46398212015-08-11 11:23:13 -060038#include <ChronoSync/socket.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060039#include <memory>
40#include <string>
Chengyu Fanb25835b2015-04-28 17:09:35 -060041#include <vector>
42#include <unordered_map>
Chengyu Fan46398212015-08-11 11:23:13 -060043#include <mutex>
Alison Craig2a4d5282015-04-10 12:00:02 -060044
Chengyu Fan71b712b2015-09-09 22:13:56 -060045#include "util/logger.hpp"
46
Alison Craig2a4d5282015-04-10 12:00:02 -060047namespace atmos {
48namespace publish {
Chengyu Fan71b712b2015-09-09 22:13:56 -060049#ifdef HAVE_LOG4CXX
50 INIT_LOGGER("PublishAdapter");
51#endif
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060052
Chengyu Fan46398212015-08-11 11:23:13 -060053#define RETRY_WHEN_TIMEOUT 2
Chengyu Fan46398212015-08-11 11:23:13 -060054
Alison Craig2a4d5282015-04-10 12:00:02 -060055/**
56 * PublishAdapter handles the Publish usecases for the catalog
57 */
58template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060059class PublishAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060060public:
61 /**
62 * Constructor
63 *
Chengyu Fancfb80c72015-10-19 16:50:04 -060064 * @param face: Face that will be used for NDN communications
65 * @param keyChain: KeyChain that will be used for data signing
66 * @param syncSocket: ChronoSync socket
Alison Craig2a4d5282015-04-10 12:00:02 -060067 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060068 PublishAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -060069 const std::shared_ptr<ndn::KeyChain>& keyChain,
70 std::shared_ptr<chronosync::Socket>& syncSocket);
Alison Craig2a4d5282015-04-10 12:00:02 -060071
Alison Craig2a4d5282015-04-10 12:00:02 -060072 virtual
73 ~PublishAdapter();
74
Chengyu Fanb25835b2015-04-28 17:09:35 -060075 /**
76 * Helper function that subscribe to a publish section for the config file
77 */
78 void
79 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060080 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060081 const std::vector<std::string>& nameFields,
82 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060083
Alison Craig2a4d5282015-04-10 12:00:02 -060084protected:
85 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060086 * Helper function that configures piblishAdapter instance according to publish section
87 * in config file
88 */
89 void
90 onConfig(const util::ConfigSection& section,
91 bool isDryDun,
92 const std::string& fileName,
93 const ndn::Name& prefix);
94
95 /**
Alison Craig2a4d5282015-04-10 12:00:02 -060096 * Initial "please publish this" Interests
97 *
98 * @param filter: InterestFilter that caused this Interest to be routed
99 * @param interest: Interest that needs to be handled
100 */
101 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600102 onPublishInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600103
Chengyu Fan46398212015-08-11 11:23:13 -0600104 virtual void
105 onTimeout(const ndn::Interest& interest);
106
Alison Craig2a4d5282015-04-10 12:00:02 -0600107 /**
108 * Data containing the actual thing we need to publish
109 *
110 * @param interest: Interest that caused this Data to be routed
111 * @param data: Data that needs to be handled
112 */
113 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600114 onPublishedData(const ndn::Interest& interest, const ndn::Data& data);
Alison Craig2a4d5282015-04-10 12:00:02 -0600115
Chengyu Fanb25835b2015-04-28 17:09:35 -0600116 /**
Chengyu Fan46398212015-08-11 11:23:13 -0600117 * Helper function to initialize the DatabaseHandler
Chengyu Fanb25835b2015-04-28 17:09:35 -0600118 */
119 void
Chengyu Fan46398212015-08-11 11:23:13 -0600120 initializeDatabase(const util::ConnectionDetails& databaseId);
Alison Craig2a4d5282015-04-10 12:00:02 -0600121
Chengyu Fanb25835b2015-04-28 17:09:35 -0600122 /**
123 * Helper function that sets filters to make the adapter work
124 */
125 void
126 setFilters();
127
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600128 void
129 setCatalogId();
130
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600131 /**
132 * Function to validate publication changes against the trust model, which is, all file
133 * names must be under the publisher's prefix. This function should be called by a callback
134 * function invoked by validator
135 *
136 * @param data: received data from the publisher
137 */
138 bool
139 validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data);
140
Chengyu Fan46398212015-08-11 11:23:13 -0600141
142 /**
143 * Helper function that processes the sync update
144 *
145 * @param updates: vector that contains all the missing data information
146 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600147 void
148 processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
Chengyu Fan46398212015-08-11 11:23:13 -0600149
150 /**
151 * Helper function that processes the update data
152 *
153 * @param data: shared pointer for the fetched update data
154 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600155 void
Chengyu Fan612f11c2015-09-23 16:24:47 -0600156 processUpdateData(const std::shared_ptr<const ndn::Data>& data);
Chengyu Fan46398212015-08-11 11:23:13 -0600157
158 /**
159 * Helper function that add data to or remove data from database
160 *
161 * @param sql: sql string to do the add or remove jobs
162 * @param op: enum value indicates the database operation, could be REMOVE, ADD
163 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600164 virtual void
165 operateDatabase(const std::string& sql,
166 util::DatabaseOperation op);
Chengyu Fan46398212015-08-11 11:23:13 -0600167
168 /**
169 * Helper function that parses jsonValue to generate sql string, return value indicates
170 * if it is successfully
171 *
172 * @param sqlString: streamstream to save the sql string
173 * @param jsonValue: Json value that contains the update information
174 * @param op: enum value indicates the database operation, could be REMOVE, ADD
175 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600176 bool
177 json2Sql(std::stringstream& sqlString,
178 Json::Value& jsonValue,
179 util::DatabaseOperation op);
Chengyu Fan46398212015-08-11 11:23:13 -0600180
181 /**
182 * Helper function to generate sql string based on file name, return value indicates
183 * if it is successfully
184 *
185 * @param sqlString: streamstream to save the sql string
186 * @param fileName: ndn uri string for a file name
187 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600188 bool
189 name2Fields(std::stringstream& sqlstring,
190 std::string& fileName);
Chengyu Fan46398212015-08-11 11:23:13 -0600191
192 /**
193 * Check the local database for the latest sequence number for a ChronoSync update
194 *
195 * @param update: the MissingDataInfo object
196 */
197 chronosync::SeqNo
198 getLatestSeqNo(const chronosync::MissingDataInfo& update);
199
200 /**
201 * Update the local database with the update message
202 *
203 * @param update: the MissingDataInfo object
204 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600205 void
206 renewUpdateInformation(const chronosync::MissingDataInfo& update);
Chengyu Fan46398212015-08-11 11:23:13 -0600207
208 /**
209 * Insert the update message into the local database
210 *
211 * @param update: the MissingDataInfo object
212 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600213 void
214 addUpdateInformation(const chronosync::MissingDataInfo& update);
Chengyu Fan46398212015-08-11 11:23:13 -0600215
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600216 void
217 onFetchUpdateDataTimeout(const ndn::Interest& interest);
Chengyu Fan46398212015-08-11 11:23:13 -0600218
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600219 void
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600220 onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
221 const std::string& failureInfo);
222
223 void
224 validatePublishedDataPaylod(const std::shared_ptr<const ndn::Data>& data);
Chengyu Fan46398212015-08-11 11:23:13 -0600225
Chengyu Fanb25835b2015-04-28 17:09:35 -0600226protected:
227 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
228 // Prefix for ChronoSync
229 ndn::Name m_syncPrefix;
230 // Handle to the Catalog's database
231 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600232 std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600233 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600234 std::shared_ptr<chronosync::Socket>& m_socket; // SyncSocket
235 std::vector<std::string> m_tableColumns;
Chengyu Fan46398212015-08-11 11:23:13 -0600236 // mutex to control critical sections
237 std::mutex m_mutex;
Chengyu Fan46398212015-08-11 11:23:13 -0600238 // TODO: create thread for each request, and the variables below should be within the thread
239 bool m_mustBeFresh;
240 bool m_isFinished;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600241 ndn::Name m_catalogId;
Alison Craig2a4d5282015-04-10 12:00:02 -0600242};
243
Alison Craig2a4d5282015-04-10 12:00:02 -0600244
Chengyu Fanb25835b2015-04-28 17:09:35 -0600245template <typename DatabaseHandler>
246PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -0600247 const std::shared_ptr<ndn::KeyChain>& keyChain,
248 std::shared_ptr<chronosync::Socket>& syncSocket)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600249 : util::CatalogAdapter(face, keyChain)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600250 , m_socket(syncSocket)
Chengyu Fan46398212015-08-11 11:23:13 -0600251 , m_mustBeFresh(true)
252 , m_isFinished(false)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600253 , m_catalogId("catalogIdPlaceHolder")
Chengyu Fanb25835b2015-04-28 17:09:35 -0600254{
255}
256
257template <typename DatabaseHandler>
258void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600259PublishAdapter<DatabaseHandler>::setCatalogId()
260{
261 // empty
262}
263
264template <>
265void
266PublishAdapter<MYSQL>::setCatalogId()
267{
268 // use public key digest as the catalog ID
269 ndn::Name keyId;
270 if (m_signingId.empty()) {
271 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
272 } else {
273 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
274 }
275
276 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
277 ndn::Block keyDigest = pKey->computeDigest();
278 m_catalogId.clear();
279 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
280}
281
282template <typename DatabaseHandler>
283void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600284PublishAdapter<DatabaseHandler>::setFilters()
285{
286 ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
Chengyu Fan46398212015-08-11 11:23:13 -0600287 m_registeredPrefixList[publishPrefix] =
288 m_face->setInterestFilter(publishPrefix,
289 bind(&PublishAdapter<DatabaseHandler>::onPublishInterest,
290 this, _1, _2),
291 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
292 this, _1),
293 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
294 this, _1, _2));
295
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600296 ndn::Name catalogSync = ndn::Name(m_prefix).append("sync").append(m_catalogId);
Chengyu Fan46398212015-08-11 11:23:13 -0600297 m_socket.reset(new chronosync::Socket(m_syncPrefix,
298 catalogSync,
299 *m_face,
300 bind(&PublishAdapter<DatabaseHandler>::processSyncUpdate,
301 this, _1)));
Alison Craig2a4d5282015-04-10 12:00:02 -0600302}
303
304template <typename DatabaseHandler>
305PublishAdapter<DatabaseHandler>::~PublishAdapter()
306{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600307 for (const auto& itr : m_registeredPrefixList) {
308 if (static_cast<bool>(itr.second))
309 m_face->unsetInterestFilter(itr.second);
310 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600311}
312
313template <typename DatabaseHandler>
314void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600315PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600316 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600317 const std::vector<std::string>& nameFields,
318 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600319{
Chengyu Fan92440162015-07-09 14:43:31 -0600320 m_nameFields = nameFields;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600321
322 //initialize m_tableColumns
323 m_tableColumns = nameFields;
324 auto it = m_tableColumns.begin();
325 it = m_tableColumns.insert(it, std::string("name"));
326 it = m_tableColumns.insert(it, std::string("sha256"));
327
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600328 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600329 config.addSectionHandler("publishAdapter",
330 bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
331 _1, _2, _3, prefix));
332}
333
334template <typename DatabaseHandler>
335void
336PublishAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
337 bool isDryRun,
338 const std::string& filename,
339 const ndn::Name& prefix)
340{
341 using namespace util;
342 if (isDryRun) {
343 return;
344 }
345
346 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
347 std::string syncPrefix("ndn:/ndn-atmos/broadcast/chronosync");
348
349 for (auto item = section.begin();
350 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600351 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600352 {
353 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600354 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600355 if (signingId.empty()) {
356 throw Error("Invalid value for \"signingId\""
Chengyu Fancfb80c72015-10-19 16:50:04 -0600357 " in \"publish\" section");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600358 }
359 }
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600360 else if (item->first == "security") {
361 // when use, the validator must specify the callback func to handle the validated data
362 // it should be called when the Data packet that contains the published file names is received
363 m_publishValidator.reset(new ndn::ValidatorConfig(m_face.get()));
364 m_publishValidator->load(item->second, filename);
365 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600366 else if (item->first == "database") {
367 const util::ConfigSection& databaseSection = item->second;
368 for (auto subItem = databaseSection.begin();
369 subItem != databaseSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600370 ++subItem) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600371 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600372 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600373 }
374 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600375 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600376 }
377 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600378 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600379 }
380 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600381 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600382 }
383 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600384
385 // Items below must not be empty
386 if (dbServer.empty()){
387 throw Error("Invalid value for \"dbServer\""
388 " in \"publish\" section");
389 }
390 if (dbName.empty()){
391 throw Error("Invalid value for \"dbName\""
392 " in \"publish\" section");
393 }
394 if (dbUser.empty()){
395 throw Error("Invalid value for \"dbUser\""
396 " in \"publish\" section");
397 }
398 if (dbPasswd.empty()){
399 throw Error("Invalid value for \"dbPasswd\""
400 " in \"publish\" section");
401 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600402 }
403 else if (item->first == "sync") {
404 const util::ConfigSection& synSection = item->second;
405 for (auto subItem = synSection.begin();
406 subItem != synSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600407 ++subItem) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600408 if (subItem->first == "prefix") {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600409 syncPrefix = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600410 if (syncPrefix.empty()){
411 throw Error("Invalid value for \"prefix\""
Chengyu Fancfb80c72015-10-19 16:50:04 -0600412 " in \"publish\\sync\" section");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600413 }
414 }
415 // todo: parse the sync_security section
416 }
417 }
418 }
419
420 m_prefix = prefix;
421 m_signingId = ndn::Name(signingId);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600422 setCatalogId();
423
Chengyu Fan71b712b2015-09-09 22:13:56 -0600424 m_syncPrefix = syncPrefix;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600425 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
426
Chengyu Fan46398212015-08-11 11:23:13 -0600427 initializeDatabase(mysqlId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600428 setFilters();
429}
430
431template <typename DatabaseHandler>
432void
Chengyu Fan46398212015-08-11 11:23:13 -0600433PublishAdapter<DatabaseHandler>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600434{
435 //empty
436}
437
438template <>
439void
Chengyu Fan46398212015-08-11 11:23:13 -0600440PublishAdapter<MYSQL>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600441{
442 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
443
444 m_databaseHandler = conn;
Chengyu Fan46398212015-08-11 11:23:13 -0600445
446 if (m_databaseHandler != nullptr) {
447 std::string errMsg;
448 bool success = false;
449 // Ignore errors (when database already exists, errors are expected)
450 std::string createSyncTable =
451 "CREATE TABLE `chronosync_update_info` (\
452 `id` int(11) NOT NULL AUTO_INCREMENT, \
453 `session_name` varchar(1000) NOT NULL, \
454 `seq_num` int(11) NOT NULL, \
455 PRIMARY KEY (`id`), \
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600456 UNIQUE KEY `id_UNIQUE` (`id`) \
Chengyu Fan46398212015-08-11 11:23:13 -0600457 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
458
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600459 MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE,
460 success, errMsg);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600461#ifndef NDEBUG
Chengyu Fan46398212015-08-11 11:23:13 -0600462 if (!success)
Chengyu Fan71b712b2015-09-09 22:13:56 -0600463 _LOG_DEBUG(errMsg);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600464#endif
Chengyu Fan46398212015-08-11 11:23:13 -0600465
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600466 // create SQL string for table creation, id, sha256, and name are columns that we need
467 std::stringstream ss;
468 ss << "CREATE TABLE `" << m_databaseTable << "` (\
469 `id` int(100) NOT NULL AUTO_INCREMENT, \
470 `sha256` varchar(64) NOT NULL, \
471 `name` varchar(1000) NOT NULL,";
472 for (size_t i = 0; i < m_nameFields.size(); i++) {
473 ss << "`" << m_nameFields[i] << "` varchar(100) NOT NULL, ";
474 }
475 ss << "PRIMARY KEY (`id`), UNIQUE KEY `sha256` (`sha256`)\
Chengyu Fan46398212015-08-11 11:23:13 -0600476 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
477
478 success = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600479 MySQLPerformQuery(m_databaseHandler, ss.str(), util::CREATE, success, errMsg);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600480
481#ifndef NDEBUG
Chengyu Fan46398212015-08-11 11:23:13 -0600482 if (!success)
Chengyu Fan71b712b2015-09-09 22:13:56 -0600483 _LOG_DEBUG(errMsg);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600484#endif
Chengyu Fan46398212015-08-11 11:23:13 -0600485 }
486 else {
487 throw Error("cannot connect to the Database");
488 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600489}
490
491template <typename DatabaseHandler>
492void
493PublishAdapter<DatabaseHandler>::onPublishInterest(const ndn::InterestFilter& filter,
494 const ndn::Interest& interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600495{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600496 _LOG_DEBUG(">> PublishAdapter::onPublishInterest");
497
Chengyu Fan46398212015-08-11 11:23:13 -0600498 // Example Interest : /cmip5/publish/<uri>/<nonce>
Chengyu Fan71b712b2015-09-09 22:13:56 -0600499 _LOG_DEBUG(interest.getName().toUri());
Chengyu Fan46398212015-08-11 11:23:13 -0600500
501 //send back ACK
Chengyu Fan71b712b2015-09-09 22:13:56 -0600502 char buf[] = "ACK";
Chengyu Fan46398212015-08-11 11:23:13 -0600503 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(interest.getName());
504 data->setFreshnessPeriod(ndn::time::milliseconds(10)); // 10 msec
505 data->setContent(reinterpret_cast<const uint8_t*>(buf), strlen(buf));
506 m_keyChain->sign(*data);
507 m_face->put(*data);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600508
Chengyu Fancfb80c72015-10-19 16:50:04 -0600509 _LOG_DEBUG("Ack interest : " << interest.getName().toUri());
Chengyu Fan46398212015-08-11 11:23:13 -0600510
511
512 //TODO: if already in catalog, what do we do?
513 //ask for content
514 ndn::Name interestStr = interest.getName().getSubName(m_prefix.size()+1);
515 size_t m_nextSegment = 0;
516 std::shared_ptr<ndn::Interest> retrieveInterest =
517 std::make_shared<ndn::Interest>(interestStr.appendSegment(m_nextSegment));
518 retrieveInterest->setInterestLifetime(ndn::time::milliseconds(4000));
519 retrieveInterest->setMustBeFresh(m_mustBeFresh);
520 m_face->expressInterest(*retrieveInterest,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600521 bind(&PublishAdapter<DatabaseHandler>::onPublishedData,
Chengyu Fan46398212015-08-11 11:23:13 -0600522 this,_1, _2),
Chengyu Fan71b712b2015-09-09 22:13:56 -0600523 bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout, this, _1));
524
525 _LOG_DEBUG("Expressing Interest " << retrieveInterest->toUri());
526 _LOG_DEBUG("<< PublishAdapter::onPublishInterest");
Chengyu Fan46398212015-08-11 11:23:13 -0600527}
528
529template <typename DatabaseHandler>
530void
531PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
532{
Chengyu Fancfb80c72015-10-19 16:50:04 -0600533 _LOG_ERROR(interest.getName() << "timed out");
Alison Craig2a4d5282015-04-10 12:00:02 -0600534}
535
536template <typename DatabaseHandler>
537void
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600538PublishAdapter<DatabaseHandler>::onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
539 const std::string& failureInfo)
540{
Chengyu Fancfb80c72015-10-19 16:50:04 -0600541 _LOG_ERROR(data->getName() << " validation failed: " << failureInfo);
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600542}
543
544template <typename DatabaseHandler>
545void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600546PublishAdapter<DatabaseHandler>::onPublishedData(const ndn::Interest& interest,
547 const ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600548{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600549 _LOG_DEBUG(">> PublishAdapter::onPublishedData");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600550 _LOG_DEBUG("Recv data : " << data.getName());
Chengyu Fan46398212015-08-11 11:23:13 -0600551 if (data.getContent().empty()) {
552 return;
553 }
Chengyu Fan612f11c2015-09-23 16:24:47 -0600554 if (m_publishValidator != nullptr) {
555 m_publishValidator->validate(data,
556 bind(&PublishAdapter<DatabaseHandler>::validatePublishedDataPaylod, this, _1),
557 bind(&PublishAdapter<DatabaseHandler>::onValidationFailed, this, _1, _2));
558 }
559 else {
560 std::shared_ptr<ndn::Data> dataPtr = std::make_shared<ndn::Data>(data);
561 validatePublishedDataPaylod(dataPtr);
562 }
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600563}
Chengyu Fan46398212015-08-11 11:23:13 -0600564
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600565template <typename DatabaseHandler>
566void
Chengyu Fan612f11c2015-09-23 16:24:47 -0600567PublishAdapter<DatabaseHandler>::validatePublishedDataPaylod(const std::shared_ptr<const ndn::Data>& data)
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600568{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600569 _LOG_DEBUG(">> PublishAdapter::onValidatePublishedDataPayload");
570
Chengyu Fan46398212015-08-11 11:23:13 -0600571 // validate published data payload, if failed, return
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600572 if (!validatePublicationChanges(data)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600573 _LOG_ERROR("Data validation failed : " << data->getName());
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600574 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
575 data->getContent().value_size());
Chengyu Fan71b712b2015-09-09 22:13:56 -0600576 _LOG_DEBUG(payload);
Chengyu Fan46398212015-08-11 11:23:13 -0600577 return;
578 }
579
580 // todo: return value to indicate if the insertion succeeds
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600581 processUpdateData(data);
Chengyu Fan46398212015-08-11 11:23:13 -0600582
583 // ideally, data should not be stale?
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600584 m_socket->publishData(data->getContent(), ndn::time::seconds(3600));
Chengyu Fan46398212015-08-11 11:23:13 -0600585
586 // if this is not the final block, continue to fetch the next one
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600587 const ndn::name::Component& finalBlockId = data->getMetaInfo().getFinalBlockId();
588 if (finalBlockId == data->getName()[-1]) {
Chengyu Fan46398212015-08-11 11:23:13 -0600589 m_isFinished = true;
590 }
591 //else, get the next segment
592 if (!m_isFinished) {
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600593 ndn::Name nextInterestName = data->getName().getPrefix(-1);
594 uint64_t incomingSegment = data->getName()[-1].toSegment();
595 incomingSegment++;
Chengyu Fan71b712b2015-09-09 22:13:56 -0600596
597 _LOG_DEBUG("Next Interest Name " << nextInterestName << " Segment " << incomingSegment);
598
Chengyu Fan46398212015-08-11 11:23:13 -0600599 std::shared_ptr<ndn::Interest> nextInterest =
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600600 std::make_shared<ndn::Interest>(nextInterestName.appendSegment(incomingSegment));
Chengyu Fan46398212015-08-11 11:23:13 -0600601 nextInterest->setInterestLifetime(ndn::time::milliseconds(4000));
602 nextInterest->setMustBeFresh(m_mustBeFresh);
603 m_face->expressInterest(*nextInterest,
604 bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
605 this,_1, _2),
606 bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout,
607 this, _1));
608 }
609}
610
611template <typename DatabaseHandler>
612void
Chengyu Fan612f11c2015-09-23 16:24:47 -0600613PublishAdapter<DatabaseHandler>::processUpdateData(const std::shared_ptr<const ndn::Data>& data)
Chengyu Fan46398212015-08-11 11:23:13 -0600614{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600615 _LOG_DEBUG(">> PublishAdapter::processUpdateData");
616
Chengyu Fan46398212015-08-11 11:23:13 -0600617 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
618 data->getContent().value_size());
619
620 if (payload.length() <= 0) {
621 return;
622 }
623
624 // the data payload must be JSON format
625 // http://redmine.named-data.net/projects/ndn-atmos/wiki/Sync
626 Json::Value parsedFromPayload;
627 Json::Reader jsonReader;
628 if (!jsonReader.parse(payload, parsedFromPayload)) {
629 // todo: logging events
Chengyu Fan71b712b2015-09-09 22:13:56 -0600630 _LOG_DEBUG("Fail to parse the update data");
Chengyu Fan46398212015-08-11 11:23:13 -0600631 return;
632 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600633
Chengyu Fan46398212015-08-11 11:23:13 -0600634 std::stringstream ss;
635 if (json2Sql(ss, parsedFromPayload, util::ADD)) {
Chengyu Fan46398212015-08-11 11:23:13 -0600636 // todo: before use, check if the connection is not NULL
637 // we may need to use lock here to ensure thread safe
638 operateDatabase(ss.str(), util::ADD);
639 }
640
641 ss.str("");
642 ss.clear();
643 if (json2Sql(ss, parsedFromPayload, util::REMOVE)) {
Chengyu Fan46398212015-08-11 11:23:13 -0600644 operateDatabase(ss.str(), util::REMOVE);
645 }
646}
647
648template <typename DatabaseHandler>
649chronosync::SeqNo
650PublishAdapter<DatabaseHandler>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
651{
652 // empty
653 return 0;
654}
655
656template <>
657chronosync::SeqNo
658PublishAdapter<MYSQL>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
659{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600660 _LOG_DEBUG(">> PublishAdapter::getLatestSeqNo");
661
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600662 std::string sql = "SELECT seq_num FROM chronosync_update_info WHERE session_name = '"
Chengyu Fan46398212015-08-11 11:23:13 -0600663 + update.session.toUri() + "';";
Chengyu Fan71b712b2015-09-09 22:13:56 -0600664
Chengyu Fan46398212015-08-11 11:23:13 -0600665 std::string errMsg;
666 bool success;
667 std::shared_ptr<MYSQL_RES> results
668 = atmos::util::MySQLPerformQuery(m_databaseHandler, sql, util::QUERY, success, errMsg);
669 if (!success) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600670 _LOG_DEBUG(errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600671 return 0; //database connection error?
672 }
673 else if (results != nullptr){
674 MYSQL_ROW row;
675 if (mysql_num_rows(results.get()) == 0)
676 return 0;
677
678 while ((row = mysql_fetch_row(results.get())))
679 {
680 chronosync::SeqNo seqNo = std::stoull(row[0]);
681 return seqNo;
682 }
683 }
684 return 0;
685}
686
687template <typename DatabaseHandler>
688void
689PublishAdapter<DatabaseHandler>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
690{
691 //empty
692}
693
694template <>
695void
696PublishAdapter<MYSQL>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
697{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600698 std::string sql = "UPDATE chronosync_update_info SET seq_num = "
Chengyu Fan46398212015-08-11 11:23:13 -0600699 + boost::lexical_cast<std::string>(update.high)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600700 + " WHERE session_name = '" + update.session.toUri() + "';";
Chengyu Fan71b712b2015-09-09 22:13:56 -0600701
Chengyu Fan46398212015-08-11 11:23:13 -0600702 std::string errMsg;
703 bool success = false;
704 m_mutex.lock();
705 util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
706 m_mutex.unlock();
707 if (!success)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600708 _LOG_ERROR(errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600709}
710
711template <typename DatabaseHandler>
712void
713PublishAdapter<DatabaseHandler>::addUpdateInformation(const chronosync::MissingDataInfo& update)
714{
715 //empty
716}
717
718template <>
719void
720PublishAdapter<MYSQL>::addUpdateInformation(const chronosync::MissingDataInfo& update)
721{
722 std::string sql = "INSERT INTO chronosync_update_info (session_name, seq_num) VALUES ('"
723 + update.session.toUri() + "', " + boost::lexical_cast<std::string>(update.high)
724 + ");";
725
Chengyu Fan46398212015-08-11 11:23:13 -0600726 std::string errMsg;
727 bool success = false;
728 m_mutex.lock();
729 util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
730 m_mutex.unlock();
731 if (!success)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600732 _LOG_ERROR(errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600733}
734
735template <typename DatabaseHandler>
736void
737PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
738{
739 // todo: record event, and use recovery Interest to fetch the whole table
Chengyu Fancfb80c72015-10-19 16:50:04 -0600740 _LOG_ERROR("UpdateData retrieval timed out: " << interest.getName());
Chengyu Fan46398212015-08-11 11:23:13 -0600741}
742
743template <typename DatabaseHandler>
744void
Chengyu Fan46398212015-08-11 11:23:13 -0600745PublishAdapter<DatabaseHandler>::processSyncUpdate(const std::vector<chronosync::MissingDataInfo>&
746 updates)
747{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600748 _LOG_DEBUG(">> PublishAdapter::processSyncUpdate");
749
Chengyu Fan46398212015-08-11 11:23:13 -0600750 if (updates.empty()) {
751 return;
752 }
753
754 // multiple updates from different catalog are possible
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600755 for (size_t i = 0; i < updates.size(); ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600756 // check if the session is in local DB
757 // if yes, only fetch packets whose seq number is bigger than the one in the DB
758 // if no, directly fetch Data
759 chronosync::SeqNo localSeqNo = getLatestSeqNo(updates[i]);
760 bool update = false;
761
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600762 for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++seq) {
Chengyu Fan46398212015-08-11 11:23:13 -0600763 if (seq > localSeqNo) {
764 m_socket->fetchData(updates[i].session, seq,
765 bind(&PublishAdapter<DatabaseHandler>::processUpdateData,this, _1),
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600766 bind(&PublishAdapter<DatabaseHandler>::onValidationFailed,
Chengyu Fan46398212015-08-11 11:23:13 -0600767 this, _1, _2),
768 bind(&PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout,
769 this, _1),
770 RETRY_WHEN_TIMEOUT);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600771
772 _LOG_DEBUG("Interest for [" << updates[i].session << ":" << seq << "]");
773
Chengyu Fan46398212015-08-11 11:23:13 -0600774 update = true;
775 }
776 }
777 // update the seq session name and seq number in local DB
778 // indicating they are processed. So latter when this node reboots again, won't redo it
779 if (update) {
780 if (localSeqNo > 0)
781 renewUpdateInformation(updates[i]);
782 else
783 addUpdateInformation(updates[i]);
784 }
785 }
786}
787
788template <typename DatabaseHandler>
789void
790PublishAdapter<DatabaseHandler>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
791{
792 // empty
793}
794
795template <>
796void
797PublishAdapter<MYSQL>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
798{
799 std::string errMsg;
800 bool success = false;
801 m_mutex.lock();
802 atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
803 m_mutex.unlock();
804 if (!success)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600805 _LOG_ERROR(errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600806}
807
808template<typename DatabaseHandler>
809bool
810PublishAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlString,
811 Json::Value& jsonValue,
812 util::DatabaseOperation op)
813{
814 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan46398212015-08-11 11:23:13 -0600815 return false;
816 }
817 if (op == util::ADD) {
818 size_t updateNumber = jsonValue["add"].size();
819 if (updateNumber <= 0)
820 return false;
821
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600822 sqlString << "INSERT INTO " << m_databaseTable << " (";
Chengyu Fancfb80c72015-10-19 16:50:04 -0600823 for (size_t i = 0; i < m_tableColumns.size(); ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600824 if (i != 0)
825 sqlString << ", ";
Chengyu Fancfb80c72015-10-19 16:50:04 -0600826 sqlString << m_tableColumns[i];
Chengyu Fan46398212015-08-11 11:23:13 -0600827 }
828 sqlString << ") VALUES";
829
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600830 for (size_t i = 0; i < updateNumber; ++i) { //parse each file name
Chengyu Fan46398212015-08-11 11:23:13 -0600831 if (i > 0)
832 sqlString << ",";
833 // cast might be overflowed
834 Json::Value item = jsonValue["add"][static_cast<int>(i)];
835 if (!item.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600836 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan46398212015-08-11 11:23:13 -0600837 return false;
838 }
839 std::string fileName(item.asString());
840 // use digest sha256 for now, may be removed
841 ndn::util::Digest<CryptoPP::SHA256> digest;
842 digest.update(reinterpret_cast<const uint8_t*>(fileName.data()), fileName.length());
843
844 sqlString << "('" << digest.toString() << "','" << fileName << "'";
845
846 // parse the ndn name to get each value for each field
847 if (!name2Fields(sqlString, fileName))
848 return false;
849 sqlString << ")";
850 }
851 sqlString << ";";
852 }
853 else if (op == util::REMOVE) {
854 // remove files from db
855 size_t updateNumber = jsonValue["remove"].size();
856 if (updateNumber <= 0)
857 return false;
858
Chengyu Fan71b712b2015-09-09 22:13:56 -0600859 sqlString << "DELETE FROM " << m_databaseTable << " WHERE name IN (";
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600860 for (size_t i = 0; i < updateNumber; ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600861 if (i > 0)
862 sqlString << ",";
863 // cast might be overflowed
864 Json::Value item = jsonValue["remove"][static_cast<int>(i)];
865 if (!item.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600866 _LOG_ERROR("Malformed JsonQuery");
Chengyu Fan46398212015-08-11 11:23:13 -0600867 return false;
868 }
869 std::string fileName(item.asString());
870
871 sqlString << "'" << fileName << "'";
872 }
873 sqlString << ");";
874 }
875 return true;
876}
877
878template<typename DatabaseHandler>
879bool
880PublishAdapter<DatabaseHandler>::name2Fields(std::stringstream& sqlString,
881 std::string& fileName)
882{
883 size_t start = 0;
884 size_t pos = 0;
885 size_t count = 0;
886 std::string token;
887 std::string delimiter = "/";
888 // fileName must starts with either ndn:/ or /
889 std::string nameWithNdn("ndn:/");
890 std::string nameWithSlash("/");
891 if (fileName.find(nameWithNdn) == 0) {
892 start = nameWithNdn.size();
893 }
894 else if (fileName.find(nameWithSlash) == 0) {
895 start = nameWithSlash.size();
896 }
897 else
898 return false;
899
900 while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600901 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600902 token = fileName.substr(start, pos - start);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600903 // exclude the sha256 and name (already processed)
904 if (count >= m_tableColumns.size() - 2) {
905 return false;
Chengyu Fan46398212015-08-11 11:23:13 -0600906 }
907 sqlString << ",'" << token << "'";
908 start = pos + 1;
909 }
910
Chengyu Fancfb80c72015-10-19 16:50:04 -0600911 // sha256 and name have been processed, and the last token will be processed later
912 if (count != m_tableColumns.size() - 3 || std::string::npos == start)
Chengyu Fan46398212015-08-11 11:23:13 -0600913 return false;
914 token = fileName.substr(start, std::string::npos - start);
915 sqlString << ",'" << token << "'";
916 return true;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600917}
918
919template<typename DatabaseHandler>
920bool
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600921PublishAdapter<DatabaseHandler>::validatePublicationChanges(const
922 std::shared_ptr<const ndn::Data>& data)
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600923{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600924 _LOG_DEBUG(">> PublishAdapter::validatePublicationChanges");
925
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600926 // The data name must be "/<publisher-prefix>/<nonce>"
927 // the prefix is the data name removes the last component
928 ndn::Name publisherPrefix = data->getName().getPrefix(-1);
929
930 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
931 data->getContent().value_size());
932 Json::Value parsedFromString;
933 Json::Reader reader;
934 if (!reader.parse(payload, parsedFromString)) {
935 // parse error, log events
Chengyu Fan71b712b2015-09-09 22:13:56 -0600936 _LOG_DEBUG("Fail to parse the published Data" << data->getName());
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600937 return false;
938 }
939
940 // validate added files...
941 for (size_t i = 0; i < parsedFromString["add"].size(); i++) {
942 if (!publisherPrefix.isPrefixOf(
943 ndn::Name(parsedFromString["add"][static_cast<int>(i)].asString())))
944 return false;
945 }
946
947 // validate removed files ...
948 for (size_t i = 0; i < parsedFromString["remove"].size(); i++) {
949 if (!publisherPrefix.isPrefixOf(
950 ndn::Name(parsedFromString["remove"][static_cast<int>(i)].asString())))
951 return false;
952 }
953 return true;
Alison Craig2a4d5282015-04-10 12:00:02 -0600954}
955
956} // namespace publish
957} // namespace atmos
958#endif //ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP