/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2014-2018, The University of Memphis
 *
 * This file is part of PSync.
 * See AUTHORS.md for complete list of PSync authors and contributors.
 *
 * PSync is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 **/
19
20#include "partial-producer.hpp"
21#include "detail/state.hpp"
22
23#include <ndn-cxx/util/logger.hpp>
24
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -050025#include <cstring>
26#include <limits>
27
28namespace psync {
29
30NDN_LOG_INIT(psync.PartialProducer);
31
32PartialProducer::PartialProducer(size_t expectedNumEntries,
33 ndn::Face& face,
34 const ndn::Name& syncPrefix,
35 const ndn::Name& userPrefix,
36 ndn::time::milliseconds syncReplyFreshness,
37 ndn::time::milliseconds helloReplyFreshness)
38 : ProducerBase(expectedNumEntries, face, syncPrefix,
39 userPrefix, syncReplyFreshness, helloReplyFreshness)
40{
41 m_registerPrefixId =
42 m_face.registerPrefix(m_syncPrefix,
43 [this] (const ndn::Name& syncPrefix) {
44 m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("hello"),
45 std::bind(&PartialProducer::onHelloInterest, this, _1, _2));
46
47 m_face.setInterestFilter(ndn::Name(m_syncPrefix).append("sync"),
48 std::bind(&PartialProducer::onSyncInterest, this, _1, _2));
49 },
50 std::bind(&PartialProducer::onRegisterFailed, this, _1, _2));
51}
52
53PartialProducer::~PartialProducer()
54{
55 m_face.unregisterPrefix(m_registerPrefixId, nullptr, nullptr);
56}
57
58void
59PartialProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
60{
61 if (m_prefixes.find(prefix) == m_prefixes.end()) {
62 return;
63 }
64
65 uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
66
67 NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
68
69 updateSeqNo(prefix, newSeq);
70
71 satisfyPendingSyncInterests(prefix);
72}
73
74void
75PartialProducer::onHelloInterest(const ndn::Name& prefix, const ndn::Interest& interest)
76{
Ashlesh Gawande2e82df12018-12-08 21:42:29 -060077 if (m_segmentPublisher.replyFromStore(interest.getName())) {
Ashlesh Gawandeec43b362018-08-01 15:15:01 -050078 return;
79 }
80
Ashlesh Gawande2e82df12018-12-08 21:42:29 -060081 // Last component or fourth last component (in case of interest with version and segment)
82 // needs to be hello
83 if (interest.getName().get(interest.getName().size()-1).toUri() != "hello" &&
84 interest.getName().get(interest.getName().size()-4).toUri() != "hello") {
Ashlesh Gawandeec43b362018-08-01 15:15:01 -050085 return;
86 }
87
88 NDN_LOG_DEBUG("Hello Interest Received, nonce: " << interest);
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -050089
90 State state;
91
Ashlesh Gawandea9296472018-08-04 08:21:39 -050092 for (const auto& prefix : m_prefixes) {
93 state.addContent(ndn::Name(prefix.first).appendNumber(prefix.second));
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -050094 }
95 NDN_LOG_DEBUG("sending content p: " << state);
96
97 ndn::Name helloDataName = prefix;
98 m_iblt.appendToName(helloDataName);
99
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500100 m_segmentPublisher.publish(interest.getName(), helloDataName,
101 state.wireEncode(), m_helloReplyFreshness);
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500102}
103
104void
105PartialProducer::onSyncInterest(const ndn::Name& prefix, const ndn::Interest& interest)
106{
Ashlesh Gawande2e82df12018-12-08 21:42:29 -0600107 if (m_segmentPublisher.replyFromStore(interest.getName())) {
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500108 return;
109 }
110
Ashlesh Gawande2e82df12018-12-08 21:42:29 -0600111 NDN_LOG_DEBUG("Sync Interest Received, nonce: " << interest.getNonce() <<
112 " hash: " << std::hash<std::string>{}(interest.getName().toUri()));
113
114 ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefix.size());
115 ndn::Name interestName;
116
117 if (nameWithoutSyncPrefix.size() == 4) {
118 // Get /<prefix>/BF/IBF/ from /<prefix>/BF/IBF (3 components of BF + 1 for IBF)
119 interestName = interest.getName();
120 }
121 else if (nameWithoutSyncPrefix.size() == 6) {
122 // Get <prefix>/BF/IBF/ from /<prefix>/BF/IBF/<version>/<segment-no>
123 interestName = interest.getName().getPrefix(-2);
124 }
125 else {
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500126 return;
127 }
128
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500129 ndn::name::Component bfName, ibltName;
130 unsigned int projectedCount;
131 double falsePositiveProb;
132 try {
133 projectedCount = interestName.get(interestName.size()-4).toNumber();
134 falsePositiveProb = interestName.get(interestName.size()-3).toNumber()/1000.;
135 bfName = interestName.get(interestName.size()-2);
136
137 ibltName = interestName.get(interestName.size()-1);
138 }
139 catch (const std::exception& e) {
140 NDN_LOG_ERROR("Cannot extract bloom filter and IBF from sync interest: " << e.what());
141 NDN_LOG_ERROR("Format: /<syncPrefix>/sync/<BF-count>/<BF-false-positive-probability>/<BF>/<IBF>");
142 return;
143 }
144
145 BloomFilter bf;
146 IBLT iblt(m_expectedNumEntries);
147
148 try {
149 bf = BloomFilter(projectedCount, falsePositiveProb, bfName);
150 iblt.initialize(ibltName);
151 }
152 catch (const std::exception& e) {
153 NDN_LOG_WARN(e.what());
154 return;
155 }
156
157 // get the difference
158 IBLT diff = m_iblt - iblt;
159
160 // non-empty positive means we have some elements that the others don't
161 std::set<uint32_t> positive;
162 std::set<uint32_t> negative;
163
164 NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
165
166 bool peel = diff.listEntries(positive, negative);
167
168 NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
169
170 if (!peel) {
171 NDN_LOG_DEBUG("Can't decode the difference, sending application Nack");
172 sendApplicationNack(interestName);
173 return;
174 }
175
176 // generate content for Sync reply
177 State state;
178 NDN_LOG_TRACE("Size of positive set " << positive.size());
179 NDN_LOG_TRACE("Size of negative set " << negative.size());
180 for (const auto& hash : positive) {
181 ndn::Name prefix = m_hash2prefix[hash];
182 if (bf.contains(prefix.toUri())) {
183 // generate data
184 state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
185 NDN_LOG_DEBUG("Content: " << prefix << " " << std::to_string(m_prefixes[prefix]));
186 }
187 }
188
189 NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
190
191 if (positive.size() + negative.size() >= m_threshold || !state.getContent().empty()) {
192
193 // send back data
194 ndn::Name syncDataName = interestName;
195 m_iblt.appendToName(syncDataName);
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500196
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500197 m_segmentPublisher.publish(interest.getName(), syncDataName,
198 state.wireEncode(), m_syncReplyFreshness);
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500199 return;
200 }
201
202 ndn::util::scheduler::ScopedEventId scopedEventId(m_scheduler);
203 auto it = m_pendingEntries.emplace(interestName,
204 PendingEntryInfo{bf, iblt, std::move(scopedEventId)});
205
206 it.first->second.expirationEvent =
207 m_scheduler.scheduleEvent(interest.getInterestLifetime(),
208 [this, interest] {
209 NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
210 m_pendingEntries.erase(interest.getName());
211 });
212}
213
214void
215PartialProducer::satisfyPendingSyncInterests(const ndn::Name& prefix) {
216 NDN_LOG_TRACE("size of pending interest: " << m_pendingEntries.size());
217
218 for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
219 const PendingEntryInfo& entry = it->second;
220
221 IBLT diff = m_iblt - entry.iblt;
222 std::set<uint32_t> positive;
223 std::set<uint32_t> negative;
224
225 bool peel = diff.listEntries(positive, negative);
226
227 NDN_LOG_TRACE("Result of listEntries on the difference: " << peel);
228
229 NDN_LOG_TRACE("Number elements in IBF: " << m_prefixes.size());
230 NDN_LOG_TRACE("m_threshold: " << m_threshold << " Total: " << positive.size() + negative.size());
231
232 if (!peel) {
233 NDN_LOG_TRACE("Decoding of differences with stored IBF unsuccessful, deleting pending interest");
234 m_pendingEntries.erase(it++);
235 continue;
236 }
237
238 State state;
239 if (entry.bf.contains(prefix.toUri()) || positive.size() + negative.size() >= m_threshold) {
240 if (entry.bf.contains(prefix.toUri())) {
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500241 state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
242 NDN_LOG_DEBUG("sending sync content " << prefix << " " << std::to_string(m_prefixes[prefix]));
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500243 }
244 else {
245 NDN_LOG_DEBUG("Sending with empty content to send latest IBF to consumer");
246 }
247
248 // generate sync data and cancel the event
249 ndn::Name syncDataName = it->first;
250 m_iblt.appendToName(syncDataName);
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500251
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500252 m_segmentPublisher.publish(it->first, syncDataName,
253 state.wireEncode(), m_syncReplyFreshness);
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500254
255 m_pendingEntries.erase(it++);
256 }
257 else {
258 ++it;
259 }
260 }
261}
262
263} // namespace psync