blob: 6460bc5930ada66486258255c417c7e1188423fa [file] [log] [blame]
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -05001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2014-2018, The University of Memphis
4 *
5 * This file is part of PSync.
6 * See AUTHORS.md for complete list of PSync authors and contributors.
7 *
8 * PSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 **/
19
20#ifndef PSYNC_CONSUMER_HPP
21#define PSYNC_CONSUMER_HPP
22
23#include "detail/bloom-filter.hpp"
24#include "detail/util.hpp"
25#include "detail/test-access-control.hpp"
26
27#include <ndn-cxx/face.hpp>
28#include <ndn-cxx/util/scheduler.hpp>
Ashlesh Gawandeec43b362018-08-01 15:15:01 -050029#include <ndn-cxx/util/segment-fetcher.hpp>
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -050030#include <ndn-cxx/util/time.hpp>
31
32#include <random>
33#include <map>
34#include <vector>
35
36namespace psync {
37
38using namespace ndn::literals::time_literals;
39
40typedef std::function<void(const std::vector<ndn::Name>&)> ReceiveHelloCallback;
41typedef std::function<void(const std::vector<MissingDataInfo>&)> UpdateCallback;
42
43const ndn::time::milliseconds HELLO_INTEREST_LIFETIME = 1_s;
44const ndn::time::milliseconds SYNC_INTEREST_LIFETIME = 1_s;
45
46/**
47 * @brief Consumer logic to subscribe to producer's data
48 *
49 * Application needs to call sendHelloInterest to get the subscription list
Ashlesh Gawandea9296472018-08-04 08:21:39 -050050 * in psync::ReceiveHelloCallback. It can then add the desired names using addSubscription.
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -050051 * Finally application will call sendSyncInterest. If the application adds something
52 * later to the subscription list then it may call sendSyncInterest again for
53 * sending the next sync interest with updated IBF immediately to reduce any delay in sync data.
54 * Whenever there is new data UpdateCallback will be called to notify the application.
Ashlesh Gawandea9296472018-08-04 08:21:39 -050055 *
56 * If consumer wakes up after a long time to sync, producer may not decode the differences
57 * with its old IBF successfully and send an application nack. Upon receiving the nack,
58 * consumer will send a hello again and inform the application via psync::ReceiveHelloCallback
59 * and psync::UpdateCallback.
60 *
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -050061 * Currently, fetching of the data needs to be handled by the application.
62 */
63class Consumer
64{
65public:
66 /**
67 * @brief constructor
68 *
69 * @param syncPrefix syncPrefix to send hello/sync interests to producer
70 * @param face application's face
71 * @param onReceiveHelloData call back to give hello data back to application
72 * @param onUpdate call back to give sync data back to application
73 * @param count bloom filter number of expected elements (subscriptions) in bloom filter
74 * @param false_positive bloom filter false positive probability
75 * @param helloInterestLifetime lifetime of hello interest
76 * @param syncInterestLifetime lifetime of sync interest
77 */
78 Consumer(const ndn::Name& syncPrefix,
79 ndn::Face& face,
80 const ReceiveHelloCallback& onReceiveHelloData,
81 const UpdateCallback& onUpdate,
82 unsigned int count,
83 double false_positive,
84 ndn::time::milliseconds helloInterestLifetime = HELLO_INTEREST_LIFETIME,
85 ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFETIME);
86
87 /**
88 * @brief send hello interest /<sync-prefix>/hello/
89 *
90 * Should be called by the application whenever it wants to send a hello
91 */
92 void
93 sendHelloInterest();
94
95 /**
96 * @brief send sync interest /<sync-prefix>/sync/\<BF\>/\<producers-IBF\>
97 *
98 * Should be called after subscription list is set or updated
99 */
100 void
101 sendSyncInterest();
102
103 /**
104 * @brief Add prefix to subscription list
105 *
106 * @param prefix prefix to be added to the list
Ashlesh Gawandea9296472018-08-04 08:21:39 -0500107 * @return true if prefix is added, false if it is already present
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500108 */
109 bool
110 addSubscription(const ndn::Name& prefix);
111
112 std::set<ndn::Name>
113 getSubscriptionList() const
114 {
115 return m_subscriptionList;
116 }
117
118 bool
119 isSubscribed(const ndn::Name& prefix) const
120 {
121 return m_subscriptionList.find(prefix) != m_subscriptionList.end();
122 }
123
124 ndn::optional<uint64_t>
125 getSeqNo(const ndn::Name& prefix) const
126 {
127 auto it = m_prefixes.find(prefix);
128 if (it == m_prefixes.end()) {
129 return ndn::nullopt;
130 }
131 return it->second;
132 }
133
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500134 /**
135 * @brief Stop segment fetcher to stop the sync and free resources
136 */
137 void
138 stop();
139
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500140private:
141 /**
142 * @brief Get hello data from the producer
143 *
144 * Format: /<sync-prefix>/hello/\<BF\>/\<producer-IBF\>
145 * Data content is all the prefixes the producer has.
146 * We store the producer's IBF to be used in sending sync interest
147 *
148 * m_onReceiveHelloData is called to let the application know
149 * so that it can set the subscription list using addSubscription
150 *
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500151 * @param bufferPtr hello data content
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500152 */
153 void
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500154 onHelloData(const ndn::ConstBufferPtr& bufferPtr);
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500155
156 /**
157 * @brief Get hello data from the producer
158 *
159 * Format: <sync-prefix>/sync/\<BF\>/\<producers-IBF\>/\<producers-latest-IBF\>
160 * Data content is all the prefixes the producer thinks the consumer doesn't have
161 * have the latest update for. We update our copy of producer's IBF with the latest one.
162 * Then we send another sync interest after a random jitter.
163 *
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500164 * @param bufferPtr sync data content
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500165 */
166 void
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500167 onSyncData(const ndn::ConstBufferPtr& bufferPtr);
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500168
169PUBLIC_WITH_TESTS_ELSE_PRIVATE:
170 ndn::Face& m_face;
171 ndn::Scheduler m_scheduler;
172
173 ndn::Name m_syncPrefix;
174 ndn::Name m_helloInterestPrefix;
175 ndn::Name m_syncInterestPrefix;
176 ndn::Name m_iblt;
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500177 ndn::Name m_helloDataName;
178 ndn::Name m_syncDataName;
179 uint32_t m_syncDataContentType;
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500180
181 ReceiveHelloCallback m_onReceiveHelloData;
182
183 // Called when new sync update is received from producer.
184 UpdateCallback m_onUpdate;
185
186 // Bloom filter is used to store application/user's subscription list.
187 BloomFilter m_bloomFilter;
188
189 ndn::time::milliseconds m_helloInterestLifetime;
190 ndn::time::milliseconds m_syncInterestLifetime;
191
192 // Store sequence number for the prefix.
193 std::map<ndn::Name, uint64_t> m_prefixes;
194 std::set<ndn::Name> m_subscriptionList;
195
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500196 std::mt19937 m_rng;
197 std::uniform_int_distribution<> m_rangeUniformRandom;
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500198 std::shared_ptr<ndn::util::SegmentFetcher> m_helloFetcher;
199 std::shared_ptr<ndn::util::SegmentFetcher> m_syncFetcher;
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500200};
201
202} // namespace psync
203
204#endif // PSYNC_CONSUMER_HPP