blob: 47825a956e1b0ec7cf787afb609357ee3f16fdfd [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2018, The University of Memphis
 *
 * This file is part of PSync.
 * See AUTHORS.md for complete list of PSync authors and contributors.
 *
 * PSync is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 **/
19
20#ifndef PSYNC_CONSUMER_HPP
21#define PSYNC_CONSUMER_HPP
22
#include "detail/bloom-filter.hpp"
#include "detail/util.hpp"
#include "detail/test-access-control.hpp"

#include <ndn-cxx/face.hpp>
#include <ndn-cxx/util/scheduler.hpp>
#include <ndn-cxx/util/time.hpp>

#include <functional>
#include <map>
#include <random>
#include <set>
#include <vector>
34
35namespace psync {
36
37using namespace ndn::literals::time_literals;
38
39typedef std::function<void(const std::vector<ndn::Name>&)> ReceiveHelloCallback;
40typedef std::function<void(const std::vector<MissingDataInfo>&)> UpdateCallback;
41
42const ndn::time::milliseconds HELLO_INTEREST_LIFETIME = 1_s;
43const ndn::time::milliseconds SYNC_INTEREST_LIFETIME = 1_s;
44
45/**
46 * @brief Consumer logic to subscribe to producer's data
47 *
48 * Application needs to call sendHelloInterest to get the subscription list
49 * in ReceiveHelloCallback. It can then add the desired names using addSubscription.
50 * Finally application will call sendSyncInterest. If the application adds something
51 * later to the subscription list then it may call sendSyncInterest again for
52 * sending the next sync interest with updated IBF immediately to reduce any delay in sync data.
53 * Whenever there is new data UpdateCallback will be called to notify the application.
54 * Currently, fetching of the data needs to be handled by the application.
55 */
56class Consumer
57{
58public:
59 /**
60 * @brief constructor
61 *
62 * @param syncPrefix syncPrefix to send hello/sync interests to producer
63 * @param face application's face
64 * @param onReceiveHelloData call back to give hello data back to application
65 * @param onUpdate call back to give sync data back to application
66 * @param count bloom filter number of expected elements (subscriptions) in bloom filter
67 * @param false_positive bloom filter false positive probability
68 * @param helloInterestLifetime lifetime of hello interest
69 * @param syncInterestLifetime lifetime of sync interest
70 */
71 Consumer(const ndn::Name& syncPrefix,
72 ndn::Face& face,
73 const ReceiveHelloCallback& onReceiveHelloData,
74 const UpdateCallback& onUpdate,
75 unsigned int count,
76 double false_positive,
77 ndn::time::milliseconds helloInterestLifetime = HELLO_INTEREST_LIFETIME,
78 ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFETIME);
79
80 /**
81 * @brief send hello interest /<sync-prefix>/hello/
82 *
83 * Should be called by the application whenever it wants to send a hello
84 */
85 void
86 sendHelloInterest();
87
88 /**
89 * @brief send sync interest /<sync-prefix>/sync/\<BF\>/\<producers-IBF\>
90 *
91 * Should be called after subscription list is set or updated
92 */
93 void
94 sendSyncInterest();
95
96 /**
97 * @brief Add prefix to subscription list
98 *
99 * @param prefix prefix to be added to the list
100 */
101 bool
102 addSubscription(const ndn::Name& prefix);
103
104 std::set<ndn::Name>
105 getSubscriptionList() const
106 {
107 return m_subscriptionList;
108 }
109
110 bool
111 isSubscribed(const ndn::Name& prefix) const
112 {
113 return m_subscriptionList.find(prefix) != m_subscriptionList.end();
114 }
115
116 ndn::optional<uint64_t>
117 getSeqNo(const ndn::Name& prefix) const
118 {
119 auto it = m_prefixes.find(prefix);
120 if (it == m_prefixes.end()) {
121 return ndn::nullopt;
122 }
123 return it->second;
124 }
125
126private:
127 /**
128 * @brief Get hello data from the producer
129 *
130 * Format: /<sync-prefix>/hello/\<BF\>/\<producer-IBF\>
131 * Data content is all the prefixes the producer has.
132 * We store the producer's IBF to be used in sending sync interest
133 *
134 * m_onReceiveHelloData is called to let the application know
135 * so that it can set the subscription list using addSubscription
136 *
137 * @param interest hello interest
138 * @param data hello data
139 */
140 void
141 onHelloData(const ndn::Interest& interest, const ndn::Data& data);
142
143 /**
144 * @brief Get hello data from the producer
145 *
146 * Format: <sync-prefix>/sync/\<BF\>/\<producers-IBF\>/\<producers-latest-IBF\>
147 * Data content is all the prefixes the producer thinks the consumer doesn't have
148 * have the latest update for. We update our copy of producer's IBF with the latest one.
149 * Then we send another sync interest after a random jitter.
150 *
151 * @param interest sync interest
152 * @param data sync data
153 */
154 void
155 onSyncData(const ndn::Interest& interest, const ndn::Data& data);
156
157 void
158 onHelloTimeout(const ndn::Interest& interest);
159
160 void
161 onSyncTimeout(const ndn::Interest& interest);
162
163 void
164 onNackForHello(const ndn::Interest& interest, const ndn::lp::Nack& nack);
165
166 void
167 onNackForSync(const ndn::Interest& interest, const ndn::lp::Nack& nack);
168
169PUBLIC_WITH_TESTS_ELSE_PRIVATE:
170 ndn::Face& m_face;
171 ndn::Scheduler m_scheduler;
172
173 ndn::Name m_syncPrefix;
174 ndn::Name m_helloInterestPrefix;
175 ndn::Name m_syncInterestPrefix;
176 ndn::Name m_iblt;
177
178 ReceiveHelloCallback m_onReceiveHelloData;
179
180 // Called when new sync update is received from producer.
181 UpdateCallback m_onUpdate;
182
183 // Bloom filter is used to store application/user's subscription list.
184 BloomFilter m_bloomFilter;
185
186 ndn::time::milliseconds m_helloInterestLifetime;
187 ndn::time::milliseconds m_syncInterestLifetime;
188
189 // Store sequence number for the prefix.
190 std::map<ndn::Name, uint64_t> m_prefixes;
191 std::set<ndn::Name> m_subscriptionList;
192
193 const ndn::PendingInterestId* m_outstandingInterestId;
194
195 std::mt19937 m_rng;
196 std::uniform_int_distribution<> m_rangeUniformRandom;
197};
198
199} // namespace psync
200
201#endif // PSYNC_CONSUMER_HPP