blob: 9b30d8d280afdd8611fe24ceb2c6043348adce3e [file] [log] [blame]
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -05001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2014-2018, The University of Memphis
4 *
5 * This file is part of PSync.
6 * See AUTHORS.md for complete list of PSync authors and contributors.
7 *
8 * PSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 **/
19
20#ifndef PSYNC_CONSUMER_HPP
21#define PSYNC_CONSUMER_HPP
22
23#include "detail/bloom-filter.hpp"
24#include "detail/util.hpp"
25#include "detail/test-access-control.hpp"
26
27#include <ndn-cxx/face.hpp>
28#include <ndn-cxx/util/scheduler.hpp>
29#include <ndn-cxx/util/time.hpp>
30
31#include <random>
32#include <map>
33#include <vector>
34
35namespace psync {
36
37using namespace ndn::literals::time_literals;
38
39typedef std::function<void(const std::vector<ndn::Name>&)> ReceiveHelloCallback;
40typedef std::function<void(const std::vector<MissingDataInfo>&)> UpdateCallback;
41
42const ndn::time::milliseconds HELLO_INTEREST_LIFETIME = 1_s;
43const ndn::time::milliseconds SYNC_INTEREST_LIFETIME = 1_s;
44
45/**
46 * @brief Consumer logic to subscribe to producer's data
47 *
48 * Application needs to call sendHelloInterest to get the subscription list
Ashlesh Gawandea9296472018-08-04 08:21:39 -050049 * in psync::ReceiveHelloCallback. It can then add the desired names using addSubscription.
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -050050 * Finally application will call sendSyncInterest. If the application adds something
51 * later to the subscription list then it may call sendSyncInterest again for
52 * sending the next sync interest with updated IBF immediately to reduce any delay in sync data.
53 * Whenever there is new data UpdateCallback will be called to notify the application.
Ashlesh Gawandea9296472018-08-04 08:21:39 -050054 *
55 * If consumer wakes up after a long time to sync, producer may not decode the differences
56 * with its old IBF successfully and send an application nack. Upon receiving the nack,
57 * consumer will send a hello again and inform the application via psync::ReceiveHelloCallback
58 * and psync::UpdateCallback.
59 *
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -050060 * Currently, fetching of the data needs to be handled by the application.
61 */
62class Consumer
63{
64public:
65 /**
66 * @brief constructor
67 *
68 * @param syncPrefix syncPrefix to send hello/sync interests to producer
69 * @param face application's face
70 * @param onReceiveHelloData call back to give hello data back to application
71 * @param onUpdate call back to give sync data back to application
72 * @param count bloom filter number of expected elements (subscriptions) in bloom filter
73 * @param false_positive bloom filter false positive probability
74 * @param helloInterestLifetime lifetime of hello interest
75 * @param syncInterestLifetime lifetime of sync interest
76 */
77 Consumer(const ndn::Name& syncPrefix,
78 ndn::Face& face,
79 const ReceiveHelloCallback& onReceiveHelloData,
80 const UpdateCallback& onUpdate,
81 unsigned int count,
82 double false_positive,
83 ndn::time::milliseconds helloInterestLifetime = HELLO_INTEREST_LIFETIME,
84 ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFETIME);
85
86 /**
87 * @brief send hello interest /<sync-prefix>/hello/
88 *
89 * Should be called by the application whenever it wants to send a hello
90 */
91 void
92 sendHelloInterest();
93
94 /**
95 * @brief send sync interest /<sync-prefix>/sync/\<BF\>/\<producers-IBF\>
96 *
97 * Should be called after subscription list is set or updated
98 */
99 void
100 sendSyncInterest();
101
102 /**
103 * @brief Add prefix to subscription list
104 *
105 * @param prefix prefix to be added to the list
Ashlesh Gawandea9296472018-08-04 08:21:39 -0500106 * @return true if prefix is added, false if it is already present
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500107 */
108 bool
109 addSubscription(const ndn::Name& prefix);
110
111 std::set<ndn::Name>
112 getSubscriptionList() const
113 {
114 return m_subscriptionList;
115 }
116
117 bool
118 isSubscribed(const ndn::Name& prefix) const
119 {
120 return m_subscriptionList.find(prefix) != m_subscriptionList.end();
121 }
122
123 ndn::optional<uint64_t>
124 getSeqNo(const ndn::Name& prefix) const
125 {
126 auto it = m_prefixes.find(prefix);
127 if (it == m_prefixes.end()) {
128 return ndn::nullopt;
129 }
130 return it->second;
131 }
132
133private:
134 /**
135 * @brief Get hello data from the producer
136 *
137 * Format: /<sync-prefix>/hello/\<BF\>/\<producer-IBF\>
138 * Data content is all the prefixes the producer has.
139 * We store the producer's IBF to be used in sending sync interest
140 *
141 * m_onReceiveHelloData is called to let the application know
142 * so that it can set the subscription list using addSubscription
143 *
144 * @param interest hello interest
145 * @param data hello data
146 */
147 void
148 onHelloData(const ndn::Interest& interest, const ndn::Data& data);
149
150 /**
151 * @brief Get hello data from the producer
152 *
153 * Format: <sync-prefix>/sync/\<BF\>/\<producers-IBF\>/\<producers-latest-IBF\>
154 * Data content is all the prefixes the producer thinks the consumer doesn't have
155 * have the latest update for. We update our copy of producer's IBF with the latest one.
156 * Then we send another sync interest after a random jitter.
157 *
158 * @param interest sync interest
159 * @param data sync data
160 */
161 void
162 onSyncData(const ndn::Interest& interest, const ndn::Data& data);
163
164 void
165 onHelloTimeout(const ndn::Interest& interest);
166
167 void
168 onSyncTimeout(const ndn::Interest& interest);
169
170 void
171 onNackForHello(const ndn::Interest& interest, const ndn::lp::Nack& nack);
172
173 void
174 onNackForSync(const ndn::Interest& interest, const ndn::lp::Nack& nack);
175
176PUBLIC_WITH_TESTS_ELSE_PRIVATE:
177 ndn::Face& m_face;
178 ndn::Scheduler m_scheduler;
179
180 ndn::Name m_syncPrefix;
181 ndn::Name m_helloInterestPrefix;
182 ndn::Name m_syncInterestPrefix;
183 ndn::Name m_iblt;
184
185 ReceiveHelloCallback m_onReceiveHelloData;
186
187 // Called when new sync update is received from producer.
188 UpdateCallback m_onUpdate;
189
190 // Bloom filter is used to store application/user's subscription list.
191 BloomFilter m_bloomFilter;
192
193 ndn::time::milliseconds m_helloInterestLifetime;
194 ndn::time::milliseconds m_syncInterestLifetime;
195
196 // Store sequence number for the prefix.
197 std::map<ndn::Name, uint64_t> m_prefixes;
198 std::set<ndn::Name> m_subscriptionList;
199
200 const ndn::PendingInterestId* m_outstandingInterestId;
201
202 std::mt19937 m_rng;
203 std::uniform_int_distribution<> m_rangeUniformRandom;
204};
205
206} // namespace psync
207
208#endif // PSYNC_CONSUMER_HPP