blob: 73fb677d8562e7195ae09386af94adb117db6328 [file] [log] [blame]
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -05001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2014-2018, The University of Memphis
4 *
5 * This file is part of PSync.
6 * See AUTHORS.md for complete list of PSync authors and contributors.
7 *
8 * PSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 **/
19
20#include "partial-producer.hpp"
21#include "detail/state.hpp"
22
23#include <ndn-cxx/util/logger.hpp>
24
25#include <iostream>
26#include <cstring>
27#include <limits>
28
29namespace psync {
30
31NDN_LOG_INIT(psync.PartialProducer);
32
33PartialProducer::PartialProducer(size_t expectedNumEntries,
34 ndn::Face& face,
35 const ndn::Name& syncPrefix,
36 const ndn::Name& userPrefix,
37 ndn::time::milliseconds syncReplyFreshness,
38 ndn::time::milliseconds helloReplyFreshness)
39 : ProducerBase(expectedNumEntries, face, syncPrefix,
40 userPrefix, syncReplyFreshness, helloReplyFreshness)
41{
42 m_registerPrefixId =
43 m_face.registerPrefix(m_syncPrefix,
44 [this] (const ndn::Name& syncPrefix) {
45 m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("hello"),
46 std::bind(&PartialProducer::onHelloInterest, this, _1, _2));
47
48 m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("sync"),
49 std::bind(&PartialProducer::onSyncInterest, this, _1, _2));
50 },
51 std::bind(&PartialProducer::onRegisterFailed, this, _1, _2));
52}
53
54PartialProducer::~PartialProducer()
55{
56 m_face.unregisterPrefix(m_registerPrefixId, nullptr, nullptr);
57}
58
59void
60PartialProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
61{
62 if (m_prefixes.find(prefix) == m_prefixes.end()) {
63 return;
64 }
65
66 uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
67
68 NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
69
70 updateSeqNo(prefix, newSeq);
71
72 satisfyPendingSyncInterests(prefix);
73}
74
75void
76PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest)
77{
78 NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest.getNonce());
79
80 State state;
81
82 for (const auto& p : m_prefixes) {
83 state.addContent(p.first);
84 }
85 NDN_LOG_DEBUG("sending content p: " << state);
86
87 ndn::Name helloDataName = prefix;
88 m_iblt.appendToName(helloDataName);
89
90 ndn::Data data;
91 data.setName(helloDataName);
92 data.setFreshnessPeriod(m_helloReplyFreshness);
93 data.setContent(state.wireEncode());
94
95 m_keyChain.sign(data);
96 m_face.put(data);
97}
98
99void
100PartialProducer::onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest)
101{
102 NDN_LOG_DEBUG("Sync Interest Received, nonce: " << interest.getNonce() <<
103 " hash: " << std::hash<std::string>{}(interest.getName().toUri()));
104
105 ndn::Name interestName = interest.getName();
106
107 ndn::name::Component bfName, ibltName;
108 unsigned int projectedCount;
109 double falsePositiveProb;
110 try {
111 projectedCount = interestName.get(interestName.size()-4).toNumber();
112 falsePositiveProb = interestName.get(interestName.size()-3).toNumber()/1000.;
113 bfName = interestName.get(interestName.size()-2);
114
115 ibltName = interestName.get(interestName.size()-1);
116 }
117 catch (const std::exception& e) {
118 NDN_LOG_ERROR("Cannot extract bloom filter and IBF from sync interest: " << e.what());
119 NDN_LOG_ERROR("Format: /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>");
120 return;
121 }
122
123 BloomFilter bf;
124 IBLT iblt(m_expectedNumEntries);
125
126 try {
127 bf = BloomFilter(projectedCount, falsePositiveProb, bfName);
128 iblt.initialize(ibltName);
129 }
130 catch (const std::exception& e) {
131 NDN_LOG_WARN(e.what());
132 return;
133 }
134
135 // get the difference
136 IBLT diff = m_iblt - iblt;
137
138 // non-empty positive means we have some elements that the others don't
139 std::set<uint32_t> positive;
140 std::set<uint32_t> negative;
141
142 NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
143
144 bool peel = diff.listEntries(positive, negative);
145
146 NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
147
148 if (!peel) {
149 NDN_LOG_DEBUG("Can't decode the difference, sending application Nack");
150 sendApplicationNack(interestName);
151 return;
152 }
153
154 // generate content for Sync reply
155 State state;
156 NDN_LOG_TRACE("Size of positive set " << positive.size());
157 NDN_LOG_TRACE("Size of negative set " << negative.size());
158 for (const auto& hash : positive) {
159 ndn::Name prefix = m_hash2prefix[hash];
160 if (bf.contains(prefix.toUri())) {
161 // generate data
162 state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
163 NDN_LOG_DEBUG("Content: " << prefix << " " << std::to_string(m_prefixes[prefix]));
164 }
165 }
166
167 NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
168
169 if (positive.size() + negative.size() >= m_threshold || !state.getContent().empty()) {
170
171 // send back data
172 ndn::Name syncDataName = interestName;
173 m_iblt.appendToName(syncDataName);
174 ndn::Data data;
175 data.setName(syncDataName);
176 data.setFreshnessPeriod(m_syncReplyFreshness);
177 data.setContent(state.wireEncode());
178
179 m_keyChain.sign(data);
180 NDN_LOG_DEBUG("Sending sync data");
181 m_face.put(data);
182
183 return;
184 }
185
186 ndn::util::scheduler::ScopedEventId scopedEventId(m_scheduler);
187 auto it = m_pendingEntries.emplace(interestName,
188 PendingEntryInfo{bf, iblt, std::move(scopedEventId)});
189
190 it.first->second.expirationEvent =
191 m_scheduler.scheduleEvent(interest.getInterestLifetime(),
192 [this, interest] {
193 NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
194 m_pendingEntries.erase(interest.getName());
195 });
196}
197
198void
199PartialProducer::satisfyPendingSyncInterests(const ndn::Name& prefix) {
200 NDN_LOG_TRACE("size of pending interest: " << m_pendingEntries.size());
201
202 for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
203 const PendingEntryInfo& entry = it->second;
204
205 IBLT diff = m_iblt - entry.iblt;
206 std::set<uint32_t> positive;
207 std::set<uint32_t> negative;
208
209 bool peel = diff.listEntries(positive, negative);
210
211 NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
212
213 NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
214 NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
215
216 if (!peel) {
217 NDN_LOG_TRACE("Decoding of differences with stored IBF unsuccessful, deleting pending interest");
218 m_pendingEntries.erase(it++);
219 continue;
220 }
221
222 State state;
223 if (entry.bf.contains(prefix.toUri()) || positive.size() + negative.size() >= m_threshold) {
224 if (entry.bf.contains(prefix.toUri())) {
225 state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
226 NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
227 }
228 else {
229 NDN_LOG_DEBUG("Sending with empty content to send latest IBF to consumer");
230 }
231
232 // generate sync data and cancel the event
233 ndn::Name syncDataName = it->first;
234 m_iblt.appendToName(syncDataName);
235 ndn::Data data;
236 data.setName(syncDataName);
237 data.setFreshnessPeriod(m_syncReplyFreshness);
238 data.setContent(state.wireEncode());
239
240 m_keyChain.sign(data);
241 NDN_LOG_DEBUG("Sending sync data");
242 m_face.put(data);
243
244 m_pendingEntries.erase(it++);
245 }
246 else {
247 ++it;
248 }
249 }
250}
251
252} // namespace psync