blob: 0c5b87a89d49dee2721fee70901d63a9fb7aaae4 [file] [log] [blame]
Yingdi Yuf7ede412014-08-30 20:37:52 -07001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2012-2014 University of California, Los Angeles
4 *
5 * This file is part of ChronoSync, synchronization library for distributed realtime
6 * applications for NDN.
7 *
8 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation, either
10 * version 3 of the License, or (at your option) any later version.
11 *
12 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
20 * @author Chaoyi Bian <bcy@pku.edu.cn>
21 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
22 * @author Yingdi Yu <yingdi@cs.ucla.edu>
23 */
24
25#include "logic.hpp"
26#include "logger.hpp"
27
28INIT_LOGGER("Logic");
29
30#ifdef _DEBUG
31#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
32#else
33#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
34#endif
35
36namespace chronosync {
37
38using ndn::ConstBufferPtr;
39using ndn::EventId;
40
41const uint8_t EMPTY_DIGEST_VALUE[] = {
42 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
43 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
44 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
45 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
46};
47
48#ifdef _DEBUG
49int Logic::m_instanceCounter = 0;
50#endif
51
Yingdi Yucd339022014-11-05 17:51:19 -080052const ndn::Name Logic::DEFAULT_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080053const ndn::Name Logic::EMPTY_NAME;
Yingdi Yucd339022014-11-05 17:51:19 -080054const ndn::shared_ptr<ndn::Validator> Logic::DEFAULT_VALIDATOR;
Yingdi Yuf7ede412014-08-30 20:37:52 -070055const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
56const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
57const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
58const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
59const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
60
61const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
62const ndn::name::Component Logic::RESET_COMPONENT("reset");
63
64Logic::Logic(ndn::Face& face,
65 const Name& syncPrefix,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080066 const Name& defaultUserPrefix,
Yingdi Yuf7ede412014-08-30 20:37:52 -070067 const UpdateCallback& onUpdate,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080068 const Name& defaultSigningId,
Yingdi Yucd339022014-11-05 17:51:19 -080069 ndn::shared_ptr<ndn::Validator> validator,
Yingdi Yuf7ede412014-08-30 20:37:52 -070070 const time::steady_clock::Duration& resetTimer,
71 const time::steady_clock::Duration& cancelResetTimer,
72 const time::milliseconds& resetInterestLifetime,
73 const time::milliseconds& syncInterestLifetime,
74 const time::milliseconds& syncReplyFreshness)
75 : m_face(face)
76 , m_syncPrefix(syncPrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080077 , m_defaultUserPrefix(defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -070078 , m_interestTable(m_face.getIoService())
79 , m_outstandingInterestId(0)
80 , m_isInReset(false)
81 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
82 , m_onUpdate(onUpdate)
83 , m_scheduler(m_face.getIoService())
84 , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
85 , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
86 , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
87 , m_resetTimer(resetTimer)
88 , m_cancelResetTimer(cancelResetTimer)
89 , m_resetInterestLifetime(resetInterestLifetime)
90 , m_syncInterestLifetime(syncInterestLifetime)
91 , m_syncReplyFreshness(syncReplyFreshness)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080092 , m_defaultSigningId(defaultSigningId)
Yingdi Yucd339022014-11-05 17:51:19 -080093 , m_validator(validator)
Yingdi Yuf7ede412014-08-30 20:37:52 -070094{
95#ifdef _DEBUG
96 m_instanceId = m_instanceCounter++;
97#endif
98
99 _LOG_DEBUG_ID(">> Logic::Logic");
100
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800101 addUserNode(m_defaultUserPrefix, m_defaultSigningId);
102
103
Yingdi Yuf7ede412014-08-30 20:37:52 -0700104 m_syncReset = m_syncPrefix;
105 m_syncReset.append("reset");
106
107 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
108 m_syncRegisteredPrefixId =
109 m_face.setInterestFilter(m_syncPrefix,
110 bind(&Logic::onSyncInterest, this, _1, _2),
111 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
112
Qiuhan Dinge246b622014-12-03 21:57:48 -0800113 sendSyncInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700114 _LOG_DEBUG_ID("<< Logic::Logic");
115}
116
117Logic::~Logic()
118{
119 m_face.unsetInterestFilter(m_syncRegisteredPrefixId);
120 m_scheduler.cancelAllEvents();
121}
122
123void
124Logic::reset()
125{
126 m_isInReset = true;
127
128 m_state.reset();
129 m_log.clear();
130
131 sendResetInterest();
132
133 // reset outstanding interest name, so that data for previous interest will be dropped.
134 if (m_outstandingInterestId != 0) {
135 m_face.removePendingInterest(m_outstandingInterestId);
136 m_outstandingInterestId = 0;
137 }
138
139 sendSyncInterest();
140
141 if (static_cast<bool>(m_delayedInterestProcessingId))
142 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
143
144 m_delayedInterestProcessingId =
145 m_scheduler.scheduleEvent(m_cancelResetTimer,
146 bind(&Logic::cancelReset, this));
147}
148
149void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800150Logic::setDefaultUserPrefix(const Name& defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700151{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800152 if (defaultUserPrefix != EMPTY_NAME) {
153 if (m_nodeList.find(defaultUserPrefix) != m_nodeList.end()) {
154 m_defaultUserPrefix = defaultUserPrefix;
155 m_defaultSigningId = m_nodeList[defaultUserPrefix].signingId;
156 }
157 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700158}
159
160void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800161Logic::addUserNode(const Name& userPrefix, const Name& signingId)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700162{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800163 if (userPrefix == EMPTY_NAME)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700164 return;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800165 if (m_defaultUserPrefix == EMPTY_NAME) {
166 m_defaultUserPrefix = userPrefix;
167 m_defaultSigningId = signingId;
168 }
169 if (m_nodeList.find(userPrefix) == m_nodeList.end()) {
170 m_nodeList[userPrefix].userPrefix = userPrefix;
171 m_nodeList[userPrefix].signingId = signingId;
172 Name sessionName = userPrefix;
173 sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
174 m_nodeList[userPrefix].sessionName = sessionName;
175 m_nodeList[userPrefix].seqNo = 0;
176 reset();
177 }
178}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700179
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800180void
181Logic::removeUserNode(const Name& userPrefix)
182{
183 auto userNode = m_nodeList.find(userPrefix);
184 if (userNode != m_nodeList.end()) {
185 m_nodeList.erase(userNode);
186 if (m_defaultUserPrefix == userPrefix) {
187 if (!m_nodeList.empty()) {
188 m_defaultUserPrefix = m_nodeList.begin()->second.userPrefix;
189 m_defaultSigningId = m_nodeList.begin()->second.signingId;
190 }
191 else {
192 m_defaultUserPrefix = EMPTY_NAME;
193 m_defaultSigningId = DEFAULT_NAME;
194 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700195 }
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800196 reset();
197 }
198}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700199
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800200const Name&
201Logic::getSessionName(Name prefix)
202{
203 if (prefix == EMPTY_NAME)
204 prefix = m_defaultUserPrefix;
205 auto node = m_nodeList.find(prefix);
206 if (node != m_nodeList.end())
207 return node->second.sessionName;
208 else
209 throw Error("Refer to non-existent node:" + prefix.toUri());
210}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700211
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800212const SeqNo&
213Logic::getSeqNo(Name prefix)
214{
215 if (prefix == EMPTY_NAME)
216 prefix = m_defaultUserPrefix;
217 auto node = m_nodeList.find(prefix);
218 if (node != m_nodeList.end())
219 return node->second.seqNo;
220 else
221 throw Logic::Error("Refer to non-existent node:" + prefix.toUri());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700222
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800223}
224
225void
226Logic::updateSeqNo(const SeqNo& seqNo, const Name &updatePrefix)
227{
228 Name prefix;
229 if (updatePrefix == EMPTY_NAME) {
230 if (m_defaultUserPrefix == EMPTY_NAME)
231 return;
232 prefix = m_defaultUserPrefix;
233 }
234 else
235 prefix = updatePrefix;
236
237 auto it = m_nodeList.find(prefix);
238 if (it != m_nodeList.end()) {
239 NodeInfo& node = it->second;
240 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
241 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << node.seqNo);
242 if (seqNo < node.seqNo || seqNo == 0)
243 return;
244
245 node.seqNo = seqNo;
246 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << node.seqNo);
247
248 if (!m_isInReset) {
249 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
250 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
251 {
252 using namespace CryptoPP;
253
254 std::string hash;
255 StringSource(previousRoot->buf(), previousRoot->size(), true,
256 new HexEncoder(new StringSink(hash), false));
257 _LOG_DEBUG_ID("Hash: " << hash);
258 }
259
260 bool isInserted = false;
261 bool isUpdated = false;
262 SeqNo oldSeq;
263 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(node.sessionName,
264 node.seqNo);
265
266 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
267 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
268 if (isInserted || isUpdated) {
269 DiffStatePtr commit = make_shared<DiffState>();
270 commit->update(node.sessionName, node.seqNo);
271 commit->setRootDigest(m_state.getRootDigest());
272 insertToDiffLog(commit, previousRoot);
273
274 satisfyPendingSyncInterests(prefix, commit);
275 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700276 }
277 }
278}
279
280ConstBufferPtr
281Logic::getRootDigest() const
282{
283 return m_state.getRootDigest();
284}
285
286void
287Logic::printState(std::ostream& os) const
288{
289 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
290 {
291 os << *leaf << "\n";
292 }
293}
294
295std::set<Name>
296Logic::getSessionNames() const
297{
298 std::set<Name> sessionNames;
299
300 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
301 {
302 sessionNames.insert(leaf->getSessionName());
303 }
304
305 return sessionNames;
306}
307
308void
309Logic::onSyncInterest(const Name& prefix, const Interest& interest)
310{
311 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
312 Name name = interest.getName();
313
314 _LOG_DEBUG_ID("InterestName: " << name);
315
316 if (RESET_COMPONENT != name.get(-1)) {
317 // normal sync interest
318 processSyncInterest(interest.shared_from_this());
319 }
320 else
321 // reset interest
322 processResetInterest(interest);
323
324 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
325}
326
327void
328Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
329{
330 //Sync prefix registration failed
331 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
332}
333
334void
335Logic::onSyncData(const Interest& interest, Data& data)
336{
337 _LOG_DEBUG_ID(">> Logic::onSyncData");
Yingdi Yucd339022014-11-05 17:51:19 -0800338 if (static_cast<bool>(m_validator))
339 m_validator->validate(data,
340 bind(&Logic::onSyncDataValidated, this, _1),
341 bind(&Logic::onSyncDataValidationFailed, this, _1));
342 else
343 onSyncDataValidated(data.shared_from_this());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700344 _LOG_DEBUG_ID("<< Logic::onSyncData");
345}
346
347void
348Logic::onResetData(const Interest& interest, Data& data)
349{
350 // This should not happened, drop the received data.
351}
352
353void
354Logic::onSyncTimeout(const Interest& interest)
355{
356 // It is OK. Others will handle the time out situation.
357 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
358 _LOG_DEBUG_ID("Interest: " << interest.getName());
359 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
360}
361
362void
363Logic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
364{
365 // SyncReply cannot be validated.
366}
367
368void
369Logic::onSyncDataValidated(const shared_ptr<const Data>& data)
370{
371 Name name = data->getName();
372 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
373
374 processSyncData(name, digest, data->getContent().blockFromValue());
375}
376
377void
378Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
379 bool isTimedProcessing/*=false*/)
380{
381 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
382
383 const Name& name = interest->getName();
384 ConstBufferPtr digest =
385 make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
386
387 ConstBufferPtr rootDigest = m_state.getRootDigest();
388
389 // If the digest of the incoming interest is the same as root digest
390 // Put the interest into InterestTable
391 if (*rootDigest == *digest) {
392 _LOG_DEBUG_ID("Oh, we are in the same state");
393 m_interestTable.insert(interest, digest, false);
394
395 if (!m_isInReset)
396 return;
397
398 if (!isTimedProcessing) {
399 _LOG_DEBUG_ID("Non timed processing in reset");
400 // Still in reset, our own seq has not been put into state yet
401 // Do not hurry, some others may be also resetting and may send their reply
402 if (static_cast<bool>(m_delayedInterestProcessingId))
403 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
404
405 time::milliseconds after(m_rangeUniformRandom());
406 _LOG_DEBUG_ID("After: " << after);
407 m_delayedInterestProcessingId =
408 m_scheduler.scheduleEvent(after,
409 bind(&Logic::processSyncInterest, this, interest, true));
410 }
411 else {
412 _LOG_DEBUG_ID("Timed processing in reset");
413 // Now we can get out of reset state by putting our own stuff into m_state.
414 cancelReset();
415 }
416
417 return;
418 }
419
420 // If the digest of incoming interest is an "empty" digest
421 if (digest == EMPTY_DIGEST) {
422 _LOG_DEBUG_ID("Poor guy, he knows nothing");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800423 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700424 return;
425 }
426
427 DiffStateContainer::iterator stateIter = m_log.find(digest);
428 // If the digest of incoming interest can be found from the log
429 if (stateIter != m_log.end()) {
430 _LOG_DEBUG_ID("It is ok, you are so close");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800431 sendSyncData(m_defaultUserPrefix, name, *(*stateIter)->diff());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700432 return;
433 }
434
435 if (!isTimedProcessing) {
436 _LOG_DEBUG_ID("Let's wait, just wait for a while");
437 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
438 bool doesExist = m_interestTable.insert(interest, digest, true);
439 if (doesExist)
440 // Original comment (not sure): somebody else replied, so restart random-game timer
441 // YY: Get the same SyncInterest again, refresh the timer
442 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
443
444 m_delayedInterestProcessingId =
445 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
446 bind(&Logic::processSyncInterest, this, interest, true));
447 }
448 else {
449 // OK, nobody is helping us, just tell the truth.
450 _LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth");
451 m_interestTable.erase(digest);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800452 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700453 }
454
455 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
456}
457
458void
459Logic::processResetInterest(const Interest& interest)
460{
461 _LOG_DEBUG_ID(">> Logic::processResetInterest");
462 reset();
463}
464
465void
466Logic::processSyncData(const Name& name,
467 ndn::ConstBufferPtr digest,
468 const Block& syncReplyBlock)
469{
470 _LOG_DEBUG_ID(">> Logic::processSyncData");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700471 DiffStatePtr commit = make_shared<DiffState>();
472 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
473
474 try {
475 m_interestTable.erase(digest); // Remove satisfied interest from PIT
476
477 State reply;
478 reply.wireDecode(syncReplyBlock);
479
480 std::vector<MissingDataInfo> v;
481 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
482 {
483 BOOST_ASSERT(leaf != 0);
484
485 const Name& info = leaf->getSessionName();
486 SeqNo seq = leaf->getSeq();
487
488 bool isInserted = false;
489 bool isUpdated = false;
490 SeqNo oldSeq;
491 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700492 if (isInserted || isUpdated) {
493 commit->update(info, seq);
494
495 oldSeq++;
496 MissingDataInfo mdi = {info, oldSeq, seq};
497 v.push_back(mdi);
498 }
499 }
500
501 if (!v.empty()) {
502 m_onUpdate(v);
503
504 commit->setRootDigest(m_state.getRootDigest());
505 insertToDiffLog(commit, previousRoot);
506 }
507 else {
508 _LOG_DEBUG_ID("What? nothing new");
509 }
510 }
511 catch (State::Error&) {
512 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
513 // Something really fishy happened during state decoding;
514 commit.reset();
515 return;
516 }
517
518 if (static_cast<bool>(commit) && !commit->getLeaves().empty()) {
519 // state changed and it is safe to express a new interest
520 time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
521 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
522 EventId eventId = m_scheduler.scheduleEvent(after,
523 bind(&Logic::sendSyncInterest, this));
524
525 m_scheduler.cancelEvent(m_reexpressingInterestId);
526 m_reexpressingInterestId = eventId;
527 }
528}
529
530void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800531Logic::satisfyPendingSyncInterests(const Name& updatedPrefix, ConstDiffStatePtr commit)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700532{
533 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
534 try {
535 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
536 for (InterestTable::const_iterator it = m_interestTable.begin();
537 it != m_interestTable.end(); it++) {
538 ConstUnsatisfiedInterestPtr request = *it;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700539 if (request->isUnknown)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800540 sendSyncData(updatedPrefix, request->interest->getName(), m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700541 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800542 sendSyncData(updatedPrefix, request->interest->getName(), *commit);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700543 }
544 m_interestTable.clear();
545 }
546 catch (InterestTable::Error&) {
547 // ok. not really an error
548 }
549 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
550}
551
552void
553Logic::insertToDiffLog(DiffStatePtr commit, ndn::ConstBufferPtr previousRoot)
554{
555 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
556 // Connect to the history
557 if (!m_log.empty())
558 (*m_log.find(previousRoot))->setNext(commit);
559
560 // Insert the commit
561 m_log.erase(commit->getRootDigest());
562 m_log.insert(commit);
563 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
564}
565
566void
567Logic::sendResetInterest()
568{
569 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
570
571 if (m_needPeriodReset) {
572 _LOG_DEBUG_ID("Need Period Reset");
573 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
574
575 EventId eventId =
576 m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
577 bind(&Logic::sendResetInterest, this));
578 m_scheduler.cancelEvent(m_resetInterestId);
579 m_resetInterestId = eventId;
580 }
581
582 Interest interest(m_syncReset);
583 interest.setMustBeFresh(true);
584 interest.setInterestLifetime(m_resetInterestLifetime);
585 m_face.expressInterest(interest,
586 bind(&Logic::onResetData, this, _1, _2),
587 bind(&Logic::onSyncTimeout, this, _1));
588
589 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
590}
591
592void
593Logic::sendSyncInterest()
594{
595 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
596
597 Name interestName;
598 interestName.append(m_syncPrefix)
599 .append(ndn::name::Component(*m_state.getRootDigest()));
600
601 m_outstandingInterestName = interestName;
602
603#ifdef _DEBUG
604 printDigest(m_state.getRootDigest());
605#endif
606
607 EventId eventId =
608 m_scheduler.scheduleEvent(m_syncInterestLifetime +
609 ndn::time::milliseconds(m_reexpressionJitter()),
610 bind(&Logic::sendSyncInterest, this));
611 m_scheduler.cancelEvent(m_reexpressingInterestId);
612 m_reexpressingInterestId = eventId;
613
614 Interest interest(interestName);
615 interest.setMustBeFresh(true);
616 interest.setInterestLifetime(m_syncInterestLifetime);
617
618 m_outstandingInterestId = m_face.expressInterest(interest,
619 bind(&Logic::onSyncData, this, _1, _2),
620 bind(&Logic::onSyncTimeout, this, _1));
621
622 _LOG_DEBUG_ID("Send interest: " << interest.getName());
623 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
624}
625
626void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800627Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700628{
629 _LOG_DEBUG_ID(">> Logic::sendSyncData");
630 shared_ptr<Data> syncReply = make_shared<Data>(name);
631 syncReply->setContent(state.wireEncode());
632 syncReply->setFreshnessPeriod(m_syncReplyFreshness);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800633 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
634 return;
635 if (m_nodeList[nodePrefix].signingId.empty())
Yingdi Yucd339022014-11-05 17:51:19 -0800636 m_keyChain.sign(*syncReply);
637 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800638 m_keyChain.signByIdentity(*syncReply, m_nodeList[nodePrefix].signingId);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700639
640 m_face.put(*syncReply);
641
642 // checking if our own interest got satisfied
643 if (m_outstandingInterestName == name) {
644 // remove outstanding interest
645 if (m_outstandingInterestId != 0) {
646 m_face.removePendingInterest(m_outstandingInterestId);
647 m_outstandingInterestId = 0;
648 }
649
650 // re-schedule sending Sync interest
651 time::milliseconds after(m_reexpressionJitter());
652 _LOG_DEBUG_ID("Satisfy our own interest");
653 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
654 EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
655 m_scheduler.cancelEvent(m_reexpressingInterestId);
656 m_reexpressingInterestId = eventId;
657 }
658 _LOG_DEBUG_ID("<< Logic::sendSyncData");
659}
660
661void
662Logic::cancelReset()
663{
664 _LOG_DEBUG_ID(">> Logic::cancelReset");
665 if (!m_isInReset)
666 return;
667
668 m_isInReset = false;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800669 for (const auto& node : m_nodeList) {
670 updateSeqNo(node.second.seqNo, node.first);
671 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700672 _LOG_DEBUG_ID("<< Logic::cancelReset");
673}
674
675void
676Logic::printDigest(ndn::ConstBufferPtr digest)
677{
678 using namespace CryptoPP;
679
680 std::string hash;
681 StringSource(digest->buf(), digest->size(), true,
682 new HexEncoder(new StringSink(hash), false));
683 _LOG_DEBUG_ID("Hash: " << hash);
684}
685
686} // namespace chronosync