blob: d3e5f11febc55a66464cfdb3ada84aa36c409486 [file] [log] [blame]
Yingdi Yuf7ede412014-08-30 20:37:52 -07001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
Sonu Mishra0dadc572016-12-12 23:59:41 -08003 * Copyright (c) 2012-2017 University of California, Los Angeles
Yingdi Yuf7ede412014-08-30 20:37:52 -07004 *
5 * This file is part of ChronoSync, synchronization library for distributed realtime
6 * applications for NDN.
7 *
8 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation, either
10 * version 3 of the License, or (at your option) any later version.
11 *
12 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
20 * @author Chaoyi Bian <bcy@pku.edu.cn>
21 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
22 * @author Yingdi Yu <yingdi@cs.ucla.edu>
Sonu Mishra0dadc572016-12-12 23:59:41 -080023 * @author Sonu Mishra <https://www.linkedin.com/in/mishrasonu>
Yingdi Yuf7ede412014-08-30 20:37:52 -070024 */
25
26#include "logic.hpp"
27#include "logger.hpp"
28
Ashlesh Gawande687cf922017-05-30 15:04:16 -050029#include <ndn-cxx/util/string-helper.hpp>
30
Alexander Afanasyev36eb3ed2017-01-11 12:35:58 -080031INIT_LOGGER(Logic);
Yingdi Yuf7ede412014-08-30 20:37:52 -070032
33#ifdef _DEBUG
34#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
35#else
36#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
37#endif
38
39namespace chronosync {
40
41using ndn::ConstBufferPtr;
42using ndn::EventId;
43
44const uint8_t EMPTY_DIGEST_VALUE[] = {
45 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
46 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
47 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
48 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
49};
50
51#ifdef _DEBUG
52int Logic::m_instanceCounter = 0;
53#endif
54
Yingdi Yucd339022014-11-05 17:51:19 -080055const ndn::Name Logic::DEFAULT_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080056const ndn::Name Logic::EMPTY_NAME;
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -080057const std::shared_ptr<ndn::Validator> Logic::DEFAULT_VALIDATOR;
Yingdi Yuf7ede412014-08-30 20:37:52 -070058const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
59const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
60const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
61const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
62const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080063const time::milliseconds Logic::DEFAULT_RECOVERY_INTEREST_LIFETIME(1000);
Yingdi Yuf7ede412014-08-30 20:37:52 -070064
65const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
66const ndn::name::Component Logic::RESET_COMPONENT("reset");
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080067const ndn::name::Component Logic::RECOVERY_COMPONENT("recovery");
Yingdi Yuf7ede412014-08-30 20:37:52 -070068
69Logic::Logic(ndn::Face& face,
70 const Name& syncPrefix,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080071 const Name& defaultUserPrefix,
Yingdi Yuf7ede412014-08-30 20:37:52 -070072 const UpdateCallback& onUpdate,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080073 const Name& defaultSigningId,
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -080074 std::shared_ptr<ndn::Validator> validator,
Yingdi Yuf7ede412014-08-30 20:37:52 -070075 const time::steady_clock::Duration& resetTimer,
76 const time::steady_clock::Duration& cancelResetTimer,
77 const time::milliseconds& resetInterestLifetime,
78 const time::milliseconds& syncInterestLifetime,
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080079 const time::milliseconds& syncReplyFreshness,
80 const time::milliseconds& recoveryInterestLifetime)
Yingdi Yuf7ede412014-08-30 20:37:52 -070081 : m_face(face)
82 , m_syncPrefix(syncPrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080083 , m_defaultUserPrefix(defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -070084 , m_interestTable(m_face.getIoService())
85 , m_outstandingInterestId(0)
86 , m_isInReset(false)
87 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
88 , m_onUpdate(onUpdate)
89 , m_scheduler(m_face.getIoService())
90 , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
91 , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
92 , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
93 , m_resetTimer(resetTimer)
94 , m_cancelResetTimer(cancelResetTimer)
95 , m_resetInterestLifetime(resetInterestLifetime)
96 , m_syncInterestLifetime(syncInterestLifetime)
97 , m_syncReplyFreshness(syncReplyFreshness)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080098 , m_recoveryInterestLifetime(recoveryInterestLifetime)
Yingdi Yucd339022014-11-05 17:51:19 -080099 , m_validator(validator)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700100{
101#ifdef _DEBUG
102 m_instanceId = m_instanceCounter++;
103#endif
104
105 _LOG_DEBUG_ID(">> Logic::Logic");
106
Ashlesh Gawande687cf922017-05-30 15:04:16 -0500107 addUserNode(m_defaultUserPrefix, defaultSigningId);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800108
109
Yingdi Yuf7ede412014-08-30 20:37:52 -0700110 m_syncReset = m_syncPrefix;
111 m_syncReset.append("reset");
112
113 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
114 m_syncRegisteredPrefixId =
115 m_face.setInterestFilter(m_syncPrefix,
116 bind(&Logic::onSyncInterest, this, _1, _2),
117 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
118
Qiuhan Dinge246b622014-12-03 21:57:48 -0800119 sendSyncInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700120 _LOG_DEBUG_ID("<< Logic::Logic");
121}
122
123Logic::~Logic()
124{
Yingdi Yuf7ede412014-08-30 20:37:52 -0700125 m_scheduler.cancelAllEvents();
Yingdi Yu9d5679a2015-02-01 00:17:58 -0800126 m_interestTable.clear();
127 m_face.shutdown();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700128}
129
130void
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800131Logic::reset(bool isOnInterest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700132{
133 m_isInReset = true;
134
135 m_state.reset();
136 m_log.clear();
137
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800138 if (!isOnInterest)
139 sendResetInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700140
141 // reset outstanding interest name, so that data for previous interest will be dropped.
142 if (m_outstandingInterestId != 0) {
143 m_face.removePendingInterest(m_outstandingInterestId);
144 m_outstandingInterestId = 0;
145 }
146
147 sendSyncInterest();
148
149 if (static_cast<bool>(m_delayedInterestProcessingId))
150 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
151
152 m_delayedInterestProcessingId =
153 m_scheduler.scheduleEvent(m_cancelResetTimer,
154 bind(&Logic::cancelReset, this));
155}
156
157void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800158Logic::setDefaultUserPrefix(const Name& defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700159{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800160 if (defaultUserPrefix != EMPTY_NAME) {
161 if (m_nodeList.find(defaultUserPrefix) != m_nodeList.end()) {
162 m_defaultUserPrefix = defaultUserPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800163 }
164 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700165}
166
167void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800168Logic::addUserNode(const Name& userPrefix, const Name& signingId)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700169{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800170 if (userPrefix == EMPTY_NAME)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700171 return;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800172 if (m_defaultUserPrefix == EMPTY_NAME) {
173 m_defaultUserPrefix = userPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800174 }
175 if (m_nodeList.find(userPrefix) == m_nodeList.end()) {
176 m_nodeList[userPrefix].userPrefix = userPrefix;
177 m_nodeList[userPrefix].signingId = signingId;
178 Name sessionName = userPrefix;
179 sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
180 m_nodeList[userPrefix].sessionName = sessionName;
181 m_nodeList[userPrefix].seqNo = 0;
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800182 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800183 }
184}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700185
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800186void
187Logic::removeUserNode(const Name& userPrefix)
188{
189 auto userNode = m_nodeList.find(userPrefix);
190 if (userNode != m_nodeList.end()) {
191 m_nodeList.erase(userNode);
192 if (m_defaultUserPrefix == userPrefix) {
193 if (!m_nodeList.empty()) {
194 m_defaultUserPrefix = m_nodeList.begin()->second.userPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800195 }
196 else {
197 m_defaultUserPrefix = EMPTY_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800198 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700199 }
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800200 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800201 }
202}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700203
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800204const Name&
205Logic::getSessionName(Name prefix)
206{
207 if (prefix == EMPTY_NAME)
208 prefix = m_defaultUserPrefix;
209 auto node = m_nodeList.find(prefix);
210 if (node != m_nodeList.end())
211 return node->second.sessionName;
212 else
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800213 BOOST_THROW_EXCEPTION(Error("Refer to non-existent node:" + prefix.toUri()));
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800214}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700215
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800216const SeqNo&
217Logic::getSeqNo(Name prefix)
218{
219 if (prefix == EMPTY_NAME)
220 prefix = m_defaultUserPrefix;
221 auto node = m_nodeList.find(prefix);
222 if (node != m_nodeList.end())
223 return node->second.seqNo;
224 else
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800225 BOOST_THROW_EXCEPTION(Logic::Error("Refer to non-existent node:" + prefix.toUri()));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700226
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800227}
228
229void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800230Logic::updateSeqNo(const SeqNo& seqNo, const Name& updatePrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800231{
232 Name prefix;
233 if (updatePrefix == EMPTY_NAME) {
234 if (m_defaultUserPrefix == EMPTY_NAME)
235 return;
236 prefix = m_defaultUserPrefix;
237 }
238 else
239 prefix = updatePrefix;
240
241 auto it = m_nodeList.find(prefix);
242 if (it != m_nodeList.end()) {
243 NodeInfo& node = it->second;
244 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
245 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << node.seqNo);
246 if (seqNo < node.seqNo || seqNo == 0)
247 return;
248
249 node.seqNo = seqNo;
250 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << node.seqNo);
251
252 if (!m_isInReset) {
253 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
254 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
255 {
Ashlesh Gawande687cf922017-05-30 15:04:16 -0500256 std::string hash = ndn::toHex(previousRoot->buf(), previousRoot->size(), false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800257 _LOG_DEBUG_ID("Hash: " << hash);
258 }
259
260 bool isInserted = false;
261 bool isUpdated = false;
262 SeqNo oldSeq;
263 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(node.sessionName,
264 node.seqNo);
265
266 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
267 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
268 if (isInserted || isUpdated) {
269 DiffStatePtr commit = make_shared<DiffState>();
270 commit->update(node.sessionName, node.seqNo);
271 commit->setRootDigest(m_state.getRootDigest());
272 insertToDiffLog(commit, previousRoot);
273
274 satisfyPendingSyncInterests(prefix, commit);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800275 formAndSendExcludeInterest(prefix, *commit, previousRoot);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800276 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700277 }
278 }
279}
280
281ConstBufferPtr
282Logic::getRootDigest() const
283{
284 return m_state.getRootDigest();
285}
286
287void
288Logic::printState(std::ostream& os) const
289{
290 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
291 {
292 os << *leaf << "\n";
293 }
294}
295
296std::set<Name>
297Logic::getSessionNames() const
298{
299 std::set<Name> sessionNames;
300
301 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
302 {
303 sessionNames.insert(leaf->getSessionName());
304 }
305
306 return sessionNames;
307}
308
309void
310Logic::onSyncInterest(const Name& prefix, const Interest& interest)
311{
312 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
313 Name name = interest.getName();
314
315 _LOG_DEBUG_ID("InterestName: " << name);
316
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800317 if (name.size() >= 1 && RESET_COMPONENT == name.get(-1)) {
318 processResetInterest(interest);
319 }
320 else if (name.size() >= 2 && RECOVERY_COMPONENT == name.get(-2)) {
321 processRecoveryInterest(interest);
322 }
Ashlesh Gawande8ba7d5a2017-07-24 14:43:12 -0500323 // Do not process exclude interests, they should be answered by CS
324 else if (interest.getExclude().empty()) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700325 processSyncInterest(interest.shared_from_this());
326 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700327
328 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
329}
330
331void
332Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
333{
334 //Sync prefix registration failed
335 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
336}
337
338void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800339Logic::onSyncData(const Interest& interest, const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700340{
341 _LOG_DEBUG_ID(">> Logic::onSyncData");
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800342 // if (static_cast<bool>(m_validator))
343 // m_validator->validate(data,
344 // bind(&Logic::onSyncDataValidated, this, _1),
345 // bind(&Logic::onSyncDataValidationFailed, this, _1));
346 // else
347 // onSyncDataValidated(data.shared_from_this());
348
349 if (interest.getExclude().empty()) {
350 _LOG_DEBUG_ID("First data");
Yingdi Yucd339022014-11-05 17:51:19 -0800351 onSyncDataValidated(data.shared_from_this());
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800352 }
353 else {
354 _LOG_DEBUG_ID("Data obtained using exclude filter");
355 onSyncDataValidated(data.shared_from_this(), false);
356 }
357 sendExcludeInterest(interest, data);
358
Yingdi Yuf7ede412014-08-30 20:37:52 -0700359 _LOG_DEBUG_ID("<< Logic::onSyncData");
360}
361
362void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800363Logic::onResetData(const Interest& interest, const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700364{
365 // This should not happened, drop the received data.
366}
367
368void
369Logic::onSyncTimeout(const Interest& interest)
370{
371 // It is OK. Others will handle the time out situation.
372 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
373 _LOG_DEBUG_ID("Interest: " << interest.getName());
374 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
375}
376
377void
378Logic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
379{
380 // SyncReply cannot be validated.
381}
382
383void
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800384Logic::onSyncDataValidated(const shared_ptr<const Data>& data, bool firstData)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700385{
386 Name name = data->getName();
387 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
388
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800389 processSyncData(name, digest, data->getContent().blockFromValue(), firstData);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700390}
391
392void
393Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
394 bool isTimedProcessing/*=false*/)
395{
396 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
397
398 const Name& name = interest->getName();
399 ConstBufferPtr digest =
400 make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
401
402 ConstBufferPtr rootDigest = m_state.getRootDigest();
403
404 // If the digest of the incoming interest is the same as root digest
405 // Put the interest into InterestTable
406 if (*rootDigest == *digest) {
407 _LOG_DEBUG_ID("Oh, we are in the same state");
408 m_interestTable.insert(interest, digest, false);
409
410 if (!m_isInReset)
411 return;
412
413 if (!isTimedProcessing) {
414 _LOG_DEBUG_ID("Non timed processing in reset");
415 // Still in reset, our own seq has not been put into state yet
416 // Do not hurry, some others may be also resetting and may send their reply
417 if (static_cast<bool>(m_delayedInterestProcessingId))
418 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
419
420 time::milliseconds after(m_rangeUniformRandom());
421 _LOG_DEBUG_ID("After: " << after);
422 m_delayedInterestProcessingId =
423 m_scheduler.scheduleEvent(after,
424 bind(&Logic::processSyncInterest, this, interest, true));
425 }
426 else {
427 _LOG_DEBUG_ID("Timed processing in reset");
428 // Now we can get out of reset state by putting our own stuff into m_state.
429 cancelReset();
430 }
431
432 return;
433 }
434
435 // If the digest of incoming interest is an "empty" digest
Sonu Mishrae10acbc2017-01-18 14:14:05 -0800436 if (*digest == *EMPTY_DIGEST) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700437 _LOG_DEBUG_ID("Poor guy, he knows nothing");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800438 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700439 return;
440 }
441
442 DiffStateContainer::iterator stateIter = m_log.find(digest);
443 // If the digest of incoming interest can be found from the log
444 if (stateIter != m_log.end()) {
445 _LOG_DEBUG_ID("It is ok, you are so close");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800446 sendSyncData(m_defaultUserPrefix, name, *(*stateIter)->diff());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700447 return;
448 }
449
450 if (!isTimedProcessing) {
451 _LOG_DEBUG_ID("Let's wait, just wait for a while");
452 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
Yingdi Yu53f5f042015-01-31 16:33:25 -0800453 bool doesExist = m_interestTable.has(digest);
454 m_interestTable.insert(interest, digest, true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700455 if (doesExist)
456 // Original comment (not sure): somebody else replied, so restart random-game timer
457 // YY: Get the same SyncInterest again, refresh the timer
458 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
459
460 m_delayedInterestProcessingId =
461 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
462 bind(&Logic::processSyncInterest, this, interest, true));
463 }
464 else {
465 // OK, nobody is helping us, just tell the truth.
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800466 _LOG_DEBUG_ID("OK, nobody is helping us, let us try to recover");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700467 m_interestTable.erase(digest);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800468 sendRecoveryInterest(digest);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700469 }
470
471 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
472}
473
474void
475Logic::processResetInterest(const Interest& interest)
476{
477 _LOG_DEBUG_ID(">> Logic::processResetInterest");
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800478 reset(true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700479}
480
481void
482Logic::processSyncData(const Name& name,
483 ndn::ConstBufferPtr digest,
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800484 const Block& syncReplyBlock,
485 bool firstData)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700486{
487 _LOG_DEBUG_ID(">> Logic::processSyncData");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700488 DiffStatePtr commit = make_shared<DiffState>();
489 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
490
491 try {
492 m_interestTable.erase(digest); // Remove satisfied interest from PIT
493
494 State reply;
495 reply.wireDecode(syncReplyBlock);
496
497 std::vector<MissingDataInfo> v;
498 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
499 {
500 BOOST_ASSERT(leaf != 0);
501
502 const Name& info = leaf->getSessionName();
503 SeqNo seq = leaf->getSeq();
504
505 bool isInserted = false;
506 bool isUpdated = false;
507 SeqNo oldSeq;
508 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700509 if (isInserted || isUpdated) {
510 commit->update(info, seq);
511
512 oldSeq++;
513 MissingDataInfo mdi = {info, oldSeq, seq};
514 v.push_back(mdi);
515 }
516 }
517
518 if (!v.empty()) {
519 m_onUpdate(v);
520
521 commit->setRootDigest(m_state.getRootDigest());
522 insertToDiffLog(commit, previousRoot);
523 }
524 else {
525 _LOG_DEBUG_ID("What? nothing new");
526 }
527 }
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800528 catch (const State::Error&) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700529 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
530 // Something really fishy happened during state decoding;
531 commit.reset();
532 return;
533 }
534
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800535 if (static_cast<bool>(commit) && !commit->getLeaves().empty() && firstData) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700536 // state changed and it is safe to express a new interest
537 time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
538 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
539 EventId eventId = m_scheduler.scheduleEvent(after,
540 bind(&Logic::sendSyncInterest, this));
541
542 m_scheduler.cancelEvent(m_reexpressingInterestId);
543 m_reexpressingInterestId = eventId;
544 }
545}
546
547void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800548Logic::satisfyPendingSyncInterests(const Name& updatedPrefix, ConstDiffStatePtr commit)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700549{
550 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
551 try {
552 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
553 for (InterestTable::const_iterator it = m_interestTable.begin();
554 it != m_interestTable.end(); it++) {
555 ConstUnsatisfiedInterestPtr request = *it;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700556 if (request->isUnknown)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800557 sendSyncData(updatedPrefix, request->interest->getName(), m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700558 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800559 sendSyncData(updatedPrefix, request->interest->getName(), *commit);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700560 }
561 m_interestTable.clear();
562 }
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800563 catch (const InterestTable::Error&) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700564 // ok. not really an error
565 }
566 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
567}
568
569void
570Logic::insertToDiffLog(DiffStatePtr commit, ndn::ConstBufferPtr previousRoot)
571{
572 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
573 // Connect to the history
574 if (!m_log.empty())
575 (*m_log.find(previousRoot))->setNext(commit);
576
577 // Insert the commit
578 m_log.erase(commit->getRootDigest());
579 m_log.insert(commit);
580 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
581}
582
583void
584Logic::sendResetInterest()
585{
586 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
587
588 if (m_needPeriodReset) {
589 _LOG_DEBUG_ID("Need Period Reset");
590 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
591
592 EventId eventId =
593 m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
594 bind(&Logic::sendResetInterest, this));
595 m_scheduler.cancelEvent(m_resetInterestId);
596 m_resetInterestId = eventId;
597 }
598
599 Interest interest(m_syncReset);
600 interest.setMustBeFresh(true);
601 interest.setInterestLifetime(m_resetInterestLifetime);
602 m_face.expressInterest(interest,
603 bind(&Logic::onResetData, this, _1, _2),
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800604 bind(&Logic::onSyncTimeout, this, _1), // Nack
Yingdi Yuf7ede412014-08-30 20:37:52 -0700605 bind(&Logic::onSyncTimeout, this, _1));
606
607 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
608}
609
610void
611Logic::sendSyncInterest()
612{
613 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
614
615 Name interestName;
616 interestName.append(m_syncPrefix)
617 .append(ndn::name::Component(*m_state.getRootDigest()));
618
619 m_outstandingInterestName = interestName;
620
621#ifdef _DEBUG
622 printDigest(m_state.getRootDigest());
623#endif
624
625 EventId eventId =
Sonu Mishra0dadc572016-12-12 23:59:41 -0800626 m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 +
Yingdi Yuf7ede412014-08-30 20:37:52 -0700627 ndn::time::milliseconds(m_reexpressionJitter()),
628 bind(&Logic::sendSyncInterest, this));
629 m_scheduler.cancelEvent(m_reexpressingInterestId);
630 m_reexpressingInterestId = eventId;
631
632 Interest interest(interestName);
633 interest.setMustBeFresh(true);
634 interest.setInterestLifetime(m_syncInterestLifetime);
635
636 m_outstandingInterestId = m_face.expressInterest(interest,
637 bind(&Logic::onSyncData, this, _1, _2),
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800638 bind(&Logic::onSyncTimeout, this, _1), // Nack
Yingdi Yuf7ede412014-08-30 20:37:52 -0700639 bind(&Logic::onSyncTimeout, this, _1));
640
641 _LOG_DEBUG_ID("Send interest: " << interest.getName());
642 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
643}
644
645void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800646Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700647{
648 _LOG_DEBUG_ID(">> Logic::sendSyncData");
649 shared_ptr<Data> syncReply = make_shared<Data>(name);
650 syncReply->setContent(state.wireEncode());
651 syncReply->setFreshnessPeriod(m_syncReplyFreshness);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800652 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
653 return;
654 if (m_nodeList[nodePrefix].signingId.empty())
Yingdi Yucd339022014-11-05 17:51:19 -0800655 m_keyChain.sign(*syncReply);
656 else
Ashlesh Gawande687cf922017-05-30 15:04:16 -0500657 m_keyChain.sign(*syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700658
659 m_face.put(*syncReply);
660
661 // checking if our own interest got satisfied
662 if (m_outstandingInterestName == name) {
663 // remove outstanding interest
664 if (m_outstandingInterestId != 0) {
665 m_face.removePendingInterest(m_outstandingInterestId);
666 m_outstandingInterestId = 0;
667 }
668
669 // re-schedule sending Sync interest
670 time::milliseconds after(m_reexpressionJitter());
671 _LOG_DEBUG_ID("Satisfy our own interest");
672 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
673 EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
674 m_scheduler.cancelEvent(m_reexpressingInterestId);
675 m_reexpressingInterestId = eventId;
676 }
677 _LOG_DEBUG_ID("<< Logic::sendSyncData");
678}
679
680void
681Logic::cancelReset()
682{
683 _LOG_DEBUG_ID(">> Logic::cancelReset");
684 if (!m_isInReset)
685 return;
686
687 m_isInReset = false;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800688 for (const auto& node : m_nodeList) {
689 updateSeqNo(node.second.seqNo, node.first);
690 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700691 _LOG_DEBUG_ID("<< Logic::cancelReset");
692}
693
694void
695Logic::printDigest(ndn::ConstBufferPtr digest)
696{
Ashlesh Gawande687cf922017-05-30 15:04:16 -0500697 std::string hash = ndn::toHex(digest->buf(), digest->size(), false);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700698 _LOG_DEBUG_ID("Hash: " << hash);
699}
700
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800701void
702Logic::sendRecoveryInterest(ndn::ConstBufferPtr digest)
703{
704 _LOG_DEBUG_ID(">> Logic::sendRecoveryInterest");
705
706 Name interestName;
707 interestName.append(m_syncPrefix)
708 .append(RECOVERY_COMPONENT)
709 .append(ndn::name::Component(*digest));
710
711 Interest interest(interestName);
712 interest.setMustBeFresh(true);
713 interest.setInterestLifetime(m_recoveryInterestLifetime);
714
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800715 m_face.expressInterest(interest,
716 bind(&Logic::onRecoveryData, this, _1, _2),
717 bind(&Logic::onRecoveryTimeout, this, _1), // Nack
718 bind(&Logic::onRecoveryTimeout, this, _1));
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800719
720 _LOG_DEBUG_ID("interest: " << interest.getName());
721 _LOG_DEBUG_ID("<< Logic::sendRecoveryInterest");
722}
723
724void
725Logic::processRecoveryInterest(const Interest& interest)
726{
727 _LOG_DEBUG_ID(">> Logic::processRecoveryInterest");
728
729 const Name& name = interest.getName();
730 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
731
732 ConstBufferPtr rootDigest = m_state.getRootDigest();
733
734 DiffStateContainer::iterator stateIter = m_log.find(digest);
735
736 if (stateIter != m_log.end() || *digest == *EMPTY_DIGEST || *rootDigest == *digest) {
737 _LOG_DEBUG_ID("I can help you recover");
738 sendSyncData(m_defaultUserPrefix, name, m_state);
739 return;
740 }
741 _LOG_DEBUG_ID("<< Logic::processRecoveryInterest");
742}
743
744void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800745Logic::onRecoveryData(const Interest& interest, const Data& data)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800746{
747 _LOG_DEBUG_ID(">> Logic::onRecoveryData");
748 onSyncDataValidated(data.shared_from_this());
749 _LOG_DEBUG_ID("<< Logic::onRecoveryData");
750}
751
752void
753Logic::onRecoveryTimeout(const Interest& interest)
754{
755 _LOG_DEBUG_ID(">> Logic::onRecoveryTimeout");
756 _LOG_DEBUG_ID("Interest: " << interest.getName());
757 _LOG_DEBUG_ID("<< Logic::onRecoveryTimeout");
758}
759
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800760void
761Logic::sendExcludeInterest(const Interest& interest, const Data& data)
762{
763 _LOG_DEBUG_ID(">> Logic::sendExcludeInterest");
764
765 Name interestName = interest.getName();
766 Interest excludeInterest(interestName);
767
768 Exclude exclude = interest.getExclude();
769 exclude.excludeOne(data.getFullName().get(-1));
770 excludeInterest.setExclude(exclude);
771
772 excludeInterest.setInterestLifetime(m_syncInterestLifetime);
773
Ashlesh Gawanded31d6b12017-03-31 11:43:22 -0500774 if (excludeInterest.wireEncode().size() > ndn::MAX_NDN_PACKET_SIZE) {
775 return;
776 }
777
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800778 m_face.expressInterest(excludeInterest,
779 bind(&Logic::onSyncData, this, _1, _2),
780 bind(&Logic::onSyncTimeout, this, _1), // Nack
781 bind(&Logic::onSyncTimeout, this, _1));
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800782
783 _LOG_DEBUG_ID("Send interest: " << excludeInterest.getName());
784 _LOG_DEBUG_ID("<< Logic::sendExcludeInterest");
785}
786
787void
788Logic::formAndSendExcludeInterest(const Name& nodePrefix, const State& commit, ndn::ConstBufferPtr previousRoot)
789{
790 _LOG_DEBUG_ID(">> Logic::formAndSendExcludeInterest");
791 Name interestName;
792 interestName.append(m_syncPrefix)
793 .append(ndn::name::Component(*previousRoot));
794 Interest interest(interestName);
795
796 shared_ptr<Data> data = make_shared<Data>(interestName);
797 data->setContent(commit.wireEncode());
798 data->setFreshnessPeriod(m_syncReplyFreshness);
799 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
800 return;
801 if (m_nodeList[nodePrefix].signingId.empty())
802 m_keyChain.sign(*data);
803 else
Ashlesh Gawande687cf922017-05-30 15:04:16 -0500804 m_keyChain.sign(*data, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800805
806 sendExcludeInterest(interest, *data);
807
808 _LOG_DEBUG_ID("<< Logic::formAndSendExcludeInterest");
809}
810
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800811} // namespace chronosync