blob: c00bd0ab68de1c933ed7d742364eecd2e9ec96b2 [file] [log] [blame]
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -05001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2014-2018, The University of Memphis
4 *
5 * This file is part of NLSR (Named-data Link State Routing).
6 * See AUTHORS.md for complete list of NLSR authors and contributors.
7 *
8 * NLSR is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * NLSR is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * NLSR, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 **/
19
20#include "full-producer.hpp"
21
22#include <ndn-cxx/util/logger.hpp>
23#include <ndn-cxx/util/segment-fetcher.hpp>
24#include <ndn-cxx/security/validator-null.hpp>
25
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -050026#include <cstring>
27#include <limits>
28#include <functional>
29
30namespace psync {
31
32NDN_LOG_INIT(psync.FullProducer);
33
34FullProducer::FullProducer(const size_t expectedNumEntries,
35 ndn::Face& face,
36 const ndn::Name& syncPrefix,
37 const ndn::Name& userPrefix,
38 const UpdateCallback& onUpdateCallBack,
39 ndn::time::milliseconds syncInterestLifetime,
40 ndn::time::milliseconds syncReplyFreshness)
41 : ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness)
42 , m_syncInterestLifetime(syncInterestLifetime)
43 , m_onUpdate(onUpdateCallBack)
44 , m_outstandingInterestId(nullptr)
45 , m_scheduledSyncInterestId(m_scheduler)
46{
47 int jitter = m_syncInterestLifetime.count() * .20;
48 m_jitter = std::uniform_int_distribution<>(-jitter, jitter);
49
50 m_registerPrefixId =
51 m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
52 std::bind(&FullProducer::onInterest, this, _1, _2),
53 std::bind(&FullProducer::onRegisterFailed, this, _1, _2));
54
55 // Should we do this after setInterestFilter success call back
56 // (Currently following ChronoSync's way)
57 sendSyncInterest();
58}
59
60FullProducer::~FullProducer()
61{
62 if (m_outstandingInterestId != nullptr) {
63 m_face.removePendingInterest(m_outstandingInterestId);
64 }
65
66 m_face.unsetInterestFilter(m_registerPrefixId);
67}
68
69void
70FullProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
71{
72 if (m_prefixes.find(prefix) == m_prefixes.end()) {
73 NDN_LOG_WARN("Prefix not added: " << prefix);
74 return;
75 }
76
77 uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
78
79 NDN_LOG_INFO("Publish: "<< prefix << "/" << newSeq);
80
81 updateSeqNo(prefix, newSeq);
82
83 satisfyPendingInterests();
84}
85
86void
87FullProducer::sendSyncInterest()
88{
89 // If we send two sync interest one after the other
90 // since there is no new data in the network yet,
91 // when data is available it may satisfy both of them
92 if (m_outstandingInterestId != nullptr) {
93 m_face.removePendingInterest(m_outstandingInterestId);
94 }
95
96 // Sync Interest format for full sync: /<sync-prefix>/<ourLatestIBF>
97 ndn::Name syncInterestName = m_syncPrefix;
98
99 // Append our latest IBF
100 m_iblt.appendToName(syncInterestName);
101
102 m_outstandingInterestName = syncInterestName;
103
104 m_scheduledSyncInterestId =
105 m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(m_rng)),
106 [this] { sendSyncInterest(); });
107
108 ndn::Interest syncInterest(syncInterestName);
109 syncInterest.setInterestLifetime(m_syncInterestLifetime);
110 // Other side appends hash of IBF to sync data name
111 syncInterest.setCanBePrefix(true);
112 syncInterest.setMustBeFresh(true);
113
114 syncInterest.setNonce(1);
115 syncInterest.refreshNonce();
116
117 m_outstandingInterestId = m_face.expressInterest(syncInterest,
118 std::bind(&FullProducer::onSyncData, this, _1, _2),
119 [] (const ndn::Interest& interest, const ndn::lp::Nack& nack) {
120 NDN_LOG_TRACE("received Nack with reason " << nack.getReason() <<
121 " for Interest with Nonce: " << interest.getNonce());
122 },
123 [] (const ndn::Interest& interest) {
124 NDN_LOG_DEBUG("On full sync timeout " << interest.getNonce());
125 });
126
127 NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
128 ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
129}
130
131void
132FullProducer::onInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
133{
134 ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
135 if (nameWithoutSyncPrefix.size() == 2 &&
136 nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 1) == RECOVERY_PREFIX.get(0)) {
137 onRecoveryInterest(interest);
138 }
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500139 // interest for recovery segment
140 else if (nameWithoutSyncPrefix.size() == 3 &&
141 nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 2) == RECOVERY_PREFIX.get(0)) {
142 onRecoveryInterest(interest);
143 }
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500144 else if (nameWithoutSyncPrefix.size() == 1) {
145 onSyncInterest(interest);
146 }
147}
148
149void
150FullProducer::onSyncInterest(const ndn::Interest& interest)
151{
152 ndn::Name interestName = interest.getName();
153 ndn::name::Component ibltName = interestName.get(interestName.size()-1);
154
155 NDN_LOG_DEBUG("Full Sync Interest Received, nonce: " << interest.getNonce() <<
156 ", hash: " << std::hash<ndn::Name>{}(interestName));
157
158 IBLT iblt(m_expectedNumEntries);
159 try {
160 iblt.initialize(ibltName);
161 }
162 catch (const std::exception& e) {
163 NDN_LOG_WARN(e.what());
164 return;
165 }
166
167 IBLT diff = m_iblt - iblt;
168
169 std::set<uint32_t> positive;
170 std::set<uint32_t> negative;
171
172 if (!diff.listEntries(positive, negative)) {
173 NDN_LOG_TRACE("Cannot decode differences, positive: " << positive.size()
174 << " negative: " << negative.size() << " m_threshold: "
175 << m_threshold);
176
177 // Send nack if greater then threshold, else send positive below as usual
178 // Or send if we can't get neither positive nor negative differences
179 if (positive.size() + negative.size() >= m_threshold ||
180 (positive.size() == 0 && negative.size() == 0)) {
181
182 // If we don't have anything to offer means that
183 // we are behind and should not mislead other nodes.
184 bool haveSomethingToOffer = false;
185 for (const auto& content : m_prefixes) {
186 if (content.second != 0) {
187 haveSomethingToOffer = true;
188 }
189 }
190
191 if (haveSomethingToOffer) {
192 sendApplicationNack(interestName);
193 }
194 return;
195 }
196 }
197
198 State state;
199 for (const auto& hash : positive) {
200 ndn::Name prefix = m_hash2prefix[hash];
201 // Don't sync up sequence number zero
202 if (m_prefixes[prefix] != 0 && !isFutureHash(prefix.toUri(), negative)) {
203 state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
204 }
205 }
206
207 if (!state.getContent().empty()) {
208 NDN_LOG_DEBUG("Sending sync content: " << state);
209 sendSyncData(interestName, state.wireEncode());
210 return;
211 }
212
213 ndn::util::scheduler::ScopedEventId scopedEventId(m_scheduler);
214 auto it = m_pendingEntries.emplace(interestName,
215 PendingEntryInfoFull{iblt, std::move(scopedEventId)});
216
217 it.first->second.expirationEvent =
218 m_scheduler.scheduleEvent(interest.getInterestLifetime(),
219 [this, interest] {
220 NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
221 m_pendingEntries.erase(interest.getName());
222 });
223}
224
225void
226FullProducer::onRecoveryInterest(const ndn::Interest& interest)
227{
228 NDN_LOG_DEBUG("Recovery interest received");
229
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500230 if (m_segmentPublisher.replyFromStore(interest.getName())) {
231 return;
232 }
233
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500234 State state;
235 for (const auto& content : m_prefixes) {
236 if (content.second != 0) {
237 state.addContent(ndn::Name(content.first).appendNumber(content.second));
238 }
239 }
240
Ashlesh Gawandeec43b362018-08-01 15:15:01 -0500241 m_segmentPublisher.publish(interest.getName(), interest.getName(),
242 state.wireEncode(), m_syncReplyFreshness);
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500243}
244
245void
246FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
247{
248 NDN_LOG_DEBUG("Checking if data will satisfy our own pending interest");
249
250 ndn::Name nameWithIblt;
251 m_iblt.appendToName(nameWithIblt);
252
253 // Append hash of our IBF so that data name maybe different for each node answering
254 ndn::Data data(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
255 data.setFreshnessPeriod(m_syncReplyFreshness);
256 data.setContent(block);
257 m_keyChain.sign(data);
258
259 // checking if our own interest got satisfied
260 if (m_outstandingInterestName == name) {
261 NDN_LOG_DEBUG("Satisfied our own pending interest");
262 // remove outstanding interest
263 if (m_outstandingInterestId != nullptr) {
264 NDN_LOG_DEBUG("Removing our pending interest from face");
265 m_face.removePendingInterest(m_outstandingInterestId);
266 m_outstandingInterestId = nullptr;
267 m_outstandingInterestName = ndn::Name("");
268 }
269
270 NDN_LOG_DEBUG("Sending Sync Data");
271
272 // Send data after removing pending sync interest on face
273 m_face.put(data);
274
275 NDN_LOG_TRACE("Renewing sync interest");
276 sendSyncInterest();
277 }
278 else {
279 NDN_LOG_DEBUG("Sending Sync Data");
280 m_face.put(data);
281 }
282}
283
284void
285FullProducer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
286{
287 ndn::Name interestName = interest.getName();
288 deletePendingInterests(interest.getName());
289
290 if (data.getContentType() == ndn::tlv::ContentType_Nack) {
291 NDN_LOG_DEBUG("Got application nack, sending recovery interest");
292 sendRecoveryInterest(interest);
293 return;
294 }
295
296 State state(data.getContent());
297 std::vector<MissingDataInfo> updates;
298
299 if (interestName.get(interestName.size()-1) == RECOVERY_PREFIX.get(0) &&
300 state.getContent().empty()) {
301 NDN_LOG_TRACE("Recovery data is empty, other side is behind");
302 return;
303 }
304
305 NDN_LOG_DEBUG("Sync Data Received: " << state);
306
307 for (const auto& content : state.getContent()) {
308 ndn::Name prefix = content.getPrefix(-1);
309 uint64_t seq = content.get(content.size()-1).toNumber();
310
311 if (m_prefixes.find(prefix) == m_prefixes.end() || m_prefixes[prefix] < seq) {
312 updates.push_back(MissingDataInfo{prefix, m_prefixes[prefix] + 1, seq});
313 updateSeqNo(prefix, seq);
314 // We should not call satisfyPendingSyncInterests here because we just
315 // got data and deleted pending interest by calling deletePendingFullSyncInterests
316 // But we might have interests not matching to this interest that might not have deleted
317 // from pending sync interest
318 }
319 }
320
321 // We just got the data, so send a new sync interest
322 if (!updates.empty()) {
323 m_onUpdate(updates);
324 NDN_LOG_TRACE("Renewing sync interest");
325 sendSyncInterest();
326 }
327 else {
328 NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() <<
329 " , hash: " << std::hash<ndn::Name>{}(interestName));
330 }
331}
332
333void
334FullProducer::satisfyPendingInterests()
335{
336 NDN_LOG_DEBUG("Satisfying full sync interest: " << m_pendingEntries.size());
337
338 for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
339 const PendingEntryInfoFull& entry = it->second;
340 IBLT diff = m_iblt - entry.iblt;
341 std::set<uint32_t> positive;
342 std::set<uint32_t> negative;
343
344 if (!diff.listEntries(positive, negative)) {
345 NDN_LOG_TRACE("Decode failed for pending interest");
346 if (positive.size() + negative.size() >= m_threshold ||
347 (positive.size() == 0 && negative.size() == 0)) {
348 NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest");
349 m_pendingEntries.erase(it++);
350 continue;
351 }
352 }
353
354 State state;
355 for (const auto& hash : positive) {
356 ndn::Name prefix = m_hash2prefix[hash];
357
358 if (m_prefixes[prefix] != 0) {
359 state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
360 }
361 }
362
363 if (!state.getContent().empty()) {
364 NDN_LOG_DEBUG("Satisfying sync content: " << state);
365 sendSyncData(it->first, state.wireEncode());
366 m_pendingEntries.erase(it++);
367 }
368 else {
369 ++it;
370 }
371 }
372}
373
374bool
375FullProducer::isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative)
376{
377 uint32_t nextHash = murmurHash3(N_HASHCHECK,
378 ndn::Name(prefix).appendNumber(m_prefixes[prefix] + 1).toUri());
379 for (const auto& nHash : negative) {
380 if (nHash == nextHash) {
381 return true;
382 break;
383 }
384 }
385 return false;
386}
387
388void
389FullProducer::deletePendingInterests(const ndn::Name& interestName) {
390 for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
391 if (it->first == interestName) {
392 NDN_LOG_TRACE("Delete pending interest: " << interestName);
393 m_pendingEntries.erase(it++);
394 }
395 else {
396 ++it;
397 }
398 }
399}
400
401void
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -0500402FullProducer::sendRecoveryInterest(const ndn::Interest& interest)
403{
404 if (m_outstandingInterestId != nullptr) {
405 m_face.removePendingInterest(m_outstandingInterestId);
406 m_outstandingInterestId = nullptr;
407 }
408
409 ndn::Name ibltName;
410 m_iblt.appendToName(ibltName);
411
412 ndn::Name recoveryInterestName(m_syncPrefix);
413 recoveryInterestName.appendNumber(std::hash<ndn::Name>{}(ibltName));
414 recoveryInterestName.append(RECOVERY_PREFIX);
415
416 ndn::Interest recoveryInterest(recoveryInterestName);
417 recoveryInterest.setInterestLifetime(m_syncInterestLifetime);
418
419 auto fetcher = ndn::util::SegmentFetcher::start(m_face,
420 recoveryInterest,
421 ndn::security::v2::getAcceptAllValidator());
422
423 fetcher->onComplete.connect([this, recoveryInterest] (ndn::ConstBufferPtr bufferPtr) {
424 NDN_LOG_TRACE("Segment fetcher got data");
425 ndn::Data data;
426 data.setContent(std::move(bufferPtr));
427 onSyncData(recoveryInterest, data);
428 });
429
430 fetcher->onError.connect([] (uint32_t errorCode, const std::string& msg) {
431 NDN_LOG_ERROR("Cannot recover, error: " << errorCode <<
432 " message: " << msg);
433 });
434}
435
436} // namespace psync