blob: dd4d452857ddc93fa21c575e5810933504805605 [file] [log] [blame]
Ashlesh Gawande0b2897e2018-06-20 14:40:47 -05001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/*
3 * Copyright (c) 2014-2018, The University of Memphis
4 *
5 * This file is part of NLSR (Named-data Link State Routing).
6 * See AUTHORS.md for complete list of NLSR authors and contributors.
7 *
8 * NLSR is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation,
10 * either version 3 of the License, or (at your option) any later version.
11 *
12 * NLSR is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * NLSR, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 **/
19
20#include "full-producer.hpp"
21
22#include <ndn-cxx/util/logger.hpp>
23#include <ndn-cxx/util/segment-fetcher.hpp>
24#include <ndn-cxx/security/validator-null.hpp>
25
26#include <iostream>
27#include <cstring>
28#include <limits>
29#include <functional>
30
31namespace psync {
32
33NDN_LOG_INIT(psync.FullProducer);
34
35FullProducer::FullProducer(const size_t expectedNumEntries,
36 ndn::Face& face,
37 const ndn::Name& syncPrefix,
38 const ndn::Name& userPrefix,
39 const UpdateCallback& onUpdateCallBack,
40 ndn::time::milliseconds syncInterestLifetime,
41 ndn::time::milliseconds syncReplyFreshness)
42 : ProducerBase(expectedNumEntries, face, syncPrefix, userPrefix, syncReplyFreshness)
43 , m_syncInterestLifetime(syncInterestLifetime)
44 , m_onUpdate(onUpdateCallBack)
45 , m_outstandingInterestId(nullptr)
46 , m_scheduledSyncInterestId(m_scheduler)
47{
48 int jitter = m_syncInterestLifetime.count() * .20;
49 m_jitter = std::uniform_int_distribution<>(-jitter, jitter);
50
51 m_registerPrefixId =
52 m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
53 std::bind(&FullProducer::onInterest, this, _1, _2),
54 std::bind(&FullProducer::onRegisterFailed, this, _1, _2));
55
56 // Should we do this after setInterestFilter success call back
57 // (Currently following ChronoSync's way)
58 sendSyncInterest();
59}
60
61FullProducer::~FullProducer()
62{
63 if (m_outstandingInterestId != nullptr) {
64 m_face.removePendingInterest(m_outstandingInterestId);
65 }
66
67 m_face.unsetInterestFilter(m_registerPrefixId);
68}
69
70void
71FullProducer::publishName(const ndn::Name& prefix, ndn::optional<uint64_t> seq)
72{
73 if (m_prefixes.find(prefix) == m_prefixes.end()) {
74 NDN_LOG_WARN("Prefix not added: " << prefix);
75 return;
76 }
77
78 uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
79
80 NDN_LOG_INFO("Publish: "<< prefix << "/" << newSeq);
81
82 updateSeqNo(prefix, newSeq);
83
84 satisfyPendingInterests();
85}
86
87void
88FullProducer::sendSyncInterest()
89{
90 // If we send two sync interest one after the other
91 // since there is no new data in the network yet,
92 // when data is available it may satisfy both of them
93 if (m_outstandingInterestId != nullptr) {
94 m_face.removePendingInterest(m_outstandingInterestId);
95 }
96
97 // Sync Interest format for full sync: /<sync-prefix>/<ourLatestIBF>
98 ndn::Name syncInterestName = m_syncPrefix;
99
100 // Append our latest IBF
101 m_iblt.appendToName(syncInterestName);
102
103 m_outstandingInterestName = syncInterestName;
104
105 m_scheduledSyncInterestId =
106 m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(m_rng)),
107 [this] { sendSyncInterest(); });
108
109 ndn::Interest syncInterest(syncInterestName);
110 syncInterest.setInterestLifetime(m_syncInterestLifetime);
111 // Other side appends hash of IBF to sync data name
112 syncInterest.setCanBePrefix(true);
113 syncInterest.setMustBeFresh(true);
114
115 syncInterest.setNonce(1);
116 syncInterest.refreshNonce();
117
118 m_outstandingInterestId = m_face.expressInterest(syncInterest,
119 std::bind(&FullProducer::onSyncData, this, _1, _2),
120 [] (const ndn::Interest& interest, const ndn::lp::Nack& nack) {
121 NDN_LOG_TRACE("received Nack with reason " << nack.getReason() <<
122 " for Interest with Nonce: " << interest.getNonce());
123 },
124 [] (const ndn::Interest& interest) {
125 NDN_LOG_DEBUG("On full sync timeout " << interest.getNonce());
126 });
127
128 NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
129 ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
130}
131
132void
133FullProducer::onInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
134{
135 ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
136 if (nameWithoutSyncPrefix.size() == 2 &&
137 nameWithoutSyncPrefix.get(nameWithoutSyncPrefix.size() - 1) == RECOVERY_PREFIX.get(0)) {
138 onRecoveryInterest(interest);
139 }
140 else if (nameWithoutSyncPrefix.size() == 1) {
141 onSyncInterest(interest);
142 }
143}
144
145void
146FullProducer::onSyncInterest(const ndn::Interest& interest)
147{
148 ndn::Name interestName = interest.getName();
149 ndn::name::Component ibltName = interestName.get(interestName.size()-1);
150
151 NDN_LOG_DEBUG("Full Sync Interest Received, nonce: " << interest.getNonce() <<
152 ", hash: " << std::hash<ndn::Name>{}(interestName));
153
154 IBLT iblt(m_expectedNumEntries);
155 try {
156 iblt.initialize(ibltName);
157 }
158 catch (const std::exception& e) {
159 NDN_LOG_WARN(e.what());
160 return;
161 }
162
163 IBLT diff = m_iblt - iblt;
164
165 std::set<uint32_t> positive;
166 std::set<uint32_t> negative;
167
168 if (!diff.listEntries(positive, negative)) {
169 NDN_LOG_TRACE("Cannot decode differences, positive: " << positive.size()
170 << " negative: " << negative.size() << " m_threshold: "
171 << m_threshold);
172
173 // Send nack if greater then threshold, else send positive below as usual
174 // Or send if we can't get neither positive nor negative differences
175 if (positive.size() + negative.size() >= m_threshold ||
176 (positive.size() == 0 && negative.size() == 0)) {
177
178 // If we don't have anything to offer means that
179 // we are behind and should not mislead other nodes.
180 bool haveSomethingToOffer = false;
181 for (const auto& content : m_prefixes) {
182 if (content.second != 0) {
183 haveSomethingToOffer = true;
184 }
185 }
186
187 if (haveSomethingToOffer) {
188 sendApplicationNack(interestName);
189 }
190 return;
191 }
192 }
193
194 State state;
195 for (const auto& hash : positive) {
196 ndn::Name prefix = m_hash2prefix[hash];
197 // Don't sync up sequence number zero
198 if (m_prefixes[prefix] != 0 && !isFutureHash(prefix.toUri(), negative)) {
199 state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
200 }
201 }
202
203 if (!state.getContent().empty()) {
204 NDN_LOG_DEBUG("Sending sync content: " << state);
205 sendSyncData(interestName, state.wireEncode());
206 return;
207 }
208
209 ndn::util::scheduler::ScopedEventId scopedEventId(m_scheduler);
210 auto it = m_pendingEntries.emplace(interestName,
211 PendingEntryInfoFull{iblt, std::move(scopedEventId)});
212
213 it.first->second.expirationEvent =
214 m_scheduler.scheduleEvent(interest.getInterestLifetime(),
215 [this, interest] {
216 NDN_LOG_TRACE("Erase Pending Interest " << interest.getNonce());
217 m_pendingEntries.erase(interest.getName());
218 });
219}
220
221void
222FullProducer::onRecoveryInterest(const ndn::Interest& interest)
223{
224 NDN_LOG_DEBUG("Recovery interest received");
225
226 State state;
227 for (const auto& content : m_prefixes) {
228 if (content.second != 0) {
229 state.addContent(ndn::Name(content.first).appendNumber(content.second));
230 }
231 }
232
233 // Send even if state is empty to let other side know that we are behind
234 sendRecoveryData(interest.getName(), state);
235}
236
237void
238FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
239{
240 NDN_LOG_DEBUG("Checking if data will satisfy our own pending interest");
241
242 ndn::Name nameWithIblt;
243 m_iblt.appendToName(nameWithIblt);
244
245 // Append hash of our IBF so that data name maybe different for each node answering
246 ndn::Data data(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
247 data.setFreshnessPeriod(m_syncReplyFreshness);
248 data.setContent(block);
249 m_keyChain.sign(data);
250
251 // checking if our own interest got satisfied
252 if (m_outstandingInterestName == name) {
253 NDN_LOG_DEBUG("Satisfied our own pending interest");
254 // remove outstanding interest
255 if (m_outstandingInterestId != nullptr) {
256 NDN_LOG_DEBUG("Removing our pending interest from face");
257 m_face.removePendingInterest(m_outstandingInterestId);
258 m_outstandingInterestId = nullptr;
259 m_outstandingInterestName = ndn::Name("");
260 }
261
262 NDN_LOG_DEBUG("Sending Sync Data");
263
264 // Send data after removing pending sync interest on face
265 m_face.put(data);
266
267 NDN_LOG_TRACE("Renewing sync interest");
268 sendSyncInterest();
269 }
270 else {
271 NDN_LOG_DEBUG("Sending Sync Data");
272 m_face.put(data);
273 }
274}
275
276void
277FullProducer::onSyncData(const ndn::Interest& interest, const ndn::Data& data)
278{
279 ndn::Name interestName = interest.getName();
280 deletePendingInterests(interest.getName());
281
282 if (data.getContentType() == ndn::tlv::ContentType_Nack) {
283 NDN_LOG_DEBUG("Got application nack, sending recovery interest");
284 sendRecoveryInterest(interest);
285 return;
286 }
287
288 State state(data.getContent());
289 std::vector<MissingDataInfo> updates;
290
291 if (interestName.get(interestName.size()-1) == RECOVERY_PREFIX.get(0) &&
292 state.getContent().empty()) {
293 NDN_LOG_TRACE("Recovery data is empty, other side is behind");
294 return;
295 }
296
297 NDN_LOG_DEBUG("Sync Data Received: " << state);
298
299 for (const auto& content : state.getContent()) {
300 ndn::Name prefix = content.getPrefix(-1);
301 uint64_t seq = content.get(content.size()-1).toNumber();
302
303 if (m_prefixes.find(prefix) == m_prefixes.end() || m_prefixes[prefix] < seq) {
304 updates.push_back(MissingDataInfo{prefix, m_prefixes[prefix] + 1, seq});
305 updateSeqNo(prefix, seq);
306 // We should not call satisfyPendingSyncInterests here because we just
307 // got data and deleted pending interest by calling deletePendingFullSyncInterests
308 // But we might have interests not matching to this interest that might not have deleted
309 // from pending sync interest
310 }
311 }
312
313 // We just got the data, so send a new sync interest
314 if (!updates.empty()) {
315 m_onUpdate(updates);
316 NDN_LOG_TRACE("Renewing sync interest");
317 sendSyncInterest();
318 }
319 else {
320 NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() <<
321 " , hash: " << std::hash<ndn::Name>{}(interestName));
322 }
323}
324
325void
326FullProducer::satisfyPendingInterests()
327{
328 NDN_LOG_DEBUG("Satisfying full sync interest: " << m_pendingEntries.size());
329
330 for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
331 const PendingEntryInfoFull& entry = it->second;
332 IBLT diff = m_iblt - entry.iblt;
333 std::set<uint32_t> positive;
334 std::set<uint32_t> negative;
335
336 if (!diff.listEntries(positive, negative)) {
337 NDN_LOG_TRACE("Decode failed for pending interest");
338 if (positive.size() + negative.size() >= m_threshold ||
339 (positive.size() == 0 && negative.size() == 0)) {
340 NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest");
341 m_pendingEntries.erase(it++);
342 continue;
343 }
344 }
345
346 State state;
347 for (const auto& hash : positive) {
348 ndn::Name prefix = m_hash2prefix[hash];
349
350 if (m_prefixes[prefix] != 0) {
351 state.addContent(ndn::Name(prefix).appendNumber(m_prefixes[prefix]));
352 }
353 }
354
355 if (!state.getContent().empty()) {
356 NDN_LOG_DEBUG("Satisfying sync content: " << state);
357 sendSyncData(it->first, state.wireEncode());
358 m_pendingEntries.erase(it++);
359 }
360 else {
361 ++it;
362 }
363 }
364}
365
366bool
367FullProducer::isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative)
368{
369 uint32_t nextHash = murmurHash3(N_HASHCHECK,
370 ndn::Name(prefix).appendNumber(m_prefixes[prefix] + 1).toUri());
371 for (const auto& nHash : negative) {
372 if (nHash == nextHash) {
373 return true;
374 break;
375 }
376 }
377 return false;
378}
379
380void
381FullProducer::deletePendingInterests(const ndn::Name& interestName) {
382 for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
383 if (it->first == interestName) {
384 NDN_LOG_TRACE("Delete pending interest: " << interestName);
385 m_pendingEntries.erase(it++);
386 }
387 else {
388 ++it;
389 }
390 }
391}
392
393void
394FullProducer::sendRecoveryData(const ndn::Name& prefix, const State& state)
395{
396 ndn::EncodingBuffer buffer;
397 buffer.prependBlock(state.wireEncode());
398
399 const uint8_t* rawBuffer = buffer.buf();
400 const uint8_t* segmentBegin = rawBuffer;
401 const uint8_t* end = rawBuffer + buffer.size();
402
403 uint64_t segmentNo = 0;
404 do {
405 const uint8_t* segmentEnd = segmentBegin + (ndn::MAX_NDN_PACKET_SIZE >> 1);
406 if (segmentEnd > end) {
407 segmentEnd = end;
408 }
409
410 ndn::Name segmentName(prefix);
411 segmentName.appendSegment(segmentNo);
412
413 std::shared_ptr<ndn::Data> data = std::make_shared<ndn::Data>(segmentName);
414 data->setContent(segmentBegin, segmentEnd - segmentBegin);
415 data->setFreshnessPeriod(m_syncReplyFreshness);
416
417 segmentBegin = segmentEnd;
418 if (segmentBegin >= end) {
419 data->setFinalBlock(segmentName[-1]);
420 }
421
422 m_keyChain.sign(*data);
423 m_face.put(*data);
424
425 NDN_LOG_DEBUG("Sending recovery data, seq: " << segmentNo);
426
427 ++segmentNo;
428 } while (segmentBegin < end);
429}
430
431void
432FullProducer::sendRecoveryInterest(const ndn::Interest& interest)
433{
434 if (m_outstandingInterestId != nullptr) {
435 m_face.removePendingInterest(m_outstandingInterestId);
436 m_outstandingInterestId = nullptr;
437 }
438
439 ndn::Name ibltName;
440 m_iblt.appendToName(ibltName);
441
442 ndn::Name recoveryInterestName(m_syncPrefix);
443 recoveryInterestName.appendNumber(std::hash<ndn::Name>{}(ibltName));
444 recoveryInterestName.append(RECOVERY_PREFIX);
445
446 ndn::Interest recoveryInterest(recoveryInterestName);
447 recoveryInterest.setInterestLifetime(m_syncInterestLifetime);
448
449 auto fetcher = ndn::util::SegmentFetcher::start(m_face,
450 recoveryInterest,
451 ndn::security::v2::getAcceptAllValidator());
452
453 fetcher->onComplete.connect([this, recoveryInterest] (ndn::ConstBufferPtr bufferPtr) {
454 NDN_LOG_TRACE("Segment fetcher got data");
455 ndn::Data data;
456 data.setContent(std::move(bufferPtr));
457 onSyncData(recoveryInterest, data);
458 });
459
460 fetcher->onError.connect([] (uint32_t errorCode, const std::string& msg) {
461 NDN_LOG_ERROR("Cannot recover, error: " << errorCode <<
462 " message: " << msg);
463 });
464}
465
466} // namespace psync