blob: b63bc83b23caee41afd6ce1b098ba321d6f2c190 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2020,  The University of Memphis
 *
 * This file is part of PSync.
 * See AUTHORS.md for complete list of PSync authors and contributors.
 *
 * PSync is free software: you can redistribute it and/or modify it under the terms
 * of the GNU Lesser General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE.  See the GNU Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License along with
 * PSync, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
 */
Ashlesh Gawande4c0a7472018-08-08 12:20:33 -050019
20#include <PSync/consumer.hpp>
21
22#include <ndn-cxx/name.hpp>
23#include <ndn-cxx/util/logger.hpp>
24#include <ndn-cxx/util/random.hpp>
25
26#include <iostream>
27
28NDN_LOG_INIT(examples.PartialSyncConsumerApp);
29
30class PSyncConsumer
31{
32public:
33 /**
34 * @brief Initialize consumer and start hello process
35 *
36 * 0.001 is the false positive probability of the bloom filter
37 *
38 * @param syncPrefix should be the same as producer
39 * @param nSub number of subscriptions is used for the bloom filter (subscription list) size
40 */
41 PSyncConsumer(const ndn::Name& syncPrefix, int nSub)
42 : m_nSub(nSub)
43 , m_consumer(syncPrefix, m_face,
44 std::bind(&PSyncConsumer::afterReceiveHelloData, this, _1),
45 std::bind(&PSyncConsumer::processSyncUpdate, this, _1),
46 m_nSub, 0.001)
47 , m_rng(ndn::random::getRandomNumberEngine())
48 {
49 // This starts the consumer side by sending a hello interest to the producer
50 // When the producer responds with hello data, afterReceiveHelloData is called
51 m_consumer.sendHelloInterest();
52 }
53
54 void
55 run()
56 {
57 m_face.processEvents();
58 }
59
60private:
61 void
Ashlesh Gawandecbdc0122020-07-13 21:13:00 -070062 afterReceiveHelloData(const std::map<ndn::Name, uint64_t>& availSubs)
Ashlesh Gawande4c0a7472018-08-08 12:20:33 -050063 {
Ashlesh Gawandecbdc0122020-07-13 21:13:00 -070064 std::vector<ndn::Name> sensors;
65 sensors.reserve(availSubs.size());
66 for (const auto& it : availSubs) {
67 sensors.insert(sensors.end(), it.first);
68 }
Ashlesh Gawande4c0a7472018-08-08 12:20:33 -050069
70 std::shuffle(sensors.begin(), sensors.end(), m_rng);
71
Ashlesh Gawandecbdc0122020-07-13 21:13:00 -070072 // Randomly subscribe to m_nSub prefixes
Ashlesh Gawande4c0a7472018-08-08 12:20:33 -050073 for (int i = 0; i < m_nSub; i++) {
Ashlesh Gawandecbdc0122020-07-13 21:13:00 -070074 ndn::Name prefix = sensors[i];
75 NDN_LOG_INFO("Subscribing to: " << prefix);
76 auto it = availSubs.find(prefix);
77 m_consumer.addSubscription(prefix, it->second);
Ashlesh Gawande4c0a7472018-08-08 12:20:33 -050078 }
79
80 // After setting the subscription list, send the sync interest
81 // The sync interest contains the subscription list
82 // When new data is received for any subscribed prefix, processSyncUpdate is called
83 m_consumer.sendSyncInterest();
84 }
85
86 void
87 processSyncUpdate(const std::vector<psync::MissingDataInfo>& updates)
88 {
89 for (const auto& update : updates) {
90 for (uint64_t i = update.lowSeq; i <= update.highSeq; i++) {
91 // Data can now be fetched using the prefix and sequence
92 NDN_LOG_INFO("Update: " << update.prefix << "/" << i);
93 }
94 }
95 }
96
97private:
98 ndn::Face m_face;
99 int m_nSub;
100
101 psync::Consumer m_consumer;
102 ndn::random::RandomNumberEngine& m_rng;
103};
104
105int
106main(int argc, char* argv[])
107{
108 if (argc != 3) {
109 std::cout << "usage: " << argv[0] << " "
110 << "<sync-prefix> <number-of-subscriptions>" << std::endl;
111 return 1;
112 }
113
114 try {
115 PSyncConsumer consumer(argv[1], std::stoi(argv[2]));
116 consumer.run();
117 }
118 catch (const std::exception& e) {
119 NDN_LOG_ERROR(e.what());
120 }
121}