blob: 6bd32fa8c627d430bed861dafe51e98569023c4f [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
20#define ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060024#include <mysql/mysql.h>
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <json/reader.h>
27#include <json/value.h>
28#include <json/writer.h>
29
30#include <ndn-cxx/face.hpp>
31#include <ndn-cxx/interest.hpp>
32#include <ndn-cxx/interest-filter.hpp>
33#include <ndn-cxx/name.hpp>
34#include <ndn-cxx/security/key-chain.hpp>
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060035#include <ndn-cxx/security/validator-config.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060036#include <ndn-cxx/util/string-helper.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060037
Chengyu Fan46398212015-08-11 11:23:13 -060038#include <ChronoSync/socket.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060039#include <memory>
40#include <string>
Chengyu Fanb25835b2015-04-28 17:09:35 -060041#include <vector>
42#include <unordered_map>
Chengyu Fan46398212015-08-11 11:23:13 -060043#include <mutex>
Alison Craig2a4d5282015-04-10 12:00:02 -060044
Chengyu Fan71b712b2015-09-09 22:13:56 -060045#include "util/logger.hpp"
46
Alison Craig2a4d5282015-04-10 12:00:02 -060047namespace atmos {
48namespace publish {
Chengyu Fan71b712b2015-09-09 22:13:56 -060049#ifdef HAVE_LOG4CXX
50 INIT_LOGGER("PublishAdapter");
51#endif
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060052
Chengyu Fan46398212015-08-11 11:23:13 -060053#define RETRY_WHEN_TIMEOUT 2
Chengyu Fan46398212015-08-11 11:23:13 -060054
Alison Craig2a4d5282015-04-10 12:00:02 -060055/**
56 * PublishAdapter handles the Publish usecases for the catalog
57 */
58template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060059class PublishAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060060public:
61 /**
62 * Constructor
63 *
Chengyu Fancfb80c72015-10-19 16:50:04 -060064 * @param face: Face that will be used for NDN communications
65 * @param keyChain: KeyChain that will be used for data signing
66 * @param syncSocket: ChronoSync socket
Alison Craig2a4d5282015-04-10 12:00:02 -060067 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060068 PublishAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -060069 const std::shared_ptr<ndn::KeyChain>& keyChain,
70 std::shared_ptr<chronosync::Socket>& syncSocket);
Alison Craig2a4d5282015-04-10 12:00:02 -060071
Alison Craig2a4d5282015-04-10 12:00:02 -060072 virtual
73 ~PublishAdapter();
74
Chengyu Fanb25835b2015-04-28 17:09:35 -060075 /**
76 * Helper function that subscribe to a publish section for the config file
77 */
78 void
79 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060080 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060081 const std::vector<std::string>& nameFields,
82 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060083
Alison Craig2a4d5282015-04-10 12:00:02 -060084protected:
85 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060086 * Helper function that configures piblishAdapter instance according to publish section
87 * in config file
88 */
89 void
90 onConfig(const util::ConfigSection& section,
91 bool isDryDun,
92 const std::string& fileName,
93 const ndn::Name& prefix);
94
95 /**
Alison Craig2a4d5282015-04-10 12:00:02 -060096 * Initial "please publish this" Interests
97 *
98 * @param filter: InterestFilter that caused this Interest to be routed
99 * @param interest: Interest that needs to be handled
100 */
101 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600102 onPublishInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600103
Chengyu Fan46398212015-08-11 11:23:13 -0600104 virtual void
105 onTimeout(const ndn::Interest& interest);
106
Alison Craig2a4d5282015-04-10 12:00:02 -0600107 /**
108 * Data containing the actual thing we need to publish
109 *
110 * @param interest: Interest that caused this Data to be routed
111 * @param data: Data that needs to be handled
112 */
113 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600114 onPublishedData(const ndn::Interest& interest, const ndn::Data& data);
Alison Craig2a4d5282015-04-10 12:00:02 -0600115
Chengyu Fanb25835b2015-04-28 17:09:35 -0600116 /**
Chengyu Fan46398212015-08-11 11:23:13 -0600117 * Helper function to initialize the DatabaseHandler
Chengyu Fanb25835b2015-04-28 17:09:35 -0600118 */
119 void
Chengyu Fan46398212015-08-11 11:23:13 -0600120 initializeDatabase(const util::ConnectionDetails& databaseId);
Alison Craig2a4d5282015-04-10 12:00:02 -0600121
Chengyu Fan31737f12016-01-12 21:08:50 -0700122 void
123 closeDatabaseHandler();
124
Chengyu Fanb25835b2015-04-28 17:09:35 -0600125 /**
126 * Helper function that sets filters to make the adapter work
127 */
128 void
129 setFilters();
130
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600131 void
132 setCatalogId();
133
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600134 /**
135 * Function to validate publication changes against the trust model, which is, all file
136 * names must be under the publisher's prefix. This function should be called by a callback
137 * function invoked by validator
138 *
139 * @param data: received data from the publisher
140 */
141 bool
142 validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data);
143
Chengyu Fan46398212015-08-11 11:23:13 -0600144 /**
145 * Helper function that processes the sync update
146 *
147 * @param updates: vector that contains all the missing data information
148 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600149 void
150 processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
Chengyu Fan46398212015-08-11 11:23:13 -0600151
152 /**
153 * Helper function that processes the update data
154 *
155 * @param data: shared pointer for the fetched update data
156 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600157 void
Chengyu Fan612f11c2015-09-23 16:24:47 -0600158 processUpdateData(const std::shared_ptr<const ndn::Data>& data);
Chengyu Fan46398212015-08-11 11:23:13 -0600159
160 /**
161 * Helper function that add data to or remove data from database
162 *
163 * @param sql: sql string to do the add or remove jobs
164 * @param op: enum value indicates the database operation, could be REMOVE, ADD
165 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600166 virtual void
167 operateDatabase(const std::string& sql,
168 util::DatabaseOperation op);
Chengyu Fan46398212015-08-11 11:23:13 -0600169
170 /**
171 * Helper function that parses jsonValue to generate sql string, return value indicates
172 * if it is successfully
173 *
174 * @param sqlString: streamstream to save the sql string
175 * @param jsonValue: Json value that contains the update information
176 * @param op: enum value indicates the database operation, could be REMOVE, ADD
177 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600178 bool
179 json2Sql(std::stringstream& sqlString,
180 Json::Value& jsonValue,
181 util::DatabaseOperation op);
Chengyu Fan46398212015-08-11 11:23:13 -0600182
183 /**
184 * Helper function to generate sql string based on file name, return value indicates
185 * if it is successfully
186 *
187 * @param sqlString: streamstream to save the sql string
188 * @param fileName: ndn uri string for a file name
189 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600190 bool
191 name2Fields(std::stringstream& sqlstring,
192 std::string& fileName);
Chengyu Fan46398212015-08-11 11:23:13 -0600193
194 /**
195 * Check the local database for the latest sequence number for a ChronoSync update
196 *
197 * @param update: the MissingDataInfo object
198 */
199 chronosync::SeqNo
200 getLatestSeqNo(const chronosync::MissingDataInfo& update);
201
202 /**
203 * Update the local database with the update message
204 *
205 * @param update: the MissingDataInfo object
206 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600207 void
208 renewUpdateInformation(const chronosync::MissingDataInfo& update);
Chengyu Fan46398212015-08-11 11:23:13 -0600209
210 /**
211 * Insert the update message into the local database
212 *
213 * @param update: the MissingDataInfo object
214 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600215 void
216 addUpdateInformation(const chronosync::MissingDataInfo& update);
Chengyu Fan46398212015-08-11 11:23:13 -0600217
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600218 void
219 onFetchUpdateDataTimeout(const ndn::Interest& interest);
Chengyu Fan46398212015-08-11 11:23:13 -0600220
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600221 void
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600222 onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
223 const std::string& failureInfo);
224
225 void
226 validatePublishedDataPaylod(const std::shared_ptr<const ndn::Data>& data);
Chengyu Fan46398212015-08-11 11:23:13 -0600227
Chengyu Fanb25835b2015-04-28 17:09:35 -0600228protected:
229 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
230 // Prefix for ChronoSync
231 ndn::Name m_syncPrefix;
232 // Handle to the Catalog's database
233 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600234 std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600235 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600236 std::shared_ptr<chronosync::Socket>& m_socket; // SyncSocket
237 std::vector<std::string> m_tableColumns;
Chengyu Fan46398212015-08-11 11:23:13 -0600238 // mutex to control critical sections
239 std::mutex m_mutex;
Chengyu Fan46398212015-08-11 11:23:13 -0600240 // TODO: create thread for each request, and the variables below should be within the thread
241 bool m_mustBeFresh;
242 bool m_isFinished;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600243 ndn::Name m_catalogId;
Alison Craig2a4d5282015-04-10 12:00:02 -0600244};
245
Alison Craig2a4d5282015-04-10 12:00:02 -0600246
Chengyu Fanb25835b2015-04-28 17:09:35 -0600247template <typename DatabaseHandler>
248PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
Chengyu Fancfb80c72015-10-19 16:50:04 -0600249 const std::shared_ptr<ndn::KeyChain>& keyChain,
250 std::shared_ptr<chronosync::Socket>& syncSocket)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600251 : util::CatalogAdapter(face, keyChain)
Chengyu Fancfb80c72015-10-19 16:50:04 -0600252 , m_socket(syncSocket)
Chengyu Fan46398212015-08-11 11:23:13 -0600253 , m_mustBeFresh(true)
254 , m_isFinished(false)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600255 , m_catalogId("catalogIdPlaceHolder")
Chengyu Fanb25835b2015-04-28 17:09:35 -0600256{
257}
258
259template <typename DatabaseHandler>
260void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600261PublishAdapter<DatabaseHandler>::setCatalogId()
262{
263 // empty
264}
265
266template <>
267void
Chengyu Fan31737f12016-01-12 21:08:50 -0700268PublishAdapter<ConnectionPool_T>::setCatalogId()
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600269{
270 // use public key digest as the catalog ID
271 ndn::Name keyId;
272 if (m_signingId.empty()) {
273 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
274 } else {
275 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
276 }
277
278 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
279 ndn::Block keyDigest = pKey->computeDigest();
280 m_catalogId.clear();
281 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
282}
283
284template <typename DatabaseHandler>
285void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600286PublishAdapter<DatabaseHandler>::setFilters()
287{
288 ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
Chengyu Fan46398212015-08-11 11:23:13 -0600289 m_registeredPrefixList[publishPrefix] =
290 m_face->setInterestFilter(publishPrefix,
291 bind(&PublishAdapter<DatabaseHandler>::onPublishInterest,
292 this, _1, _2),
293 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
294 this, _1),
295 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
296 this, _1, _2));
297
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600298 ndn::Name catalogSync = ndn::Name(m_prefix).append("sync").append(m_catalogId);
Chengyu Fan46398212015-08-11 11:23:13 -0600299 m_socket.reset(new chronosync::Socket(m_syncPrefix,
300 catalogSync,
301 *m_face,
302 bind(&PublishAdapter<DatabaseHandler>::processSyncUpdate,
303 this, _1)));
Alison Craig2a4d5282015-04-10 12:00:02 -0600304}
305
306template <typename DatabaseHandler>
Chengyu Fan31737f12016-01-12 21:08:50 -0700307void
308PublishAdapter<DatabaseHandler>::closeDatabaseHandler()
309{
310}
311
312template <>
313void
314PublishAdapter<ConnectionPool_T>::closeDatabaseHandler()
315{
316 ConnectionPool_stop(*m_databaseHandler);
317}
318
319template <typename DatabaseHandler>
Alison Craig2a4d5282015-04-10 12:00:02 -0600320PublishAdapter<DatabaseHandler>::~PublishAdapter()
321{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600322 for (const auto& itr : m_registeredPrefixList) {
323 if (static_cast<bool>(itr.second))
324 m_face->unsetInterestFilter(itr.second);
325 }
Chengyu Fan31737f12016-01-12 21:08:50 -0700326
327 closeDatabaseHandler();
Alison Craig2a4d5282015-04-10 12:00:02 -0600328}
329
330template <typename DatabaseHandler>
331void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600332PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600333 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600334 const std::vector<std::string>& nameFields,
335 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600336{
Chengyu Fan92440162015-07-09 14:43:31 -0600337 m_nameFields = nameFields;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600338
339 //initialize m_tableColumns
340 m_tableColumns = nameFields;
341 auto it = m_tableColumns.begin();
342 it = m_tableColumns.insert(it, std::string("name"));
343 it = m_tableColumns.insert(it, std::string("sha256"));
344
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600345 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600346 config.addSectionHandler("publishAdapter",
347 bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
348 _1, _2, _3, prefix));
349}
350
351template <typename DatabaseHandler>
352void
353PublishAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
354 bool isDryRun,
355 const std::string& filename,
356 const ndn::Name& prefix)
357{
358 using namespace util;
359 if (isDryRun) {
360 return;
361 }
362
363 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
364 std::string syncPrefix("ndn:/ndn-atmos/broadcast/chronosync");
365
366 for (auto item = section.begin();
367 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600368 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600369 {
370 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600371 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600372 if (signingId.empty()) {
373 throw Error("Invalid value for \"signingId\""
Chengyu Fancfb80c72015-10-19 16:50:04 -0600374 " in \"publish\" section");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600375 }
376 }
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600377 else if (item->first == "security") {
378 // when use, the validator must specify the callback func to handle the validated data
379 // it should be called when the Data packet that contains the published file names is received
380 m_publishValidator.reset(new ndn::ValidatorConfig(m_face.get()));
381 m_publishValidator->load(item->second, filename);
382 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600383 else if (item->first == "database") {
384 const util::ConfigSection& databaseSection = item->second;
385 for (auto subItem = databaseSection.begin();
386 subItem != databaseSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600387 ++subItem) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600388 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600389 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600390 }
391 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600392 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600393 }
394 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600395 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600396 }
397 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600398 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600399 }
400 }
Chengyu Fancfb80c72015-10-19 16:50:04 -0600401
402 // Items below must not be empty
403 if (dbServer.empty()){
404 throw Error("Invalid value for \"dbServer\""
405 " in \"publish\" section");
406 }
407 if (dbName.empty()){
408 throw Error("Invalid value for \"dbName\""
409 " in \"publish\" section");
410 }
411 if (dbUser.empty()){
412 throw Error("Invalid value for \"dbUser\""
413 " in \"publish\" section");
414 }
415 if (dbPasswd.empty()){
416 throw Error("Invalid value for \"dbPasswd\""
417 " in \"publish\" section");
418 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600419 }
420 else if (item->first == "sync") {
421 const util::ConfigSection& synSection = item->second;
422 for (auto subItem = synSection.begin();
423 subItem != synSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600424 ++subItem) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600425 if (subItem->first == "prefix") {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600426 syncPrefix = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600427 if (syncPrefix.empty()){
428 throw Error("Invalid value for \"prefix\""
Chengyu Fancfb80c72015-10-19 16:50:04 -0600429 " in \"publish\\sync\" section");
Chengyu Fanb25835b2015-04-28 17:09:35 -0600430 }
431 }
432 // todo: parse the sync_security section
433 }
434 }
435 }
436
437 m_prefix = prefix;
438 m_signingId = ndn::Name(signingId);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600439 setCatalogId();
440
Chengyu Fan71b712b2015-09-09 22:13:56 -0600441 m_syncPrefix = syncPrefix;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600442 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
443
Chengyu Fan46398212015-08-11 11:23:13 -0600444 initializeDatabase(mysqlId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600445 setFilters();
446}
447
448template <typename DatabaseHandler>
449void
Chengyu Fan46398212015-08-11 11:23:13 -0600450PublishAdapter<DatabaseHandler>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600451{
452 //empty
453}
454
455template <>
456void
Chengyu Fan31737f12016-01-12 21:08:50 -0700457PublishAdapter<ConnectionPool_T>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600458{
Chengyu Fan31737f12016-01-12 21:08:50 -0700459 m_databaseHandler = zdbConnectionSetup(databaseId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600460
Chengyu Fan31737f12016-01-12 21:08:50 -0700461 Connection_T conn = ConnectionPool_getConnection(*m_databaseHandler);
Chengyu Fan46398212015-08-11 11:23:13 -0600462
Chengyu Fan31737f12016-01-12 21:08:50 -0700463 if (conn != NULL) {
464
Chengyu Fan46398212015-08-11 11:23:13 -0600465 // Ignore errors (when database already exists, errors are expected)
466 std::string createSyncTable =
467 "CREATE TABLE `chronosync_update_info` (\
468 `id` int(11) NOT NULL AUTO_INCREMENT, \
469 `session_name` varchar(1000) NOT NULL, \
470 `seq_num` int(11) NOT NULL, \
471 PRIMARY KEY (`id`), \
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600472 UNIQUE KEY `id_UNIQUE` (`id`) \
Chengyu Fan46398212015-08-11 11:23:13 -0600473 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
474
Chengyu Fan31737f12016-01-12 21:08:50 -0700475 // must use libzdb's try-catch style
476 TRY {
477 Connection_execute(conn,
478 reinterpret_cast<const char*>(createSyncTable.c_str()), createSyncTable.size());
479 }
480 CATCH(SQLException) {
481 _LOG_ERROR(Connection_getLastError(conn));
482 }
483 END_TRY;
Chengyu Fan46398212015-08-11 11:23:13 -0600484
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600485 // create SQL string for table creation, id, sha256, and name are columns that we need
486 std::stringstream ss;
487 ss << "CREATE TABLE `" << m_databaseTable << "` (\
488 `id` int(100) NOT NULL AUTO_INCREMENT, \
489 `sha256` varchar(64) NOT NULL, \
490 `name` varchar(1000) NOT NULL,";
491 for (size_t i = 0; i < m_nameFields.size(); i++) {
492 ss << "`" << m_nameFields[i] << "` varchar(100) NOT NULL, ";
493 }
Chengyu Fan3ffd70e2016-06-27 16:49:27 -0600494 ss << "`has_metadata` tinyint(1) DEFAULT NULL, ";
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600495 ss << "PRIMARY KEY (`id`), UNIQUE KEY `sha256` (`sha256`)\
Chengyu Fan46398212015-08-11 11:23:13 -0600496 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
497
Chengyu Fan31737f12016-01-12 21:08:50 -0700498 // must use libzdb's try-catch style
499 TRY {
500 Connection_execute(conn,
501 reinterpret_cast<const char*>(ss.str().c_str()), ss.str().size());
502 }
503 CATCH(SQLException) {
504 _LOG_ERROR(Connection_getLastError(conn));
505 }
506 END_TRY;
Chengyu Fancfb80c72015-10-19 16:50:04 -0600507
Chengyu Fan31737f12016-01-12 21:08:50 -0700508 Connection_close(conn);
Chengyu Fan46398212015-08-11 11:23:13 -0600509 }
510 else {
511 throw Error("cannot connect to the Database");
512 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600513}
514
515template <typename DatabaseHandler>
516void
517PublishAdapter<DatabaseHandler>::onPublishInterest(const ndn::InterestFilter& filter,
518 const ndn::Interest& interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600519{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600520 _LOG_DEBUG(">> PublishAdapter::onPublishInterest");
521
Chengyu Fan46398212015-08-11 11:23:13 -0600522 // Example Interest : /cmip5/publish/<uri>/<nonce>
Chengyu Fan71b712b2015-09-09 22:13:56 -0600523 _LOG_DEBUG(interest.getName().toUri());
Chengyu Fan46398212015-08-11 11:23:13 -0600524
525 //send back ACK
Chengyu Fan71b712b2015-09-09 22:13:56 -0600526 char buf[] = "ACK";
Chengyu Fan46398212015-08-11 11:23:13 -0600527 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(interest.getName());
528 data->setFreshnessPeriod(ndn::time::milliseconds(10)); // 10 msec
529 data->setContent(reinterpret_cast<const uint8_t*>(buf), strlen(buf));
530 m_keyChain->sign(*data);
531 m_face->put(*data);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600532
Chengyu Fancfb80c72015-10-19 16:50:04 -0600533 _LOG_DEBUG("Ack interest : " << interest.getName().toUri());
Chengyu Fan46398212015-08-11 11:23:13 -0600534
535
536 //TODO: if already in catalog, what do we do?
537 //ask for content
538 ndn::Name interestStr = interest.getName().getSubName(m_prefix.size()+1);
539 size_t m_nextSegment = 0;
540 std::shared_ptr<ndn::Interest> retrieveInterest =
541 std::make_shared<ndn::Interest>(interestStr.appendSegment(m_nextSegment));
542 retrieveInterest->setInterestLifetime(ndn::time::milliseconds(4000));
543 retrieveInterest->setMustBeFresh(m_mustBeFresh);
544 m_face->expressInterest(*retrieveInterest,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600545 bind(&PublishAdapter<DatabaseHandler>::onPublishedData,
Chengyu Fan46398212015-08-11 11:23:13 -0600546 this,_1, _2),
Chengyu Fan71b712b2015-09-09 22:13:56 -0600547 bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout, this, _1));
548
549 _LOG_DEBUG("Expressing Interest " << retrieveInterest->toUri());
550 _LOG_DEBUG("<< PublishAdapter::onPublishInterest");
Chengyu Fan46398212015-08-11 11:23:13 -0600551}
552
553template <typename DatabaseHandler>
554void
555PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
556{
Chengyu Fancfb80c72015-10-19 16:50:04 -0600557 _LOG_ERROR(interest.getName() << "timed out");
Alison Craig2a4d5282015-04-10 12:00:02 -0600558}
559
560template <typename DatabaseHandler>
561void
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600562PublishAdapter<DatabaseHandler>::onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
563 const std::string& failureInfo)
564{
Chengyu Fancfb80c72015-10-19 16:50:04 -0600565 _LOG_ERROR(data->getName() << " validation failed: " << failureInfo);
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600566}
567
568template <typename DatabaseHandler>
569void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600570PublishAdapter<DatabaseHandler>::onPublishedData(const ndn::Interest& interest,
571 const ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600572{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600573 _LOG_DEBUG(">> PublishAdapter::onPublishedData");
Chengyu Fancfb80c72015-10-19 16:50:04 -0600574 _LOG_DEBUG("Recv data : " << data.getName());
Chengyu Fan46398212015-08-11 11:23:13 -0600575 if (data.getContent().empty()) {
576 return;
577 }
Chengyu Fan612f11c2015-09-23 16:24:47 -0600578 if (m_publishValidator != nullptr) {
579 m_publishValidator->validate(data,
580 bind(&PublishAdapter<DatabaseHandler>::validatePublishedDataPaylod, this, _1),
581 bind(&PublishAdapter<DatabaseHandler>::onValidationFailed, this, _1, _2));
582 }
583 else {
584 std::shared_ptr<ndn::Data> dataPtr = std::make_shared<ndn::Data>(data);
585 validatePublishedDataPaylod(dataPtr);
586 }
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600587}
Chengyu Fan46398212015-08-11 11:23:13 -0600588
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600589template <typename DatabaseHandler>
590void
Chengyu Fan612f11c2015-09-23 16:24:47 -0600591PublishAdapter<DatabaseHandler>::validatePublishedDataPaylod(const std::shared_ptr<const ndn::Data>& data)
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600592{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600593 _LOG_DEBUG(">> PublishAdapter::onValidatePublishedDataPayload");
594
Chengyu Fan46398212015-08-11 11:23:13 -0600595 // validate published data payload, if failed, return
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600596 if (!validatePublicationChanges(data)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600597 _LOG_ERROR("Data validation failed : " << data->getName());
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600598 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
599 data->getContent().value_size());
Chengyu Fan71b712b2015-09-09 22:13:56 -0600600 _LOG_DEBUG(payload);
Chengyu Fan46398212015-08-11 11:23:13 -0600601 return;
602 }
603
604 // todo: return value to indicate if the insertion succeeds
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600605 processUpdateData(data);
Chengyu Fan46398212015-08-11 11:23:13 -0600606
607 // ideally, data should not be stale?
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600608 m_socket->publishData(data->getContent(), ndn::time::seconds(3600));
Chengyu Fan46398212015-08-11 11:23:13 -0600609
610 // if this is not the final block, continue to fetch the next one
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600611 const ndn::name::Component& finalBlockId = data->getMetaInfo().getFinalBlockId();
612 if (finalBlockId == data->getName()[-1]) {
Chengyu Fan46398212015-08-11 11:23:13 -0600613 m_isFinished = true;
614 }
615 //else, get the next segment
616 if (!m_isFinished) {
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600617 ndn::Name nextInterestName = data->getName().getPrefix(-1);
618 uint64_t incomingSegment = data->getName()[-1].toSegment();
619 incomingSegment++;
Chengyu Fan71b712b2015-09-09 22:13:56 -0600620
621 _LOG_DEBUG("Next Interest Name " << nextInterestName << " Segment " << incomingSegment);
622
Chengyu Fan46398212015-08-11 11:23:13 -0600623 std::shared_ptr<ndn::Interest> nextInterest =
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600624 std::make_shared<ndn::Interest>(nextInterestName.appendSegment(incomingSegment));
Chengyu Fan46398212015-08-11 11:23:13 -0600625 nextInterest->setInterestLifetime(ndn::time::milliseconds(4000));
626 nextInterest->setMustBeFresh(m_mustBeFresh);
627 m_face->expressInterest(*nextInterest,
628 bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
629 this,_1, _2),
630 bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout,
631 this, _1));
632 }
633}
634
635template <typename DatabaseHandler>
636void
Chengyu Fan612f11c2015-09-23 16:24:47 -0600637PublishAdapter<DatabaseHandler>::processUpdateData(const std::shared_ptr<const ndn::Data>& data)
Chengyu Fan46398212015-08-11 11:23:13 -0600638{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600639 _LOG_DEBUG(">> PublishAdapter::processUpdateData");
640
Chengyu Fan46398212015-08-11 11:23:13 -0600641 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
642 data->getContent().value_size());
643
644 if (payload.length() <= 0) {
645 return;
646 }
647
648 // the data payload must be JSON format
649 // http://redmine.named-data.net/projects/ndn-atmos/wiki/Sync
650 Json::Value parsedFromPayload;
651 Json::Reader jsonReader;
652 if (!jsonReader.parse(payload, parsedFromPayload)) {
653 // todo: logging events
Chengyu Fan71b712b2015-09-09 22:13:56 -0600654 _LOG_DEBUG("Fail to parse the update data");
Chengyu Fan46398212015-08-11 11:23:13 -0600655 return;
656 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600657
Chengyu Fan46398212015-08-11 11:23:13 -0600658 std::stringstream ss;
659 if (json2Sql(ss, parsedFromPayload, util::ADD)) {
Chengyu Fan46398212015-08-11 11:23:13 -0600660 // todo: before use, check if the connection is not NULL
661 // we may need to use lock here to ensure thread safe
662 operateDatabase(ss.str(), util::ADD);
663 }
664
665 ss.str("");
666 ss.clear();
667 if (json2Sql(ss, parsedFromPayload, util::REMOVE)) {
Chengyu Fan46398212015-08-11 11:23:13 -0600668 operateDatabase(ss.str(), util::REMOVE);
669 }
670}
671
672template <typename DatabaseHandler>
673chronosync::SeqNo
674PublishAdapter<DatabaseHandler>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
675{
676 // empty
677 return 0;
678}
679
680template <>
681chronosync::SeqNo
Chengyu Fan31737f12016-01-12 21:08:50 -0700682PublishAdapter<ConnectionPool_T>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
Chengyu Fan46398212015-08-11 11:23:13 -0600683{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600684 _LOG_DEBUG(">> PublishAdapter::getLatestSeqNo");
685
Chengyu Fan31737f12016-01-12 21:08:50 -0700686 Connection_T conn = ConnectionPool_getConnection(*m_databaseHandler);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600687
Chengyu Fan31737f12016-01-12 21:08:50 -0700688 if (!conn) {
689 _LOG_DEBUG("No available database connections");
690 return 0;
Chengyu Fan46398212015-08-11 11:23:13 -0600691 }
Chengyu Fan46398212015-08-11 11:23:13 -0600692
Chengyu Fan31737f12016-01-12 21:08:50 -0700693 PreparedStatement_T ps4SeqNum =
694 Connection_prepareStatement(conn,
695 "SELECT seq_num FROM chronosync_update_info WHERE session_name = ?");
696 PreparedStatement_setString(ps4SeqNum, 1, update.session.toUri().c_str());
697 ResultSet_T res4SeqNum;
698 TRY {
699 res4SeqNum = PreparedStatement_executeQuery(ps4SeqNum);
Chengyu Fan46398212015-08-11 11:23:13 -0600700 }
Chengyu Fan31737f12016-01-12 21:08:50 -0700701 CATCH(SQLException) {
702 _LOG_ERROR(Connection_getLastError(conn));
703 }
704 END_TRY;
705
706 while (ResultSet_next(res4SeqNum)) {
707 return ResultSet_getInt(res4SeqNum, 1);
708 }
709
710 Connection_close(conn);
711
Chengyu Fan46398212015-08-11 11:23:13 -0600712 return 0;
713}
714
715template <typename DatabaseHandler>
716void
717PublishAdapter<DatabaseHandler>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
718{
719 //empty
720}
721
722template <>
723void
Chengyu Fan31737f12016-01-12 21:08:50 -0700724PublishAdapter<ConnectionPool_T>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
Chengyu Fan46398212015-08-11 11:23:13 -0600725{
Chengyu Fan31737f12016-01-12 21:08:50 -0700726 Connection_T conn = ConnectionPool_getConnection(*m_databaseHandler);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600727
Chengyu Fan31737f12016-01-12 21:08:50 -0700728 if (!conn) {
729 _LOG_DEBUG("No available database connections");
730 return;
731 }
732
733 PreparedStatement_T ps4UpdateSeqNum =
734 Connection_prepareStatement(conn,
735 "UPDATE chronosync_update_info SET seq_num = ? WHERE session_name = ?");
736 PreparedStatement_setLLong(ps4UpdateSeqNum, 1, update.high);
737 PreparedStatement_setString(ps4UpdateSeqNum, 1, update.session.toUri().c_str());
738
739 TRY {
740 PreparedStatement_execute(ps4UpdateSeqNum);
741 }
742 CATCH(SQLException) {
743 _LOG_ERROR(Connection_getLastError(conn));
744 }
745 END_TRY;
746
747 Connection_close(conn);
748
Chengyu Fan46398212015-08-11 11:23:13 -0600749}
750
751template <typename DatabaseHandler>
752void
753PublishAdapter<DatabaseHandler>::addUpdateInformation(const chronosync::MissingDataInfo& update)
754{
755 //empty
756}
757
758template <>
759void
Chengyu Fan31737f12016-01-12 21:08:50 -0700760PublishAdapter<ConnectionPool_T>::addUpdateInformation(const chronosync::MissingDataInfo& update)
Chengyu Fan46398212015-08-11 11:23:13 -0600761{
Chengyu Fan31737f12016-01-12 21:08:50 -0700762 Connection_T conn = ConnectionPool_getConnection(*m_databaseHandler);
Chengyu Fan46398212015-08-11 11:23:13 -0600763
Chengyu Fan31737f12016-01-12 21:08:50 -0700764 if (!conn) {
765 _LOG_DEBUG("No available database connections");
766 return;
767 }
768
769 PreparedStatement_T ps4UpdateChronosync =
770 Connection_prepareStatement(conn, "INSERT INTO chronosync_update_info (session_name, seq_num) VALUES (?, ?)");
771
772 PreparedStatement_setString(ps4UpdateChronosync, 1, update.session.toUri().c_str());
773 PreparedStatement_setLLong(ps4UpdateChronosync, 1, update.high);
774
775 TRY {
776 PreparedStatement_execute(ps4UpdateChronosync);
777 }
778 CATCH(SQLException) {
779 _LOG_ERROR(Connection_getLastError(conn));
780 }
781 END_TRY;
782
783 Connection_close(conn);
784
Chengyu Fan46398212015-08-11 11:23:13 -0600785}
786
787template <typename DatabaseHandler>
788void
789PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
790{
791 // todo: record event, and use recovery Interest to fetch the whole table
Chengyu Fancfb80c72015-10-19 16:50:04 -0600792 _LOG_ERROR("UpdateData retrieval timed out: " << interest.getName());
Chengyu Fan46398212015-08-11 11:23:13 -0600793}
794
795template <typename DatabaseHandler>
796void
Chengyu Fan46398212015-08-11 11:23:13 -0600797PublishAdapter<DatabaseHandler>::processSyncUpdate(const std::vector<chronosync::MissingDataInfo>&
798 updates)
799{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600800 _LOG_DEBUG(">> PublishAdapter::processSyncUpdate");
801
Chengyu Fan46398212015-08-11 11:23:13 -0600802 if (updates.empty()) {
803 return;
804 }
805
806 // multiple updates from different catalog are possible
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600807 for (size_t i = 0; i < updates.size(); ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600808 // check if the session is in local DB
809 // if yes, only fetch packets whose seq number is bigger than the one in the DB
810 // if no, directly fetch Data
811 chronosync::SeqNo localSeqNo = getLatestSeqNo(updates[i]);
812 bool update = false;
813
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600814 for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++seq) {
Chengyu Fan46398212015-08-11 11:23:13 -0600815 if (seq > localSeqNo) {
816 m_socket->fetchData(updates[i].session, seq,
817 bind(&PublishAdapter<DatabaseHandler>::processUpdateData,this, _1),
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600818 bind(&PublishAdapter<DatabaseHandler>::onValidationFailed,
Chengyu Fan46398212015-08-11 11:23:13 -0600819 this, _1, _2),
820 bind(&PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout,
821 this, _1),
822 RETRY_WHEN_TIMEOUT);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600823
824 _LOG_DEBUG("Interest for [" << updates[i].session << ":" << seq << "]");
825
Chengyu Fan46398212015-08-11 11:23:13 -0600826 update = true;
827 }
828 }
829 // update the seq session name and seq number in local DB
830 // indicating they are processed. So latter when this node reboots again, won't redo it
831 if (update) {
832 if (localSeqNo > 0)
833 renewUpdateInformation(updates[i]);
834 else
835 addUpdateInformation(updates[i]);
836 }
837 }
838}
839
840template <typename DatabaseHandler>
841void
842PublishAdapter<DatabaseHandler>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
843{
844 // empty
845}
846
847template <>
848void
Chengyu Fan31737f12016-01-12 21:08:50 -0700849PublishAdapter<ConnectionPool_T>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
Chengyu Fan46398212015-08-11 11:23:13 -0600850{
Chengyu Fan31737f12016-01-12 21:08:50 -0700851 Connection_T conn = ConnectionPool_getConnection(*m_databaseHandler);
852
853 if (!conn) {
854 _LOG_DEBUG("No available database connections");
855 return;
856 }
857
858 TRY {
859 Connection_execute(conn, reinterpret_cast<const char*>(sql.c_str()), sql.size());
860 }
861 CATCH(SQLException) {
862 _LOG_ERROR(Connection_getLastError(conn));
863 }
864 END_TRY;
865
866 Connection_close(conn);
Chengyu Fan46398212015-08-11 11:23:13 -0600867}
868
869template<typename DatabaseHandler>
870bool
871PublishAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlString,
872 Json::Value& jsonValue,
873 util::DatabaseOperation op)
874{
875 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan46398212015-08-11 11:23:13 -0600876 return false;
877 }
878 if (op == util::ADD) {
879 size_t updateNumber = jsonValue["add"].size();
880 if (updateNumber <= 0)
881 return false;
882
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600883 sqlString << "INSERT INTO " << m_databaseTable << " (";
Chengyu Fancfb80c72015-10-19 16:50:04 -0600884 for (size_t i = 0; i < m_tableColumns.size(); ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600885 if (i != 0)
886 sqlString << ", ";
Chengyu Fancfb80c72015-10-19 16:50:04 -0600887 sqlString << m_tableColumns[i];
Chengyu Fan46398212015-08-11 11:23:13 -0600888 }
889 sqlString << ") VALUES";
890
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600891 for (size_t i = 0; i < updateNumber; ++i) { //parse each file name
Chengyu Fan46398212015-08-11 11:23:13 -0600892 if (i > 0)
893 sqlString << ",";
894 // cast might be overflowed
895 Json::Value item = jsonValue["add"][static_cast<int>(i)];
896 if (!item.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600897 _LOG_ERROR("Malformed JsonQuery string");
Chengyu Fan46398212015-08-11 11:23:13 -0600898 return false;
899 }
900 std::string fileName(item.asString());
901 // use digest sha256 for now, may be removed
902 ndn::util::Digest<CryptoPP::SHA256> digest;
903 digest.update(reinterpret_cast<const uint8_t*>(fileName.data()), fileName.length());
904
905 sqlString << "('" << digest.toString() << "','" << fileName << "'";
906
907 // parse the ndn name to get each value for each field
908 if (!name2Fields(sqlString, fileName))
909 return false;
910 sqlString << ")";
911 }
912 sqlString << ";";
913 }
914 else if (op == util::REMOVE) {
915 // remove files from db
916 size_t updateNumber = jsonValue["remove"].size();
917 if (updateNumber <= 0)
918 return false;
919
Chengyu Fan71b712b2015-09-09 22:13:56 -0600920 sqlString << "DELETE FROM " << m_databaseTable << " WHERE name IN (";
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600921 for (size_t i = 0; i < updateNumber; ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600922 if (i > 0)
923 sqlString << ",";
924 // cast might be overflowed
925 Json::Value item = jsonValue["remove"][static_cast<int>(i)];
926 if (!item.isConvertibleTo(Json::stringValue)) {
Chengyu Fancfb80c72015-10-19 16:50:04 -0600927 _LOG_ERROR("Malformed JsonQuery");
Chengyu Fan46398212015-08-11 11:23:13 -0600928 return false;
929 }
930 std::string fileName(item.asString());
931
932 sqlString << "'" << fileName << "'";
933 }
934 sqlString << ");";
935 }
936 return true;
937}
938
939template<typename DatabaseHandler>
940bool
941PublishAdapter<DatabaseHandler>::name2Fields(std::stringstream& sqlString,
942 std::string& fileName)
943{
944 size_t start = 0;
945 size_t pos = 0;
946 size_t count = 0;
947 std::string token;
948 std::string delimiter = "/";
949 // fileName must starts with either ndn:/ or /
950 std::string nameWithNdn("ndn:/");
951 std::string nameWithSlash("/");
952 if (fileName.find(nameWithNdn) == 0) {
953 start = nameWithNdn.size();
954 }
955 else if (fileName.find(nameWithSlash) == 0) {
956 start = nameWithSlash.size();
957 }
958 else
959 return false;
960
961 while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600962 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600963 token = fileName.substr(start, pos - start);
Chengyu Fancfb80c72015-10-19 16:50:04 -0600964 // exclude the sha256 and name (already processed)
965 if (count >= m_tableColumns.size() - 2) {
966 return false;
Chengyu Fan46398212015-08-11 11:23:13 -0600967 }
968 sqlString << ",'" << token << "'";
969 start = pos + 1;
970 }
971
Chengyu Fancfb80c72015-10-19 16:50:04 -0600972 // sha256 and name have been processed, and the last token will be processed later
973 if (count != m_tableColumns.size() - 3 || std::string::npos == start)
Chengyu Fan46398212015-08-11 11:23:13 -0600974 return false;
975 token = fileName.substr(start, std::string::npos - start);
976 sqlString << ",'" << token << "'";
977 return true;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600978}
979
980template<typename DatabaseHandler>
981bool
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600982PublishAdapter<DatabaseHandler>::validatePublicationChanges(const
983 std::shared_ptr<const ndn::Data>& data)
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600984{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600985 _LOG_DEBUG(">> PublishAdapter::validatePublicationChanges");
986
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600987 // The data name must be "/<publisher-prefix>/<nonce>"
988 // the prefix is the data name removes the last component
989 ndn::Name publisherPrefix = data->getName().getPrefix(-1);
990
991 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
992 data->getContent().value_size());
993 Json::Value parsedFromString;
994 Json::Reader reader;
995 if (!reader.parse(payload, parsedFromString)) {
996 // parse error, log events
Chengyu Fan71b712b2015-09-09 22:13:56 -0600997 _LOG_DEBUG("Fail to parse the published Data" << data->getName());
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600998 return false;
999 }
1000
1001 // validate added files...
1002 for (size_t i = 0; i < parsedFromString["add"].size(); i++) {
1003 if (!publisherPrefix.isPrefixOf(
1004 ndn::Name(parsedFromString["add"][static_cast<int>(i)].asString())))
1005 return false;
1006 }
1007
1008 // validate removed files ...
1009 for (size_t i = 0; i < parsedFromString["remove"].size(); i++) {
1010 if (!publisherPrefix.isPrefixOf(
1011 ndn::Name(parsedFromString["remove"][static_cast<int>(i)].asString())))
1012 return false;
1013 }
1014 return true;
Alison Craig2a4d5282015-04-10 12:00:02 -06001015}
1016
1017} // namespace publish
1018} // namespace atmos
1019#endif //ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP