blob: 278115ef0f5226a6fcbe5e404fef35a416bdf6a6 [file] [log] [blame]
Yingdi Yuf7ede412014-08-30 20:37:52 -07001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
Sonu Mishra0dadc572016-12-12 23:59:41 -08003 * Copyright (c) 2012-2017 University of California, Los Angeles
Yingdi Yuf7ede412014-08-30 20:37:52 -07004 *
5 * This file is part of ChronoSync, synchronization library for distributed realtime
6 * applications for NDN.
7 *
8 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation, either
10 * version 3 of the License, or (at your option) any later version.
11 *
12 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
20 * @author Chaoyi Bian <bcy@pku.edu.cn>
21 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
22 * @author Yingdi Yu <yingdi@cs.ucla.edu>
Sonu Mishra0dadc572016-12-12 23:59:41 -080023 * @author Sonu Mishra <https://www.linkedin.com/in/mishrasonu>
Yingdi Yuf7ede412014-08-30 20:37:52 -070024 */
25
26#include "logic.hpp"
27#include "logger.hpp"
28
Alexander Afanasyev36eb3ed2017-01-11 12:35:58 -080029INIT_LOGGER(Logic);
Yingdi Yuf7ede412014-08-30 20:37:52 -070030
// Debug logging helper. In _DEBUG builds every message is tagged with the id of the
// Logic instance that produced it; otherwise the message is forwarded unchanged.
#if defined(_DEBUG)
#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
#else
#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
#endif
36
37namespace chronosync {
38
39using ndn::ConstBufferPtr;
40using ndn::EventId;
41
// Raw bytes backing Logic::EMPTY_DIGEST: the 32-byte digest that denotes an empty
// sync state (a peer that "knows nothing").
const uint8_t EMPTY_DIGEST_VALUE[] = {
  0xe3, 0xb0, 0xc4, 0x42,  0x98, 0xfc, 0x1c, 0x14,
  0x9a, 0xfb, 0xf4, 0xc8,  0x99, 0x6f, 0xb9, 0x24,
  0x27, 0xae, 0x41, 0xe4,  0x64, 0x9b, 0x93, 0x4c,
  0xa4, 0x95, 0x99, 0x1b,  0x78, 0x52, 0xb8, 0x55
};
48
49#ifdef _DEBUG
50int Logic::m_instanceCounter = 0;
51#endif
52
Yingdi Yucd339022014-11-05 17:51:19 -080053const ndn::Name Logic::DEFAULT_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080054const ndn::Name Logic::EMPTY_NAME;
Yingdi Yucd339022014-11-05 17:51:19 -080055const ndn::shared_ptr<ndn::Validator> Logic::DEFAULT_VALIDATOR;
Yingdi Yuf7ede412014-08-30 20:37:52 -070056const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
57const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
58const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
59const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
60const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080061const time::milliseconds Logic::DEFAULT_RECOVERY_INTEREST_LIFETIME(1000);
Yingdi Yuf7ede412014-08-30 20:37:52 -070062
63const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
64const ndn::name::Component Logic::RESET_COMPONENT("reset");
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080065const ndn::name::Component Logic::RECOVERY_COMPONENT("recovery");
Yingdi Yuf7ede412014-08-30 20:37:52 -070066
67Logic::Logic(ndn::Face& face,
68 const Name& syncPrefix,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080069 const Name& defaultUserPrefix,
Yingdi Yuf7ede412014-08-30 20:37:52 -070070 const UpdateCallback& onUpdate,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080071 const Name& defaultSigningId,
Yingdi Yucd339022014-11-05 17:51:19 -080072 ndn::shared_ptr<ndn::Validator> validator,
Yingdi Yuf7ede412014-08-30 20:37:52 -070073 const time::steady_clock::Duration& resetTimer,
74 const time::steady_clock::Duration& cancelResetTimer,
75 const time::milliseconds& resetInterestLifetime,
76 const time::milliseconds& syncInterestLifetime,
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080077 const time::milliseconds& syncReplyFreshness,
78 const time::milliseconds& recoveryInterestLifetime)
Yingdi Yuf7ede412014-08-30 20:37:52 -070079 : m_face(face)
80 , m_syncPrefix(syncPrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080081 , m_defaultUserPrefix(defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -070082 , m_interestTable(m_face.getIoService())
83 , m_outstandingInterestId(0)
84 , m_isInReset(false)
85 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
86 , m_onUpdate(onUpdate)
87 , m_scheduler(m_face.getIoService())
88 , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
89 , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
90 , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
91 , m_resetTimer(resetTimer)
92 , m_cancelResetTimer(cancelResetTimer)
93 , m_resetInterestLifetime(resetInterestLifetime)
94 , m_syncInterestLifetime(syncInterestLifetime)
95 , m_syncReplyFreshness(syncReplyFreshness)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080096 , m_recoveryInterestLifetime(recoveryInterestLifetime)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080097 , m_defaultSigningId(defaultSigningId)
Yingdi Yucd339022014-11-05 17:51:19 -080098 , m_validator(validator)
Yingdi Yuf7ede412014-08-30 20:37:52 -070099{
100#ifdef _DEBUG
101 m_instanceId = m_instanceCounter++;
102#endif
103
104 _LOG_DEBUG_ID(">> Logic::Logic");
105
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800106 addUserNode(m_defaultUserPrefix, m_defaultSigningId);
107
108
Yingdi Yuf7ede412014-08-30 20:37:52 -0700109 m_syncReset = m_syncPrefix;
110 m_syncReset.append("reset");
111
112 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
113 m_syncRegisteredPrefixId =
114 m_face.setInterestFilter(m_syncPrefix,
115 bind(&Logic::onSyncInterest, this, _1, _2),
116 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
117
Qiuhan Dinge246b622014-12-03 21:57:48 -0800118 sendSyncInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700119 _LOG_DEBUG_ID("<< Logic::Logic");
120}
121
122Logic::~Logic()
123{
Yingdi Yuf7ede412014-08-30 20:37:52 -0700124 m_scheduler.cancelAllEvents();
Yingdi Yu9d5679a2015-02-01 00:17:58 -0800125 m_interestTable.clear();
126 m_face.shutdown();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700127}
128
129void
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800130Logic::reset(bool isOnInterest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700131{
132 m_isInReset = true;
133
134 m_state.reset();
135 m_log.clear();
136
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800137 if (!isOnInterest)
138 sendResetInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700139
140 // reset outstanding interest name, so that data for previous interest will be dropped.
141 if (m_outstandingInterestId != 0) {
142 m_face.removePendingInterest(m_outstandingInterestId);
143 m_outstandingInterestId = 0;
144 }
145
146 sendSyncInterest();
147
148 if (static_cast<bool>(m_delayedInterestProcessingId))
149 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
150
151 m_delayedInterestProcessingId =
152 m_scheduler.scheduleEvent(m_cancelResetTimer,
153 bind(&Logic::cancelReset, this));
154}
155
156void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800157Logic::setDefaultUserPrefix(const Name& defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700158{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800159 if (defaultUserPrefix != EMPTY_NAME) {
160 if (m_nodeList.find(defaultUserPrefix) != m_nodeList.end()) {
161 m_defaultUserPrefix = defaultUserPrefix;
162 m_defaultSigningId = m_nodeList[defaultUserPrefix].signingId;
163 }
164 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700165}
166
167void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800168Logic::addUserNode(const Name& userPrefix, const Name& signingId)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700169{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800170 if (userPrefix == EMPTY_NAME)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700171 return;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800172 if (m_defaultUserPrefix == EMPTY_NAME) {
173 m_defaultUserPrefix = userPrefix;
174 m_defaultSigningId = signingId;
175 }
176 if (m_nodeList.find(userPrefix) == m_nodeList.end()) {
177 m_nodeList[userPrefix].userPrefix = userPrefix;
178 m_nodeList[userPrefix].signingId = signingId;
179 Name sessionName = userPrefix;
180 sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
181 m_nodeList[userPrefix].sessionName = sessionName;
182 m_nodeList[userPrefix].seqNo = 0;
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800183 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800184 }
185}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700186
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800187void
188Logic::removeUserNode(const Name& userPrefix)
189{
190 auto userNode = m_nodeList.find(userPrefix);
191 if (userNode != m_nodeList.end()) {
192 m_nodeList.erase(userNode);
193 if (m_defaultUserPrefix == userPrefix) {
194 if (!m_nodeList.empty()) {
195 m_defaultUserPrefix = m_nodeList.begin()->second.userPrefix;
196 m_defaultSigningId = m_nodeList.begin()->second.signingId;
197 }
198 else {
199 m_defaultUserPrefix = EMPTY_NAME;
200 m_defaultSigningId = DEFAULT_NAME;
201 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700202 }
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800203 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800204 }
205}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700206
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800207const Name&
208Logic::getSessionName(Name prefix)
209{
210 if (prefix == EMPTY_NAME)
211 prefix = m_defaultUserPrefix;
212 auto node = m_nodeList.find(prefix);
213 if (node != m_nodeList.end())
214 return node->second.sessionName;
215 else
216 throw Error("Refer to non-existent node:" + prefix.toUri());
217}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700218
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800219const SeqNo&
220Logic::getSeqNo(Name prefix)
221{
222 if (prefix == EMPTY_NAME)
223 prefix = m_defaultUserPrefix;
224 auto node = m_nodeList.find(prefix);
225 if (node != m_nodeList.end())
226 return node->second.seqNo;
227 else
228 throw Logic::Error("Refer to non-existent node:" + prefix.toUri());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700229
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800230}
231
232void
233Logic::updateSeqNo(const SeqNo& seqNo, const Name &updatePrefix)
234{
235 Name prefix;
236 if (updatePrefix == EMPTY_NAME) {
237 if (m_defaultUserPrefix == EMPTY_NAME)
238 return;
239 prefix = m_defaultUserPrefix;
240 }
241 else
242 prefix = updatePrefix;
243
244 auto it = m_nodeList.find(prefix);
245 if (it != m_nodeList.end()) {
246 NodeInfo& node = it->second;
247 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
248 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << node.seqNo);
249 if (seqNo < node.seqNo || seqNo == 0)
250 return;
251
252 node.seqNo = seqNo;
253 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << node.seqNo);
254
255 if (!m_isInReset) {
256 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
257 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
258 {
259 using namespace CryptoPP;
260
261 std::string hash;
262 StringSource(previousRoot->buf(), previousRoot->size(), true,
263 new HexEncoder(new StringSink(hash), false));
264 _LOG_DEBUG_ID("Hash: " << hash);
265 }
266
267 bool isInserted = false;
268 bool isUpdated = false;
269 SeqNo oldSeq;
270 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(node.sessionName,
271 node.seqNo);
272
273 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
274 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
275 if (isInserted || isUpdated) {
276 DiffStatePtr commit = make_shared<DiffState>();
277 commit->update(node.sessionName, node.seqNo);
278 commit->setRootDigest(m_state.getRootDigest());
279 insertToDiffLog(commit, previousRoot);
280
281 satisfyPendingSyncInterests(prefix, commit);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800282 formAndSendExcludeInterest(prefix, *commit, previousRoot);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800283 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700284 }
285 }
286}
287
288ConstBufferPtr
289Logic::getRootDigest() const
290{
291 return m_state.getRootDigest();
292}
293
294void
295Logic::printState(std::ostream& os) const
296{
297 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
298 {
299 os << *leaf << "\n";
300 }
301}
302
303std::set<Name>
304Logic::getSessionNames() const
305{
306 std::set<Name> sessionNames;
307
308 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
309 {
310 sessionNames.insert(leaf->getSessionName());
311 }
312
313 return sessionNames;
314}
315
316void
317Logic::onSyncInterest(const Name& prefix, const Interest& interest)
318{
319 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
320 Name name = interest.getName();
321
322 _LOG_DEBUG_ID("InterestName: " << name);
323
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800324 if (name.size() >= 1 && RESET_COMPONENT == name.get(-1)) {
325 processResetInterest(interest);
326 }
327 else if (name.size() >= 2 && RECOVERY_COMPONENT == name.get(-2)) {
328 processRecoveryInterest(interest);
329 }
330 else {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700331 processSyncInterest(interest.shared_from_this());
332 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700333
334 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
335}
336
337void
338Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
339{
340 //Sync prefix registration failed
341 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
342}
343
344void
345Logic::onSyncData(const Interest& interest, Data& data)
346{
347 _LOG_DEBUG_ID(">> Logic::onSyncData");
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800348 // if (static_cast<bool>(m_validator))
349 // m_validator->validate(data,
350 // bind(&Logic::onSyncDataValidated, this, _1),
351 // bind(&Logic::onSyncDataValidationFailed, this, _1));
352 // else
353 // onSyncDataValidated(data.shared_from_this());
354
355 if (interest.getExclude().empty()) {
356 _LOG_DEBUG_ID("First data");
Yingdi Yucd339022014-11-05 17:51:19 -0800357 onSyncDataValidated(data.shared_from_this());
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800358 }
359 else {
360 _LOG_DEBUG_ID("Data obtained using exclude filter");
361 onSyncDataValidated(data.shared_from_this(), false);
362 }
363 sendExcludeInterest(interest, data);
364
Yingdi Yuf7ede412014-08-30 20:37:52 -0700365 _LOG_DEBUG_ID("<< Logic::onSyncData");
366}
367
368void
369Logic::onResetData(const Interest& interest, Data& data)
370{
371 // This should not happened, drop the received data.
372}
373
374void
375Logic::onSyncTimeout(const Interest& interest)
376{
377 // It is OK. Others will handle the time out situation.
378 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
379 _LOG_DEBUG_ID("Interest: " << interest.getName());
380 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
381}
382
383void
384Logic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
385{
386 // SyncReply cannot be validated.
387}
388
389void
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800390Logic::onSyncDataValidated(const shared_ptr<const Data>& data, bool firstData)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700391{
392 Name name = data->getName();
393 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
394
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800395 processSyncData(name, digest, data->getContent().blockFromValue(), firstData);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700396}
397
398void
399Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
400 bool isTimedProcessing/*=false*/)
401{
402 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
403
404 const Name& name = interest->getName();
405 ConstBufferPtr digest =
406 make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
407
408 ConstBufferPtr rootDigest = m_state.getRootDigest();
409
410 // If the digest of the incoming interest is the same as root digest
411 // Put the interest into InterestTable
412 if (*rootDigest == *digest) {
413 _LOG_DEBUG_ID("Oh, we are in the same state");
414 m_interestTable.insert(interest, digest, false);
415
416 if (!m_isInReset)
417 return;
418
419 if (!isTimedProcessing) {
420 _LOG_DEBUG_ID("Non timed processing in reset");
421 // Still in reset, our own seq has not been put into state yet
422 // Do not hurry, some others may be also resetting and may send their reply
423 if (static_cast<bool>(m_delayedInterestProcessingId))
424 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
425
426 time::milliseconds after(m_rangeUniformRandom());
427 _LOG_DEBUG_ID("After: " << after);
428 m_delayedInterestProcessingId =
429 m_scheduler.scheduleEvent(after,
430 bind(&Logic::processSyncInterest, this, interest, true));
431 }
432 else {
433 _LOG_DEBUG_ID("Timed processing in reset");
434 // Now we can get out of reset state by putting our own stuff into m_state.
435 cancelReset();
436 }
437
438 return;
439 }
440
441 // If the digest of incoming interest is an "empty" digest
Sonu Mishrae10acbc2017-01-18 14:14:05 -0800442 if (*digest == *EMPTY_DIGEST) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700443 _LOG_DEBUG_ID("Poor guy, he knows nothing");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800444 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700445 return;
446 }
447
448 DiffStateContainer::iterator stateIter = m_log.find(digest);
449 // If the digest of incoming interest can be found from the log
450 if (stateIter != m_log.end()) {
451 _LOG_DEBUG_ID("It is ok, you are so close");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800452 sendSyncData(m_defaultUserPrefix, name, *(*stateIter)->diff());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700453 return;
454 }
455
456 if (!isTimedProcessing) {
457 _LOG_DEBUG_ID("Let's wait, just wait for a while");
458 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
Yingdi Yu53f5f042015-01-31 16:33:25 -0800459 bool doesExist = m_interestTable.has(digest);
460 m_interestTable.insert(interest, digest, true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700461 if (doesExist)
462 // Original comment (not sure): somebody else replied, so restart random-game timer
463 // YY: Get the same SyncInterest again, refresh the timer
464 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
465
466 m_delayedInterestProcessingId =
467 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
468 bind(&Logic::processSyncInterest, this, interest, true));
469 }
470 else {
471 // OK, nobody is helping us, just tell the truth.
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800472 _LOG_DEBUG_ID("OK, nobody is helping us, let us try to recover");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700473 m_interestTable.erase(digest);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800474 sendRecoveryInterest(digest);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700475 }
476
477 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
478}
479
480void
481Logic::processResetInterest(const Interest& interest)
482{
483 _LOG_DEBUG_ID(">> Logic::processResetInterest");
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800484 reset(true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700485}
486
487void
488Logic::processSyncData(const Name& name,
489 ndn::ConstBufferPtr digest,
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800490 const Block& syncReplyBlock,
491 bool firstData)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700492{
493 _LOG_DEBUG_ID(">> Logic::processSyncData");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700494 DiffStatePtr commit = make_shared<DiffState>();
495 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
496
497 try {
498 m_interestTable.erase(digest); // Remove satisfied interest from PIT
499
500 State reply;
501 reply.wireDecode(syncReplyBlock);
502
503 std::vector<MissingDataInfo> v;
504 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
505 {
506 BOOST_ASSERT(leaf != 0);
507
508 const Name& info = leaf->getSessionName();
509 SeqNo seq = leaf->getSeq();
510
511 bool isInserted = false;
512 bool isUpdated = false;
513 SeqNo oldSeq;
514 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700515 if (isInserted || isUpdated) {
516 commit->update(info, seq);
517
518 oldSeq++;
519 MissingDataInfo mdi = {info, oldSeq, seq};
520 v.push_back(mdi);
521 }
522 }
523
524 if (!v.empty()) {
525 m_onUpdate(v);
526
527 commit->setRootDigest(m_state.getRootDigest());
528 insertToDiffLog(commit, previousRoot);
529 }
530 else {
531 _LOG_DEBUG_ID("What? nothing new");
532 }
533 }
534 catch (State::Error&) {
535 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
536 // Something really fishy happened during state decoding;
537 commit.reset();
538 return;
539 }
540
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800541 if (static_cast<bool>(commit) && !commit->getLeaves().empty() && firstData) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700542 // state changed and it is safe to express a new interest
543 time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
544 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
545 EventId eventId = m_scheduler.scheduleEvent(after,
546 bind(&Logic::sendSyncInterest, this));
547
548 m_scheduler.cancelEvent(m_reexpressingInterestId);
549 m_reexpressingInterestId = eventId;
550 }
551}
552
553void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800554Logic::satisfyPendingSyncInterests(const Name& updatedPrefix, ConstDiffStatePtr commit)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700555{
556 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
557 try {
558 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
559 for (InterestTable::const_iterator it = m_interestTable.begin();
560 it != m_interestTable.end(); it++) {
561 ConstUnsatisfiedInterestPtr request = *it;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700562 if (request->isUnknown)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800563 sendSyncData(updatedPrefix, request->interest->getName(), m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700564 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800565 sendSyncData(updatedPrefix, request->interest->getName(), *commit);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700566 }
567 m_interestTable.clear();
568 }
569 catch (InterestTable::Error&) {
570 // ok. not really an error
571 }
572 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
573}
574
575void
576Logic::insertToDiffLog(DiffStatePtr commit, ndn::ConstBufferPtr previousRoot)
577{
578 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
579 // Connect to the history
580 if (!m_log.empty())
581 (*m_log.find(previousRoot))->setNext(commit);
582
583 // Insert the commit
584 m_log.erase(commit->getRootDigest());
585 m_log.insert(commit);
586 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
587}
588
589void
590Logic::sendResetInterest()
591{
592 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
593
594 if (m_needPeriodReset) {
595 _LOG_DEBUG_ID("Need Period Reset");
596 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
597
598 EventId eventId =
599 m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
600 bind(&Logic::sendResetInterest, this));
601 m_scheduler.cancelEvent(m_resetInterestId);
602 m_resetInterestId = eventId;
603 }
604
605 Interest interest(m_syncReset);
606 interest.setMustBeFresh(true);
607 interest.setInterestLifetime(m_resetInterestLifetime);
608 m_face.expressInterest(interest,
609 bind(&Logic::onResetData, this, _1, _2),
610 bind(&Logic::onSyncTimeout, this, _1));
611
612 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
613}
614
615void
616Logic::sendSyncInterest()
617{
618 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
619
620 Name interestName;
621 interestName.append(m_syncPrefix)
622 .append(ndn::name::Component(*m_state.getRootDigest()));
623
624 m_outstandingInterestName = interestName;
625
626#ifdef _DEBUG
627 printDigest(m_state.getRootDigest());
628#endif
629
630 EventId eventId =
Sonu Mishra0dadc572016-12-12 23:59:41 -0800631 m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 +
Yingdi Yuf7ede412014-08-30 20:37:52 -0700632 ndn::time::milliseconds(m_reexpressionJitter()),
633 bind(&Logic::sendSyncInterest, this));
634 m_scheduler.cancelEvent(m_reexpressingInterestId);
635 m_reexpressingInterestId = eventId;
636
637 Interest interest(interestName);
638 interest.setMustBeFresh(true);
639 interest.setInterestLifetime(m_syncInterestLifetime);
640
641 m_outstandingInterestId = m_face.expressInterest(interest,
642 bind(&Logic::onSyncData, this, _1, _2),
643 bind(&Logic::onSyncTimeout, this, _1));
644
645 _LOG_DEBUG_ID("Send interest: " << interest.getName());
646 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
647}
648
649void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800650Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700651{
652 _LOG_DEBUG_ID(">> Logic::sendSyncData");
653 shared_ptr<Data> syncReply = make_shared<Data>(name);
654 syncReply->setContent(state.wireEncode());
655 syncReply->setFreshnessPeriod(m_syncReplyFreshness);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800656 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
657 return;
658 if (m_nodeList[nodePrefix].signingId.empty())
Yingdi Yucd339022014-11-05 17:51:19 -0800659 m_keyChain.sign(*syncReply);
660 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800661 m_keyChain.signByIdentity(*syncReply, m_nodeList[nodePrefix].signingId);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700662
663 m_face.put(*syncReply);
664
665 // checking if our own interest got satisfied
666 if (m_outstandingInterestName == name) {
667 // remove outstanding interest
668 if (m_outstandingInterestId != 0) {
669 m_face.removePendingInterest(m_outstandingInterestId);
670 m_outstandingInterestId = 0;
671 }
672
673 // re-schedule sending Sync interest
674 time::milliseconds after(m_reexpressionJitter());
675 _LOG_DEBUG_ID("Satisfy our own interest");
676 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
677 EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
678 m_scheduler.cancelEvent(m_reexpressingInterestId);
679 m_reexpressingInterestId = eventId;
680 }
681 _LOG_DEBUG_ID("<< Logic::sendSyncData");
682}
683
684void
685Logic::cancelReset()
686{
687 _LOG_DEBUG_ID(">> Logic::cancelReset");
688 if (!m_isInReset)
689 return;
690
691 m_isInReset = false;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800692 for (const auto& node : m_nodeList) {
693 updateSeqNo(node.second.seqNo, node.first);
694 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700695 _LOG_DEBUG_ID("<< Logic::cancelReset");
696}
697
698void
699Logic::printDigest(ndn::ConstBufferPtr digest)
700{
701 using namespace CryptoPP;
702
703 std::string hash;
704 StringSource(digest->buf(), digest->size(), true,
705 new HexEncoder(new StringSink(hash), false));
706 _LOG_DEBUG_ID("Hash: " << hash);
707}
708
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800709void
710Logic::sendRecoveryInterest(ndn::ConstBufferPtr digest)
711{
712 _LOG_DEBUG_ID(">> Logic::sendRecoveryInterest");
713
714 Name interestName;
715 interestName.append(m_syncPrefix)
716 .append(RECOVERY_COMPONENT)
717 .append(ndn::name::Component(*digest));
718
719 Interest interest(interestName);
720 interest.setMustBeFresh(true);
721 interest.setInterestLifetime(m_recoveryInterestLifetime);
722
723 m_face.expressInterest(interest, bind(&Logic::onRecoveryData, this, _1, _2),
724 bind(&Logic::onRecoveryTimeout, this, _1));
725
726 _LOG_DEBUG_ID("interest: " << interest.getName());
727 _LOG_DEBUG_ID("<< Logic::sendRecoveryInterest");
728}
729
730void
731Logic::processRecoveryInterest(const Interest& interest)
732{
733 _LOG_DEBUG_ID(">> Logic::processRecoveryInterest");
734
735 const Name& name = interest.getName();
736 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
737
738 ConstBufferPtr rootDigest = m_state.getRootDigest();
739
740 DiffStateContainer::iterator stateIter = m_log.find(digest);
741
742 if (stateIter != m_log.end() || *digest == *EMPTY_DIGEST || *rootDigest == *digest) {
743 _LOG_DEBUG_ID("I can help you recover");
744 sendSyncData(m_defaultUserPrefix, name, m_state);
745 return;
746 }
747 _LOG_DEBUG_ID("<< Logic::processRecoveryInterest");
748}
749
750void
751Logic::onRecoveryData(const Interest& interest, Data& data)
752{
753 _LOG_DEBUG_ID(">> Logic::onRecoveryData");
754 onSyncDataValidated(data.shared_from_this());
755 _LOG_DEBUG_ID("<< Logic::onRecoveryData");
756}
757
758void
759Logic::onRecoveryTimeout(const Interest& interest)
760{
761 _LOG_DEBUG_ID(">> Logic::onRecoveryTimeout");
762 _LOG_DEBUG_ID("Interest: " << interest.getName());
763 _LOG_DEBUG_ID("<< Logic::onRecoveryTimeout");
764}
765
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800766void
767Logic::sendExcludeInterest(const Interest& interest, const Data& data)
768{
769 _LOG_DEBUG_ID(">> Logic::sendExcludeInterest");
770
771 Name interestName = interest.getName();
772 Interest excludeInterest(interestName);
773
774 Exclude exclude = interest.getExclude();
775 exclude.excludeOne(data.getFullName().get(-1));
776 excludeInterest.setExclude(exclude);
777
778 excludeInterest.setInterestLifetime(m_syncInterestLifetime);
779
780 m_face.expressInterest(excludeInterest, bind(&Logic::onSyncData, this, _1, _2),
781 bind(&Logic::onSyncTimeout, this, _1));
782
783 _LOG_DEBUG_ID("Send interest: " << excludeInterest.getName());
784 _LOG_DEBUG_ID("<< Logic::sendExcludeInterest");
785}
786
787void
788Logic::formAndSendExcludeInterest(const Name& nodePrefix, const State& commit, ndn::ConstBufferPtr previousRoot)
789{
790 _LOG_DEBUG_ID(">> Logic::formAndSendExcludeInterest");
791 Name interestName;
792 interestName.append(m_syncPrefix)
793 .append(ndn::name::Component(*previousRoot));
794 Interest interest(interestName);
795
796 shared_ptr<Data> data = make_shared<Data>(interestName);
797 data->setContent(commit.wireEncode());
798 data->setFreshnessPeriod(m_syncReplyFreshness);
799 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
800 return;
801 if (m_nodeList[nodePrefix].signingId.empty())
802 m_keyChain.sign(*data);
803 else
804 m_keyChain.signByIdentity(*data, m_nodeList[nodePrefix].signingId);
805
806 sendExcludeInterest(interest, *data);
807
808 _LOG_DEBUG_ID("<< Logic::formAndSendExcludeInterest");
809}
810
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800811} // namespace chronosync