blob: aa61845ceb88ecd6f6df7e6b3d75476c6f650a52 [file] [log] [blame]
Alison Craig2a4d5282015-04-10 12:00:02 -06001/** NDN-Atmos: Cataloging Service for distributed data originally developed
2 * for atmospheric science data
3 * Copyright (C) 2015 Colorado State University
4 *
5 * NDN-Atmos is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * NDN-Atmos is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with NDN-Atmos. If not, see <http://www.gnu.org/licenses/>.
17**/
18
19#ifndef ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
20#define ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP
21
22#include "util/catalog-adapter.hpp"
23#include "util/mysql-util.hpp"
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060024#include <mysql/mysql.h>
Alison Craig2a4d5282015-04-10 12:00:02 -060025
26#include <json/reader.h>
27#include <json/value.h>
28#include <json/writer.h>
29
30#include <ndn-cxx/face.hpp>
31#include <ndn-cxx/interest.hpp>
32#include <ndn-cxx/interest-filter.hpp>
33#include <ndn-cxx/name.hpp>
34#include <ndn-cxx/security/key-chain.hpp>
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060035#include <ndn-cxx/security/validator-config.hpp>
Chengyu Fanf4c747a2015-08-18 13:56:01 -060036#include <ndn-cxx/util/string-helper.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060037
Chengyu Fan46398212015-08-11 11:23:13 -060038#include <ChronoSync/socket.hpp>
Alison Craig2a4d5282015-04-10 12:00:02 -060039#include <memory>
40#include <string>
Chengyu Fanb25835b2015-04-28 17:09:35 -060041#include <vector>
42#include <unordered_map>
Chengyu Fan46398212015-08-11 11:23:13 -060043#include <mutex>
Alison Craig2a4d5282015-04-10 12:00:02 -060044
Chengyu Fan71b712b2015-09-09 22:13:56 -060045#include "util/logger.hpp"
46
Alison Craig2a4d5282015-04-10 12:00:02 -060047namespace atmos {
48namespace publish {
Chengyu Fan71b712b2015-09-09 22:13:56 -060049#ifdef HAVE_LOG4CXX
50 INIT_LOGGER("PublishAdapter");
51#endif
Chengyu Fanc7b87ad2015-07-09 16:44:37 -060052
Chengyu Fan46398212015-08-11 11:23:13 -060053#define RETRY_WHEN_TIMEOUT 2
54// TODO: need to use the configured nameFields
55std::array<std::string, 12> atmosTableColumns = {{"sha256", "name", "activity", "product",
56 "organization", "model", "experiment",
57 "frequency", "modeling_realm",
58 "variable_name", "ensemble", "time"}};
59
Alison Craig2a4d5282015-04-10 12:00:02 -060060/**
61 * PublishAdapter handles the Publish usecases for the catalog
62 */
63template <typename DatabaseHandler>
Chengyu Fanb25835b2015-04-28 17:09:35 -060064class PublishAdapter : public atmos::util::CatalogAdapter {
Alison Craig2a4d5282015-04-10 12:00:02 -060065public:
66 /**
67 * Constructor
68 *
Chengyu Fanb25835b2015-04-28 17:09:35 -060069 * @param face: Face that will be used for NDN communications
70 * @param keyChain: KeyChain that will be used for data signing
Alison Craig2a4d5282015-04-10 12:00:02 -060071 */
Chengyu Fanb25835b2015-04-28 17:09:35 -060072 PublishAdapter(const std::shared_ptr<ndn::Face>& face,
73 const std::shared_ptr<ndn::KeyChain>& keyChain);
Alison Craig2a4d5282015-04-10 12:00:02 -060074
Alison Craig2a4d5282015-04-10 12:00:02 -060075 virtual
76 ~PublishAdapter();
77
Chengyu Fanb25835b2015-04-28 17:09:35 -060078 /**
79 * Helper function that subscribe to a publish section for the config file
80 */
81 void
82 setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -060083 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -060084 const std::vector<std::string>& nameFields,
85 const std::string& databaseTable);
Chengyu Fanb25835b2015-04-28 17:09:35 -060086
Alison Craig2a4d5282015-04-10 12:00:02 -060087protected:
88 /**
Chengyu Fanb25835b2015-04-28 17:09:35 -060089 * Helper function that configures piblishAdapter instance according to publish section
90 * in config file
91 */
92 void
93 onConfig(const util::ConfigSection& section,
94 bool isDryDun,
95 const std::string& fileName,
96 const ndn::Name& prefix);
97
98 /**
Alison Craig2a4d5282015-04-10 12:00:02 -060099 * Initial "please publish this" Interests
100 *
101 * @param filter: InterestFilter that caused this Interest to be routed
102 * @param interest: Interest that needs to be handled
103 */
104 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600105 onPublishInterest(const ndn::InterestFilter& filter, const ndn::Interest& interest);
Alison Craig2a4d5282015-04-10 12:00:02 -0600106
Chengyu Fan46398212015-08-11 11:23:13 -0600107 virtual void
108 onTimeout(const ndn::Interest& interest);
109
Alison Craig2a4d5282015-04-10 12:00:02 -0600110 /**
111 * Data containing the actual thing we need to publish
112 *
113 * @param interest: Interest that caused this Data to be routed
114 * @param data: Data that needs to be handled
115 */
116 virtual void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600117 onPublishedData(const ndn::Interest& interest, const ndn::Data& data);
Alison Craig2a4d5282015-04-10 12:00:02 -0600118
Chengyu Fanb25835b2015-04-28 17:09:35 -0600119 /**
Chengyu Fan46398212015-08-11 11:23:13 -0600120 * Helper function to initialize the DatabaseHandler
Chengyu Fanb25835b2015-04-28 17:09:35 -0600121 */
122 void
Chengyu Fan46398212015-08-11 11:23:13 -0600123 initializeDatabase(const util::ConnectionDetails& databaseId);
Alison Craig2a4d5282015-04-10 12:00:02 -0600124
Chengyu Fanb25835b2015-04-28 17:09:35 -0600125 /**
126 * Helper function that sets filters to make the adapter work
127 */
128 void
129 setFilters();
130
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600131 void
132 setCatalogId();
133
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600134 /**
135 * Function to validate publication changes against the trust model, which is, all file
136 * names must be under the publisher's prefix. This function should be called by a callback
137 * function invoked by validator
138 *
139 * @param data: received data from the publisher
140 */
141 bool
142 validatePublicationChanges(const std::shared_ptr<const ndn::Data>& data);
143
Chengyu Fan46398212015-08-11 11:23:13 -0600144
145 /**
146 * Helper function that processes the sync update
147 *
148 * @param updates: vector that contains all the missing data information
149 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600150 void
151 processSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
Chengyu Fan46398212015-08-11 11:23:13 -0600152
153 /**
154 * Helper function that processes the update data
155 *
156 * @param data: shared pointer for the fetched update data
157 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600158 void
Chengyu Fan612f11c2015-09-23 16:24:47 -0600159 processUpdateData(const std::shared_ptr<const ndn::Data>& data);
Chengyu Fan46398212015-08-11 11:23:13 -0600160
161 /**
162 * Helper function that add data to or remove data from database
163 *
164 * @param sql: sql string to do the add or remove jobs
165 * @param op: enum value indicates the database operation, could be REMOVE, ADD
166 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600167 virtual void
168 operateDatabase(const std::string& sql,
169 util::DatabaseOperation op);
Chengyu Fan46398212015-08-11 11:23:13 -0600170
171 /**
172 * Helper function that parses jsonValue to generate sql string, return value indicates
173 * if it is successfully
174 *
175 * @param sqlString: streamstream to save the sql string
176 * @param jsonValue: Json value that contains the update information
177 * @param op: enum value indicates the database operation, could be REMOVE, ADD
178 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600179 bool
180 json2Sql(std::stringstream& sqlString,
181 Json::Value& jsonValue,
182 util::DatabaseOperation op);
Chengyu Fan46398212015-08-11 11:23:13 -0600183
184 /**
185 * Helper function to generate sql string based on file name, return value indicates
186 * if it is successfully
187 *
188 * @param sqlString: streamstream to save the sql string
189 * @param fileName: ndn uri string for a file name
190 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600191 bool
192 name2Fields(std::stringstream& sqlstring,
193 std::string& fileName);
Chengyu Fan46398212015-08-11 11:23:13 -0600194
195 /**
196 * Check the local database for the latest sequence number for a ChronoSync update
197 *
198 * @param update: the MissingDataInfo object
199 */
200 chronosync::SeqNo
201 getLatestSeqNo(const chronosync::MissingDataInfo& update);
202
203 /**
204 * Update the local database with the update message
205 *
206 * @param update: the MissingDataInfo object
207 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600208 void
209 renewUpdateInformation(const chronosync::MissingDataInfo& update);
Chengyu Fan46398212015-08-11 11:23:13 -0600210
211 /**
212 * Insert the update message into the local database
213 *
214 * @param update: the MissingDataInfo object
215 */
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600216 void
217 addUpdateInformation(const chronosync::MissingDataInfo& update);
Chengyu Fan46398212015-08-11 11:23:13 -0600218
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600219 void
220 onFetchUpdateDataTimeout(const ndn::Interest& interest);
Chengyu Fan46398212015-08-11 11:23:13 -0600221
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600222 void
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600223 onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
224 const std::string& failureInfo);
225
226 void
227 validatePublishedDataPaylod(const std::shared_ptr<const ndn::Data>& data);
Chengyu Fan46398212015-08-11 11:23:13 -0600228
Chengyu Fanb25835b2015-04-28 17:09:35 -0600229protected:
230 typedef std::unordered_map<ndn::Name, const ndn::RegisteredPrefixId*> RegisteredPrefixList;
231 // Prefix for ChronoSync
232 ndn::Name m_syncPrefix;
233 // Handle to the Catalog's database
234 std::shared_ptr<DatabaseHandler> m_databaseHandler;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600235 std::unique_ptr<ndn::ValidatorConfig> m_publishValidator;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600236 RegisteredPrefixList m_registeredPrefixList;
Chengyu Fan46398212015-08-11 11:23:13 -0600237 std::unique_ptr<chronosync::Socket> m_socket; // SyncSocket
238 // mutex to control critical sections
239 std::mutex m_mutex;
240 std::vector<std::string> m_tableColumns;
241 // TODO: create thread for each request, and the variables below should be within the thread
242 bool m_mustBeFresh;
243 bool m_isFinished;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600244 ndn::Name m_catalogId;
Alison Craig2a4d5282015-04-10 12:00:02 -0600245};
246
Alison Craig2a4d5282015-04-10 12:00:02 -0600247
Chengyu Fanb25835b2015-04-28 17:09:35 -0600248template <typename DatabaseHandler>
249PublishAdapter<DatabaseHandler>::PublishAdapter(const std::shared_ptr<ndn::Face>& face,
250 const std::shared_ptr<ndn::KeyChain>& keyChain)
251 : util::CatalogAdapter(face, keyChain)
Chengyu Fan46398212015-08-11 11:23:13 -0600252 , m_mustBeFresh(true)
253 , m_isFinished(false)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600254 , m_catalogId("catalogIdPlaceHolder")
Chengyu Fanb25835b2015-04-28 17:09:35 -0600255{
256}
257
258template <typename DatabaseHandler>
259void
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600260PublishAdapter<DatabaseHandler>::setCatalogId()
261{
262 // empty
263}
264
265template <>
266void
267PublishAdapter<MYSQL>::setCatalogId()
268{
269 // use public key digest as the catalog ID
270 ndn::Name keyId;
271 if (m_signingId.empty()) {
272 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_keyChain->getDefaultIdentity());
273 } else {
274 keyId = m_keyChain->getDefaultKeyNameForIdentity(m_signingId);
275 }
276
277 std::shared_ptr<ndn::PublicKey> pKey = m_keyChain->getPib().getPublicKey(keyId);
278 ndn::Block keyDigest = pKey->computeDigest();
279 m_catalogId.clear();
280 m_catalogId.append(ndn::toHex(*keyDigest.getBuffer()));
281}
282
283template <typename DatabaseHandler>
284void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600285PublishAdapter<DatabaseHandler>::setFilters()
286{
287 ndn::Name publishPrefix = ndn::Name(m_prefix).append("publish");
Chengyu Fan46398212015-08-11 11:23:13 -0600288 m_registeredPrefixList[publishPrefix] =
289 m_face->setInterestFilter(publishPrefix,
290 bind(&PublishAdapter<DatabaseHandler>::onPublishInterest,
291 this, _1, _2),
292 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterSuccess,
293 this, _1),
294 bind(&publish::PublishAdapter<DatabaseHandler>::onRegisterFailure,
295 this, _1, _2));
296
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600297 ndn::Name catalogSync = ndn::Name(m_prefix).append("sync").append(m_catalogId);
Chengyu Fan46398212015-08-11 11:23:13 -0600298 m_socket.reset(new chronosync::Socket(m_syncPrefix,
299 catalogSync,
300 *m_face,
301 bind(&PublishAdapter<DatabaseHandler>::processSyncUpdate,
302 this, _1)));
Alison Craig2a4d5282015-04-10 12:00:02 -0600303}
304
305template <typename DatabaseHandler>
306PublishAdapter<DatabaseHandler>::~PublishAdapter()
307{
Chengyu Fanb25835b2015-04-28 17:09:35 -0600308 for (const auto& itr : m_registeredPrefixList) {
309 if (static_cast<bool>(itr.second))
310 m_face->unsetInterestFilter(itr.second);
311 }
Alison Craig2a4d5282015-04-10 12:00:02 -0600312}
313
314template <typename DatabaseHandler>
315void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600316PublishAdapter<DatabaseHandler>::setConfigFile(util::ConfigFile& config,
Chengyu Fan92440162015-07-09 14:43:31 -0600317 const ndn::Name& prefix,
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600318 const std::vector<std::string>& nameFields,
319 const std::string& databaseTable)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600320{
Chengyu Fan92440162015-07-09 14:43:31 -0600321 m_nameFields = nameFields;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600322 m_databaseTable = databaseTable;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600323 config.addSectionHandler("publishAdapter",
324 bind(&PublishAdapter<DatabaseHandler>::onConfig, this,
325 _1, _2, _3, prefix));
326}
327
328template <typename DatabaseHandler>
329void
330PublishAdapter<DatabaseHandler>::onConfig(const util::ConfigSection& section,
331 bool isDryRun,
332 const std::string& filename,
333 const ndn::Name& prefix)
334{
335 using namespace util;
336 if (isDryRun) {
337 return;
338 }
339
340 std::string signingId, dbServer, dbName, dbUser, dbPasswd;
341 std::string syncPrefix("ndn:/ndn-atmos/broadcast/chronosync");
342
343 for (auto item = section.begin();
344 item != section.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600345 ++item)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600346 {
347 if (item->first == "signingId") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600348 signingId = item->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600349 if (signingId.empty()) {
350 throw Error("Invalid value for \"signingId\""
351 " in \"publish\" section");
352 }
353 }
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600354 else if (item->first == "security") {
355 // when use, the validator must specify the callback func to handle the validated data
356 // it should be called when the Data packet that contains the published file names is received
357 m_publishValidator.reset(new ndn::ValidatorConfig(m_face.get()));
358 m_publishValidator->load(item->second, filename);
359 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600360 else if (item->first == "database") {
361 const util::ConfigSection& databaseSection = item->second;
362 for (auto subItem = databaseSection.begin();
363 subItem != databaseSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600364 ++subItem) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600365 if (subItem->first == "dbServer") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600366 dbServer = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600367 if (dbServer.empty()){
368 throw Error("Invalid value for \"dbServer\""
369 " in \"publish\" section");
370 }
371 }
372 if (subItem->first == "dbName") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600373 dbName = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600374 if (dbName.empty()){
375 throw Error("Invalid value for \"dbName\""
376 " in \"publish\" section");
377 }
378 }
379 if (subItem->first == "dbUser") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600380 dbUser = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600381 if (dbUser.empty()){
382 throw Error("Invalid value for \"dbUser\""
383 " in \"publish\" section");
384 }
385 }
386 if (subItem->first == "dbPasswd") {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600387 dbPasswd = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600388 if (dbPasswd.empty()){
389 throw Error("Invalid value for \"dbPasswd\""
390 " in \"publish\" section");
391 }
392 }
393 }
394 }
395 else if (item->first == "sync") {
396 const util::ConfigSection& synSection = item->second;
397 for (auto subItem = synSection.begin();
398 subItem != synSection.end();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600399 ++subItem) {
Chengyu Fanb25835b2015-04-28 17:09:35 -0600400 if (subItem->first == "prefix") {
401 syncPrefix.clear();
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600402 syncPrefix = subItem->second.get_value<std::string>();
Chengyu Fanb25835b2015-04-28 17:09:35 -0600403 if (syncPrefix.empty()){
404 throw Error("Invalid value for \"prefix\""
405 " in \"publish\\sync\" section");
406 }
407 }
408 // todo: parse the sync_security section
409 }
410 }
411 }
412
413 m_prefix = prefix;
414 m_signingId = ndn::Name(signingId);
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600415 setCatalogId();
416
Chengyu Fan71b712b2015-09-09 22:13:56 -0600417 m_syncPrefix = syncPrefix;
Chengyu Fanb25835b2015-04-28 17:09:35 -0600418 util::ConnectionDetails mysqlId(dbServer, dbUser, dbPasswd, dbName);
419
Chengyu Fan46398212015-08-11 11:23:13 -0600420 initializeDatabase(mysqlId);
Chengyu Fanb25835b2015-04-28 17:09:35 -0600421 setFilters();
422}
423
424template <typename DatabaseHandler>
425void
Chengyu Fan46398212015-08-11 11:23:13 -0600426PublishAdapter<DatabaseHandler>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600427{
428 //empty
429}
430
431template <>
432void
Chengyu Fan46398212015-08-11 11:23:13 -0600433PublishAdapter<MYSQL>::initializeDatabase(const util::ConnectionDetails& databaseId)
Chengyu Fanb25835b2015-04-28 17:09:35 -0600434{
435 std::shared_ptr<MYSQL> conn = atmos::util::MySQLConnectionSetup(databaseId);
436
437 m_databaseHandler = conn;
Chengyu Fan46398212015-08-11 11:23:13 -0600438
439 if (m_databaseHandler != nullptr) {
440 std::string errMsg;
441 bool success = false;
442 // Ignore errors (when database already exists, errors are expected)
443 std::string createSyncTable =
444 "CREATE TABLE `chronosync_update_info` (\
445 `id` int(11) NOT NULL AUTO_INCREMENT, \
446 `session_name` varchar(1000) NOT NULL, \
447 `seq_num` int(11) NOT NULL, \
448 PRIMARY KEY (`id`), \
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600449 UNIQUE KEY `id_UNIQUE` (`id`) \
Chengyu Fan46398212015-08-11 11:23:13 -0600450 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
451
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600452 MySQLPerformQuery(m_databaseHandler, createSyncTable, util::CREATE,
453 success, errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600454 if (!success)
Chengyu Fan71b712b2015-09-09 22:13:56 -0600455 _LOG_DEBUG(errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600456
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600457 // create SQL string for table creation, id, sha256, and name are columns that we need
458 std::stringstream ss;
459 ss << "CREATE TABLE `" << m_databaseTable << "` (\
460 `id` int(100) NOT NULL AUTO_INCREMENT, \
461 `sha256` varchar(64) NOT NULL, \
462 `name` varchar(1000) NOT NULL,";
463 for (size_t i = 0; i < m_nameFields.size(); i++) {
464 ss << "`" << m_nameFields[i] << "` varchar(100) NOT NULL, ";
465 }
466 ss << "PRIMARY KEY (`id`), UNIQUE KEY `sha256` (`sha256`)\
Chengyu Fan46398212015-08-11 11:23:13 -0600467 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;";
468
469 success = false;
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600470 MySQLPerformQuery(m_databaseHandler, ss.str(), util::CREATE, success, errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600471 if (!success)
Chengyu Fan71b712b2015-09-09 22:13:56 -0600472 _LOG_DEBUG(errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600473 }
474 else {
475 throw Error("cannot connect to the Database");
476 }
Chengyu Fanb25835b2015-04-28 17:09:35 -0600477}
478
479template <typename DatabaseHandler>
480void
481PublishAdapter<DatabaseHandler>::onPublishInterest(const ndn::InterestFilter& filter,
482 const ndn::Interest& interest)
Alison Craig2a4d5282015-04-10 12:00:02 -0600483{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600484 _LOG_DEBUG(">> PublishAdapter::onPublishInterest");
485
Chengyu Fan46398212015-08-11 11:23:13 -0600486 // Example Interest : /cmip5/publish/<uri>/<nonce>
Chengyu Fan71b712b2015-09-09 22:13:56 -0600487 _LOG_DEBUG(interest.getName().toUri());
Chengyu Fan46398212015-08-11 11:23:13 -0600488
489 //send back ACK
Chengyu Fan71b712b2015-09-09 22:13:56 -0600490 char buf[] = "ACK";
Chengyu Fan46398212015-08-11 11:23:13 -0600491 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(interest.getName());
492 data->setFreshnessPeriod(ndn::time::milliseconds(10)); // 10 msec
493 data->setContent(reinterpret_cast<const uint8_t*>(buf), strlen(buf));
494 m_keyChain->sign(*data);
495 m_face->put(*data);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600496
497 _LOG_DEBUG("ACK interest : " << interest.getName().toUri());
Chengyu Fan46398212015-08-11 11:23:13 -0600498
499
500 //TODO: if already in catalog, what do we do?
501 //ask for content
502 ndn::Name interestStr = interest.getName().getSubName(m_prefix.size()+1);
503 size_t m_nextSegment = 0;
504 std::shared_ptr<ndn::Interest> retrieveInterest =
505 std::make_shared<ndn::Interest>(interestStr.appendSegment(m_nextSegment));
506 retrieveInterest->setInterestLifetime(ndn::time::milliseconds(4000));
507 retrieveInterest->setMustBeFresh(m_mustBeFresh);
508 m_face->expressInterest(*retrieveInterest,
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600509 bind(&PublishAdapter<DatabaseHandler>::onPublishedData,
Chengyu Fan46398212015-08-11 11:23:13 -0600510 this,_1, _2),
Chengyu Fan71b712b2015-09-09 22:13:56 -0600511 bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout, this, _1));
512
513 _LOG_DEBUG("Expressing Interest " << retrieveInterest->toUri());
514 _LOG_DEBUG("<< PublishAdapter::onPublishInterest");
Chengyu Fan46398212015-08-11 11:23:13 -0600515}
516
517template <typename DatabaseHandler>
518void
519PublishAdapter<DatabaseHandler>::onTimeout(const ndn::Interest& interest)
520{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600521 _LOG_DEBUG(interest.getName() << "timed out");
Alison Craig2a4d5282015-04-10 12:00:02 -0600522}
523
524template <typename DatabaseHandler>
525void
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600526PublishAdapter<DatabaseHandler>::onValidationFailed(const std::shared_ptr<const ndn::Data>& data,
527 const std::string& failureInfo)
528{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600529 _LOG_DEBUG(data->getName() << " validation failed: " << failureInfo);
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600530}
531
532template <typename DatabaseHandler>
533void
Chengyu Fanb25835b2015-04-28 17:09:35 -0600534PublishAdapter<DatabaseHandler>::onPublishedData(const ndn::Interest& interest,
535 const ndn::Data& data)
Alison Craig2a4d5282015-04-10 12:00:02 -0600536{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600537 _LOG_DEBUG(">> PublishAdapter::onPublishedData");
538 _LOG_DEBUG("Data name: " << data.getName());
Chengyu Fan46398212015-08-11 11:23:13 -0600539 if (data.getContent().empty()) {
540 return;
541 }
Chengyu Fan612f11c2015-09-23 16:24:47 -0600542 if (m_publishValidator != nullptr) {
543 m_publishValidator->validate(data,
544 bind(&PublishAdapter<DatabaseHandler>::validatePublishedDataPaylod, this, _1),
545 bind(&PublishAdapter<DatabaseHandler>::onValidationFailed, this, _1, _2));
546 }
547 else {
548 std::shared_ptr<ndn::Data> dataPtr = std::make_shared<ndn::Data>(data);
549 validatePublishedDataPaylod(dataPtr);
550 }
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600551}
Chengyu Fan46398212015-08-11 11:23:13 -0600552
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600553template <typename DatabaseHandler>
554void
Chengyu Fan612f11c2015-09-23 16:24:47 -0600555PublishAdapter<DatabaseHandler>::validatePublishedDataPaylod(const std::shared_ptr<const ndn::Data>& data)
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600556{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600557 _LOG_DEBUG(">> PublishAdapter::onValidatePublishedDataPayload");
558
Chengyu Fan46398212015-08-11 11:23:13 -0600559 // validate published data payload, if failed, return
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600560 if (!validatePublicationChanges(data)) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600561 _LOG_DEBUG("Data validation failed : " << data->getName());
Chengyu Fan46398212015-08-11 11:23:13 -0600562#ifndef NDEBUG
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600563 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
564 data->getContent().value_size());
Chengyu Fan71b712b2015-09-09 22:13:56 -0600565 _LOG_DEBUG(payload);
Chengyu Fan46398212015-08-11 11:23:13 -0600566#endif
567 return;
568 }
569
570 // todo: return value to indicate if the insertion succeeds
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600571 processUpdateData(data);
Chengyu Fan46398212015-08-11 11:23:13 -0600572
573 // ideally, data should not be stale?
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600574 m_socket->publishData(data->getContent(), ndn::time::seconds(3600));
Chengyu Fan46398212015-08-11 11:23:13 -0600575
576 // if this is not the final block, continue to fetch the next one
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600577 const ndn::name::Component& finalBlockId = data->getMetaInfo().getFinalBlockId();
578 if (finalBlockId == data->getName()[-1]) {
Chengyu Fan46398212015-08-11 11:23:13 -0600579 m_isFinished = true;
580 }
581 //else, get the next segment
582 if (!m_isFinished) {
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600583 ndn::Name nextInterestName = data->getName().getPrefix(-1);
584 uint64_t incomingSegment = data->getName()[-1].toSegment();
585 incomingSegment++;
Chengyu Fan71b712b2015-09-09 22:13:56 -0600586
587 _LOG_DEBUG("Next Interest Name " << nextInterestName << " Segment " << incomingSegment);
588
Chengyu Fan46398212015-08-11 11:23:13 -0600589 std::shared_ptr<ndn::Interest> nextInterest =
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600590 std::make_shared<ndn::Interest>(nextInterestName.appendSegment(incomingSegment));
Chengyu Fan46398212015-08-11 11:23:13 -0600591 nextInterest->setInterestLifetime(ndn::time::milliseconds(4000));
592 nextInterest->setMustBeFresh(m_mustBeFresh);
593 m_face->expressInterest(*nextInterest,
594 bind(&publish::PublishAdapter<DatabaseHandler>::onPublishedData,
595 this,_1, _2),
596 bind(&publish::PublishAdapter<DatabaseHandler>::onTimeout,
597 this, _1));
598 }
599}
600
601template <typename DatabaseHandler>
602void
Chengyu Fan612f11c2015-09-23 16:24:47 -0600603PublishAdapter<DatabaseHandler>::processUpdateData(const std::shared_ptr<const ndn::Data>& data)
Chengyu Fan46398212015-08-11 11:23:13 -0600604{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600605 _LOG_DEBUG(">> PublishAdapter::processUpdateData");
606
Chengyu Fan46398212015-08-11 11:23:13 -0600607 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
608 data->getContent().value_size());
609
610 if (payload.length() <= 0) {
611 return;
612 }
613
614 // the data payload must be JSON format
615 // http://redmine.named-data.net/projects/ndn-atmos/wiki/Sync
616 Json::Value parsedFromPayload;
617 Json::Reader jsonReader;
618 if (!jsonReader.parse(payload, parsedFromPayload)) {
619 // todo: logging events
Chengyu Fan71b712b2015-09-09 22:13:56 -0600620 _LOG_DEBUG("Fail to parse the update data");
Chengyu Fan46398212015-08-11 11:23:13 -0600621 return;
622 }
Chengyu Fan71b712b2015-09-09 22:13:56 -0600623
Chengyu Fan46398212015-08-11 11:23:13 -0600624 std::stringstream ss;
625 if (json2Sql(ss, parsedFromPayload, util::ADD)) {
Chengyu Fan46398212015-08-11 11:23:13 -0600626 // todo: before use, check if the connection is not NULL
627 // we may need to use lock here to ensure thread safe
628 operateDatabase(ss.str(), util::ADD);
629 }
630
631 ss.str("");
632 ss.clear();
633 if (json2Sql(ss, parsedFromPayload, util::REMOVE)) {
Chengyu Fan46398212015-08-11 11:23:13 -0600634 operateDatabase(ss.str(), util::REMOVE);
635 }
636}
637
638template <typename DatabaseHandler>
639chronosync::SeqNo
640PublishAdapter<DatabaseHandler>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
641{
642 // empty
643 return 0;
644}
645
646template <>
647chronosync::SeqNo
648PublishAdapter<MYSQL>::getLatestSeqNo(const chronosync::MissingDataInfo& update)
649{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600650 _LOG_DEBUG(">> PublishAdapter::getLatestSeqNo");
651
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600652 std::string sql = "SELECT seq_num FROM chronosync_update_info WHERE session_name = '"
Chengyu Fan46398212015-08-11 11:23:13 -0600653 + update.session.toUri() + "';";
Chengyu Fan71b712b2015-09-09 22:13:56 -0600654
Chengyu Fan46398212015-08-11 11:23:13 -0600655 std::string errMsg;
656 bool success;
657 std::shared_ptr<MYSQL_RES> results
658 = atmos::util::MySQLPerformQuery(m_databaseHandler, sql, util::QUERY, success, errMsg);
659 if (!success) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600660 _LOG_DEBUG(errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600661 return 0; //database connection error?
662 }
663 else if (results != nullptr){
664 MYSQL_ROW row;
665 if (mysql_num_rows(results.get()) == 0)
666 return 0;
667
668 while ((row = mysql_fetch_row(results.get())))
669 {
670 chronosync::SeqNo seqNo = std::stoull(row[0]);
671 return seqNo;
672 }
673 }
674 return 0;
675}
676
677template <typename DatabaseHandler>
678void
679PublishAdapter<DatabaseHandler>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
680{
681 //empty
682}
683
684template <>
685void
686PublishAdapter<MYSQL>::renewUpdateInformation(const chronosync::MissingDataInfo& update)
687{
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600688 std::string sql = "UPDATE chronosync_update_info SET seq_num = "
Chengyu Fan46398212015-08-11 11:23:13 -0600689 + boost::lexical_cast<std::string>(update.high)
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600690 + " WHERE session_name = '" + update.session.toUri() + "';";
Chengyu Fan71b712b2015-09-09 22:13:56 -0600691
Chengyu Fan46398212015-08-11 11:23:13 -0600692 std::string errMsg;
693 bool success = false;
694 m_mutex.lock();
695 util::MySQLPerformQuery(m_databaseHandler, sql, util::UPDATE, success, errMsg);
696 m_mutex.unlock();
697 if (!success)
Chengyu Fan71b712b2015-09-09 22:13:56 -0600698 _LOG_DEBUG(errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600699}
700
701template <typename DatabaseHandler>
702void
703PublishAdapter<DatabaseHandler>::addUpdateInformation(const chronosync::MissingDataInfo& update)
704{
705 //empty
706}
707
708template <>
709void
710PublishAdapter<MYSQL>::addUpdateInformation(const chronosync::MissingDataInfo& update)
711{
712 std::string sql = "INSERT INTO chronosync_update_info (session_name, seq_num) VALUES ('"
713 + update.session.toUri() + "', " + boost::lexical_cast<std::string>(update.high)
714 + ");";
715
Chengyu Fan46398212015-08-11 11:23:13 -0600716 std::string errMsg;
717 bool success = false;
718 m_mutex.lock();
719 util::MySQLPerformQuery(m_databaseHandler, sql, util::ADD, success, errMsg);
720 m_mutex.unlock();
721 if (!success)
Chengyu Fan71b712b2015-09-09 22:13:56 -0600722 _LOG_DEBUG(errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600723}
724
725template <typename DatabaseHandler>
726void
727PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout(const ndn::Interest& interest)
728{
729 // todo: record event, and use recovery Interest to fetch the whole table
Chengyu Fan71b712b2015-09-09 22:13:56 -0600730 _LOG_DEBUG("UpdateData retrieval timed out: " << interest.getName());
Chengyu Fan46398212015-08-11 11:23:13 -0600731}
732
733template <typename DatabaseHandler>
734void
Chengyu Fan46398212015-08-11 11:23:13 -0600735PublishAdapter<DatabaseHandler>::processSyncUpdate(const std::vector<chronosync::MissingDataInfo>&
736 updates)
737{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600738 _LOG_DEBUG(">> PublishAdapter::processSyncUpdate");
739
Chengyu Fan46398212015-08-11 11:23:13 -0600740 if (updates.empty()) {
741 return;
742 }
743
744 // multiple updates from different catalog are possible
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600745 for (size_t i = 0; i < updates.size(); ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600746 // check if the session is in local DB
747 // if yes, only fetch packets whose seq number is bigger than the one in the DB
748 // if no, directly fetch Data
749 chronosync::SeqNo localSeqNo = getLatestSeqNo(updates[i]);
750 bool update = false;
751
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600752 for (chronosync::SeqNo seq = updates[i].low; seq <= updates[i].high; ++seq) {
Chengyu Fan46398212015-08-11 11:23:13 -0600753 if (seq > localSeqNo) {
754 m_socket->fetchData(updates[i].session, seq,
755 bind(&PublishAdapter<DatabaseHandler>::processUpdateData,this, _1),
Chengyu Fan3b9bb342015-09-21 10:53:37 -0600756 bind(&PublishAdapter<DatabaseHandler>::onValidationFailed,
Chengyu Fan46398212015-08-11 11:23:13 -0600757 this, _1, _2),
758 bind(&PublishAdapter<DatabaseHandler>::onFetchUpdateDataTimeout,
759 this, _1),
760 RETRY_WHEN_TIMEOUT);
Chengyu Fan71b712b2015-09-09 22:13:56 -0600761
762 _LOG_DEBUG("Interest for [" << updates[i].session << ":" << seq << "]");
763
Chengyu Fan46398212015-08-11 11:23:13 -0600764 update = true;
765 }
766 }
767 // update the seq session name and seq number in local DB
768 // indicating they are processed. So latter when this node reboots again, won't redo it
769 if (update) {
770 if (localSeqNo > 0)
771 renewUpdateInformation(updates[i]);
772 else
773 addUpdateInformation(updates[i]);
774 }
775 }
776}
777
778template <typename DatabaseHandler>
779void
780PublishAdapter<DatabaseHandler>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
781{
782 // empty
783}
784
785template <>
786void
787PublishAdapter<MYSQL>::operateDatabase(const std::string& sql, util::DatabaseOperation op)
788{
789 std::string errMsg;
790 bool success = false;
791 m_mutex.lock();
792 atmos::util::MySQLPerformQuery(m_databaseHandler, sql, op, success, errMsg);
793 m_mutex.unlock();
794 if (!success)
Chengyu Fan71b712b2015-09-09 22:13:56 -0600795 _LOG_DEBUG(errMsg);
Chengyu Fan46398212015-08-11 11:23:13 -0600796}
797
798template<typename DatabaseHandler>
799bool
800PublishAdapter<DatabaseHandler>::json2Sql(std::stringstream& sqlString,
801 Json::Value& jsonValue,
802 util::DatabaseOperation op)
803{
804 if (jsonValue.type() != Json::objectValue) {
Chengyu Fan46398212015-08-11 11:23:13 -0600805 return false;
806 }
807 if (op == util::ADD) {
808 size_t updateNumber = jsonValue["add"].size();
809 if (updateNumber <= 0)
810 return false;
811
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600812 sqlString << "INSERT INTO " << m_databaseTable << " (";
813 for (size_t i = 0; i < atmosTableColumns.size(); ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600814 if (i != 0)
815 sqlString << ", ";
816 sqlString << atmosTableColumns[i];
817 }
818 sqlString << ") VALUES";
819
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600820 for (size_t i = 0; i < updateNumber; ++i) { //parse each file name
Chengyu Fan46398212015-08-11 11:23:13 -0600821 if (i > 0)
822 sqlString << ",";
823 // cast might be overflowed
824 Json::Value item = jsonValue["add"][static_cast<int>(i)];
825 if (!item.isConvertibleTo(Json::stringValue)) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600826 _LOG_DEBUG("malformed JsonQuery string");
Chengyu Fan46398212015-08-11 11:23:13 -0600827 return false;
828 }
829 std::string fileName(item.asString());
830 // use digest sha256 for now, may be removed
831 ndn::util::Digest<CryptoPP::SHA256> digest;
832 digest.update(reinterpret_cast<const uint8_t*>(fileName.data()), fileName.length());
833
834 sqlString << "('" << digest.toString() << "','" << fileName << "'";
835
836 // parse the ndn name to get each value for each field
837 if (!name2Fields(sqlString, fileName))
838 return false;
839 sqlString << ")";
840 }
841 sqlString << ";";
842 }
843 else if (op == util::REMOVE) {
844 // remove files from db
845 size_t updateNumber = jsonValue["remove"].size();
846 if (updateNumber <= 0)
847 return false;
848
Chengyu Fan71b712b2015-09-09 22:13:56 -0600849 sqlString << "DELETE FROM " << m_databaseTable << " WHERE name IN (";
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600850 for (size_t i = 0; i < updateNumber; ++i) {
Chengyu Fan46398212015-08-11 11:23:13 -0600851 if (i > 0)
852 sqlString << ",";
853 // cast might be overflowed
854 Json::Value item = jsonValue["remove"][static_cast<int>(i)];
855 if (!item.isConvertibleTo(Json::stringValue)) {
Chengyu Fan71b712b2015-09-09 22:13:56 -0600856 _LOG_DEBUG("Malformed JsonQuery");
Chengyu Fan46398212015-08-11 11:23:13 -0600857 return false;
858 }
859 std::string fileName(item.asString());
860
861 sqlString << "'" << fileName << "'";
862 }
863 sqlString << ");";
864 }
865 return true;
866}
867
868template<typename DatabaseHandler>
869bool
870PublishAdapter<DatabaseHandler>::name2Fields(std::stringstream& sqlString,
871 std::string& fileName)
872{
873 size_t start = 0;
874 size_t pos = 0;
875 size_t count = 0;
876 std::string token;
877 std::string delimiter = "/";
878 // fileName must starts with either ndn:/ or /
879 std::string nameWithNdn("ndn:/");
880 std::string nameWithSlash("/");
881 if (fileName.find(nameWithNdn) == 0) {
882 start = nameWithNdn.size();
883 }
884 else if (fileName.find(nameWithSlash) == 0) {
885 start = nameWithSlash.size();
886 }
887 else
888 return false;
889
890 while ((pos = fileName.find(delimiter, start)) != std::string::npos) {
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600891 count++;
Chengyu Fan46398212015-08-11 11:23:13 -0600892 token = fileName.substr(start, pos - start);
893 if (count >= atmosTableColumns.size() - 2) { // exclude the sha256 and name
894 return false; //fileName contains more than 10 fields
895 }
896 sqlString << ",'" << token << "'";
897 start = pos + 1;
898 }
899
900 // must be 10 fields in total (add the tail one)
901 if (count != atmosTableColumns.size() - 3 || std::string::npos == start)
902 return false;
903 token = fileName.substr(start, std::string::npos - start);
904 sqlString << ",'" << token << "'";
905 return true;
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600906}
907
908template<typename DatabaseHandler>
909bool
Chengyu Fanf4c747a2015-08-18 13:56:01 -0600910PublishAdapter<DatabaseHandler>::validatePublicationChanges(const
911 std::shared_ptr<const ndn::Data>& data)
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600912{
Chengyu Fan71b712b2015-09-09 22:13:56 -0600913 _LOG_DEBUG(">> PublishAdapter::validatePublicationChanges");
914
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600915 // The data name must be "/<publisher-prefix>/<nonce>"
916 // the prefix is the data name removes the last component
917 ndn::Name publisherPrefix = data->getName().getPrefix(-1);
918
919 const std::string payload(reinterpret_cast<const char*>(data->getContent().value()),
920 data->getContent().value_size());
921 Json::Value parsedFromString;
922 Json::Reader reader;
923 if (!reader.parse(payload, parsedFromString)) {
924 // parse error, log events
Chengyu Fan71b712b2015-09-09 22:13:56 -0600925 _LOG_DEBUG("Fail to parse the published Data" << data->getName());
Chengyu Fanc7b87ad2015-07-09 16:44:37 -0600926 return false;
927 }
928
929 // validate added files...
930 for (size_t i = 0; i < parsedFromString["add"].size(); i++) {
931 if (!publisherPrefix.isPrefixOf(
932 ndn::Name(parsedFromString["add"][static_cast<int>(i)].asString())))
933 return false;
934 }
935
936 // validate removed files ...
937 for (size_t i = 0; i < parsedFromString["remove"].size(); i++) {
938 if (!publisherPrefix.isPrefixOf(
939 ndn::Name(parsedFromString["remove"][static_cast<int>(i)].asString())))
940 return false;
941 }
942 return true;
Alison Craig2a4d5282015-04-10 12:00:02 -0600943}
944
945} // namespace publish
946} // namespace atmos
947#endif //ATMOS_PUBLISH_PUBLISH_ADAPTER_HPP