blob: e0c05011e52c453ffca50c8700ffeb908a24fc9a [file] [log] [blame]
Yingdi Yuf7ede412014-08-30 20:37:52 -07001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2012-2014 University of California, Los Angeles
4 *
5 * This file is part of ChronoSync, synchronization library for distributed realtime
6 * applications for NDN.
7 *
8 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation, either
10 * version 3 of the License, or (at your option) any later version.
11 *
12 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
20 * @author Chaoyi Bian <bcy@pku.edu.cn>
21 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
22 * @author Yingdi Yu <yingdi@cs.ucla.edu>
23 */
24
25#include "logic.hpp"
26#include "logger.hpp"
27
28INIT_LOGGER("Logic");
29
30#ifdef _DEBUG
31#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
32#else
33#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
34#endif
35
36namespace chronosync {
37
38using ndn::ConstBufferPtr;
39using ndn::EventId;
40
41const uint8_t EMPTY_DIGEST_VALUE[] = {
42 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
43 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
44 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
45 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
46};
47
48#ifdef _DEBUG
49int Logic::m_instanceCounter = 0;
50#endif
51
Yingdi Yucd339022014-11-05 17:51:19 -080052const ndn::Name Logic::DEFAULT_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080053const ndn::Name Logic::EMPTY_NAME;
Yingdi Yucd339022014-11-05 17:51:19 -080054const ndn::shared_ptr<ndn::Validator> Logic::DEFAULT_VALIDATOR;
Yingdi Yuf7ede412014-08-30 20:37:52 -070055const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
56const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
57const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
58const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
59const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
60
61const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
62const ndn::name::Component Logic::RESET_COMPONENT("reset");
63
64Logic::Logic(ndn::Face& face,
65 const Name& syncPrefix,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080066 const Name& defaultUserPrefix,
Yingdi Yuf7ede412014-08-30 20:37:52 -070067 const UpdateCallback& onUpdate,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080068 const Name& defaultSigningId,
Yingdi Yucd339022014-11-05 17:51:19 -080069 ndn::shared_ptr<ndn::Validator> validator,
Yingdi Yuf7ede412014-08-30 20:37:52 -070070 const time::steady_clock::Duration& resetTimer,
71 const time::steady_clock::Duration& cancelResetTimer,
72 const time::milliseconds& resetInterestLifetime,
73 const time::milliseconds& syncInterestLifetime,
74 const time::milliseconds& syncReplyFreshness)
75 : m_face(face)
76 , m_syncPrefix(syncPrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080077 , m_defaultUserPrefix(defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -070078 , m_interestTable(m_face.getIoService())
79 , m_outstandingInterestId(0)
80 , m_isInReset(false)
81 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
82 , m_onUpdate(onUpdate)
83 , m_scheduler(m_face.getIoService())
84 , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
85 , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
86 , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
87 , m_resetTimer(resetTimer)
88 , m_cancelResetTimer(cancelResetTimer)
89 , m_resetInterestLifetime(resetInterestLifetime)
90 , m_syncInterestLifetime(syncInterestLifetime)
91 , m_syncReplyFreshness(syncReplyFreshness)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080092 , m_defaultSigningId(defaultSigningId)
Yingdi Yucd339022014-11-05 17:51:19 -080093 , m_validator(validator)
Yingdi Yuf7ede412014-08-30 20:37:52 -070094{
95#ifdef _DEBUG
96 m_instanceId = m_instanceCounter++;
97#endif
98
99 _LOG_DEBUG_ID(">> Logic::Logic");
100
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800101 addUserNode(m_defaultUserPrefix, m_defaultSigningId);
102
103
Yingdi Yuf7ede412014-08-30 20:37:52 -0700104 m_syncReset = m_syncPrefix;
105 m_syncReset.append("reset");
106
107 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
108 m_syncRegisteredPrefixId =
109 m_face.setInterestFilter(m_syncPrefix,
110 bind(&Logic::onSyncInterest, this, _1, _2),
111 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
112
Qiuhan Dinge246b622014-12-03 21:57:48 -0800113 sendSyncInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700114 _LOG_DEBUG_ID("<< Logic::Logic");
115}
116
117Logic::~Logic()
118{
119 m_face.unsetInterestFilter(m_syncRegisteredPrefixId);
120 m_scheduler.cancelAllEvents();
121}
122
123void
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800124Logic::reset(bool isOnInterest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700125{
126 m_isInReset = true;
127
128 m_state.reset();
129 m_log.clear();
130
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800131 if (!isOnInterest)
132 sendResetInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700133
134 // reset outstanding interest name, so that data for previous interest will be dropped.
135 if (m_outstandingInterestId != 0) {
136 m_face.removePendingInterest(m_outstandingInterestId);
137 m_outstandingInterestId = 0;
138 }
139
140 sendSyncInterest();
141
142 if (static_cast<bool>(m_delayedInterestProcessingId))
143 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
144
145 m_delayedInterestProcessingId =
146 m_scheduler.scheduleEvent(m_cancelResetTimer,
147 bind(&Logic::cancelReset, this));
148}
149
150void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800151Logic::setDefaultUserPrefix(const Name& defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700152{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800153 if (defaultUserPrefix != EMPTY_NAME) {
154 if (m_nodeList.find(defaultUserPrefix) != m_nodeList.end()) {
155 m_defaultUserPrefix = defaultUserPrefix;
156 m_defaultSigningId = m_nodeList[defaultUserPrefix].signingId;
157 }
158 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700159}
160
161void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800162Logic::addUserNode(const Name& userPrefix, const Name& signingId)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700163{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800164 if (userPrefix == EMPTY_NAME)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700165 return;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800166 if (m_defaultUserPrefix == EMPTY_NAME) {
167 m_defaultUserPrefix = userPrefix;
168 m_defaultSigningId = signingId;
169 }
170 if (m_nodeList.find(userPrefix) == m_nodeList.end()) {
171 m_nodeList[userPrefix].userPrefix = userPrefix;
172 m_nodeList[userPrefix].signingId = signingId;
173 Name sessionName = userPrefix;
174 sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
175 m_nodeList[userPrefix].sessionName = sessionName;
176 m_nodeList[userPrefix].seqNo = 0;
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800177 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800178 }
179}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700180
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800181void
182Logic::removeUserNode(const Name& userPrefix)
183{
184 auto userNode = m_nodeList.find(userPrefix);
185 if (userNode != m_nodeList.end()) {
186 m_nodeList.erase(userNode);
187 if (m_defaultUserPrefix == userPrefix) {
188 if (!m_nodeList.empty()) {
189 m_defaultUserPrefix = m_nodeList.begin()->second.userPrefix;
190 m_defaultSigningId = m_nodeList.begin()->second.signingId;
191 }
192 else {
193 m_defaultUserPrefix = EMPTY_NAME;
194 m_defaultSigningId = DEFAULT_NAME;
195 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700196 }
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800197 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800198 }
199}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700200
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800201const Name&
202Logic::getSessionName(Name prefix)
203{
204 if (prefix == EMPTY_NAME)
205 prefix = m_defaultUserPrefix;
206 auto node = m_nodeList.find(prefix);
207 if (node != m_nodeList.end())
208 return node->second.sessionName;
209 else
210 throw Error("Refer to non-existent node:" + prefix.toUri());
211}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700212
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800213const SeqNo&
214Logic::getSeqNo(Name prefix)
215{
216 if (prefix == EMPTY_NAME)
217 prefix = m_defaultUserPrefix;
218 auto node = m_nodeList.find(prefix);
219 if (node != m_nodeList.end())
220 return node->second.seqNo;
221 else
222 throw Logic::Error("Refer to non-existent node:" + prefix.toUri());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700223
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800224}
225
226void
227Logic::updateSeqNo(const SeqNo& seqNo, const Name &updatePrefix)
228{
229 Name prefix;
230 if (updatePrefix == EMPTY_NAME) {
231 if (m_defaultUserPrefix == EMPTY_NAME)
232 return;
233 prefix = m_defaultUserPrefix;
234 }
235 else
236 prefix = updatePrefix;
237
238 auto it = m_nodeList.find(prefix);
239 if (it != m_nodeList.end()) {
240 NodeInfo& node = it->second;
241 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
242 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << node.seqNo);
243 if (seqNo < node.seqNo || seqNo == 0)
244 return;
245
246 node.seqNo = seqNo;
247 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << node.seqNo);
248
249 if (!m_isInReset) {
250 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
251 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
252 {
253 using namespace CryptoPP;
254
255 std::string hash;
256 StringSource(previousRoot->buf(), previousRoot->size(), true,
257 new HexEncoder(new StringSink(hash), false));
258 _LOG_DEBUG_ID("Hash: " << hash);
259 }
260
261 bool isInserted = false;
262 bool isUpdated = false;
263 SeqNo oldSeq;
264 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(node.sessionName,
265 node.seqNo);
266
267 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
268 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
269 if (isInserted || isUpdated) {
270 DiffStatePtr commit = make_shared<DiffState>();
271 commit->update(node.sessionName, node.seqNo);
272 commit->setRootDigest(m_state.getRootDigest());
273 insertToDiffLog(commit, previousRoot);
274
275 satisfyPendingSyncInterests(prefix, commit);
276 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700277 }
278 }
279}
280
281ConstBufferPtr
282Logic::getRootDigest() const
283{
284 return m_state.getRootDigest();
285}
286
287void
288Logic::printState(std::ostream& os) const
289{
290 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
291 {
292 os << *leaf << "\n";
293 }
294}
295
296std::set<Name>
297Logic::getSessionNames() const
298{
299 std::set<Name> sessionNames;
300
301 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
302 {
303 sessionNames.insert(leaf->getSessionName());
304 }
305
306 return sessionNames;
307}
308
309void
310Logic::onSyncInterest(const Name& prefix, const Interest& interest)
311{
312 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
313 Name name = interest.getName();
314
315 _LOG_DEBUG_ID("InterestName: " << name);
316
317 if (RESET_COMPONENT != name.get(-1)) {
318 // normal sync interest
319 processSyncInterest(interest.shared_from_this());
320 }
321 else
322 // reset interest
323 processResetInterest(interest);
324
325 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
326}
327
328void
329Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
330{
331 //Sync prefix registration failed
332 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
333}
334
335void
336Logic::onSyncData(const Interest& interest, Data& data)
337{
338 _LOG_DEBUG_ID(">> Logic::onSyncData");
Yingdi Yucd339022014-11-05 17:51:19 -0800339 if (static_cast<bool>(m_validator))
340 m_validator->validate(data,
341 bind(&Logic::onSyncDataValidated, this, _1),
342 bind(&Logic::onSyncDataValidationFailed, this, _1));
343 else
344 onSyncDataValidated(data.shared_from_this());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700345 _LOG_DEBUG_ID("<< Logic::onSyncData");
346}
347
348void
349Logic::onResetData(const Interest& interest, Data& data)
350{
351 // This should not happened, drop the received data.
352}
353
354void
355Logic::onSyncTimeout(const Interest& interest)
356{
357 // It is OK. Others will handle the time out situation.
358 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
359 _LOG_DEBUG_ID("Interest: " << interest.getName());
360 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
361}
362
363void
364Logic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
365{
366 // SyncReply cannot be validated.
367}
368
369void
370Logic::onSyncDataValidated(const shared_ptr<const Data>& data)
371{
372 Name name = data->getName();
373 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
374
375 processSyncData(name, digest, data->getContent().blockFromValue());
376}
377
378void
379Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
380 bool isTimedProcessing/*=false*/)
381{
382 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
383
384 const Name& name = interest->getName();
385 ConstBufferPtr digest =
386 make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
387
388 ConstBufferPtr rootDigest = m_state.getRootDigest();
389
390 // If the digest of the incoming interest is the same as root digest
391 // Put the interest into InterestTable
392 if (*rootDigest == *digest) {
393 _LOG_DEBUG_ID("Oh, we are in the same state");
394 m_interestTable.insert(interest, digest, false);
395
396 if (!m_isInReset)
397 return;
398
399 if (!isTimedProcessing) {
400 _LOG_DEBUG_ID("Non timed processing in reset");
401 // Still in reset, our own seq has not been put into state yet
402 // Do not hurry, some others may be also resetting and may send their reply
403 if (static_cast<bool>(m_delayedInterestProcessingId))
404 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
405
406 time::milliseconds after(m_rangeUniformRandom());
407 _LOG_DEBUG_ID("After: " << after);
408 m_delayedInterestProcessingId =
409 m_scheduler.scheduleEvent(after,
410 bind(&Logic::processSyncInterest, this, interest, true));
411 }
412 else {
413 _LOG_DEBUG_ID("Timed processing in reset");
414 // Now we can get out of reset state by putting our own stuff into m_state.
415 cancelReset();
416 }
417
418 return;
419 }
420
421 // If the digest of incoming interest is an "empty" digest
422 if (digest == EMPTY_DIGEST) {
423 _LOG_DEBUG_ID("Poor guy, he knows nothing");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800424 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700425 return;
426 }
427
428 DiffStateContainer::iterator stateIter = m_log.find(digest);
429 // If the digest of incoming interest can be found from the log
430 if (stateIter != m_log.end()) {
431 _LOG_DEBUG_ID("It is ok, you are so close");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800432 sendSyncData(m_defaultUserPrefix, name, *(*stateIter)->diff());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700433 return;
434 }
435
436 if (!isTimedProcessing) {
437 _LOG_DEBUG_ID("Let's wait, just wait for a while");
438 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
439 bool doesExist = m_interestTable.insert(interest, digest, true);
440 if (doesExist)
441 // Original comment (not sure): somebody else replied, so restart random-game timer
442 // YY: Get the same SyncInterest again, refresh the timer
443 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
444
445 m_delayedInterestProcessingId =
446 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
447 bind(&Logic::processSyncInterest, this, interest, true));
448 }
449 else {
450 // OK, nobody is helping us, just tell the truth.
451 _LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth");
452 m_interestTable.erase(digest);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800453 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700454 }
455
456 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
457}
458
459void
460Logic::processResetInterest(const Interest& interest)
461{
462 _LOG_DEBUG_ID(">> Logic::processResetInterest");
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800463 reset(true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700464}
465
466void
467Logic::processSyncData(const Name& name,
468 ndn::ConstBufferPtr digest,
469 const Block& syncReplyBlock)
470{
471 _LOG_DEBUG_ID(">> Logic::processSyncData");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700472 DiffStatePtr commit = make_shared<DiffState>();
473 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
474
475 try {
476 m_interestTable.erase(digest); // Remove satisfied interest from PIT
477
478 State reply;
479 reply.wireDecode(syncReplyBlock);
480
481 std::vector<MissingDataInfo> v;
482 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
483 {
484 BOOST_ASSERT(leaf != 0);
485
486 const Name& info = leaf->getSessionName();
487 SeqNo seq = leaf->getSeq();
488
489 bool isInserted = false;
490 bool isUpdated = false;
491 SeqNo oldSeq;
492 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700493 if (isInserted || isUpdated) {
494 commit->update(info, seq);
495
496 oldSeq++;
497 MissingDataInfo mdi = {info, oldSeq, seq};
498 v.push_back(mdi);
499 }
500 }
501
502 if (!v.empty()) {
503 m_onUpdate(v);
504
505 commit->setRootDigest(m_state.getRootDigest());
506 insertToDiffLog(commit, previousRoot);
507 }
508 else {
509 _LOG_DEBUG_ID("What? nothing new");
510 }
511 }
512 catch (State::Error&) {
513 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
514 // Something really fishy happened during state decoding;
515 commit.reset();
516 return;
517 }
518
519 if (static_cast<bool>(commit) && !commit->getLeaves().empty()) {
520 // state changed and it is safe to express a new interest
521 time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
522 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
523 EventId eventId = m_scheduler.scheduleEvent(after,
524 bind(&Logic::sendSyncInterest, this));
525
526 m_scheduler.cancelEvent(m_reexpressingInterestId);
527 m_reexpressingInterestId = eventId;
528 }
529}
530
531void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800532Logic::satisfyPendingSyncInterests(const Name& updatedPrefix, ConstDiffStatePtr commit)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700533{
534 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
535 try {
536 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
537 for (InterestTable::const_iterator it = m_interestTable.begin();
538 it != m_interestTable.end(); it++) {
539 ConstUnsatisfiedInterestPtr request = *it;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700540 if (request->isUnknown)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800541 sendSyncData(updatedPrefix, request->interest->getName(), m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700542 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800543 sendSyncData(updatedPrefix, request->interest->getName(), *commit);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700544 }
545 m_interestTable.clear();
546 }
547 catch (InterestTable::Error&) {
548 // ok. not really an error
549 }
550 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
551}
552
553void
554Logic::insertToDiffLog(DiffStatePtr commit, ndn::ConstBufferPtr previousRoot)
555{
556 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
557 // Connect to the history
558 if (!m_log.empty())
559 (*m_log.find(previousRoot))->setNext(commit);
560
561 // Insert the commit
562 m_log.erase(commit->getRootDigest());
563 m_log.insert(commit);
564 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
565}
566
567void
568Logic::sendResetInterest()
569{
570 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
571
572 if (m_needPeriodReset) {
573 _LOG_DEBUG_ID("Need Period Reset");
574 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
575
576 EventId eventId =
577 m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
578 bind(&Logic::sendResetInterest, this));
579 m_scheduler.cancelEvent(m_resetInterestId);
580 m_resetInterestId = eventId;
581 }
582
583 Interest interest(m_syncReset);
584 interest.setMustBeFresh(true);
585 interest.setInterestLifetime(m_resetInterestLifetime);
586 m_face.expressInterest(interest,
587 bind(&Logic::onResetData, this, _1, _2),
588 bind(&Logic::onSyncTimeout, this, _1));
589
590 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
591}
592
593void
594Logic::sendSyncInterest()
595{
596 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
597
598 Name interestName;
599 interestName.append(m_syncPrefix)
600 .append(ndn::name::Component(*m_state.getRootDigest()));
601
602 m_outstandingInterestName = interestName;
603
604#ifdef _DEBUG
605 printDigest(m_state.getRootDigest());
606#endif
607
608 EventId eventId =
609 m_scheduler.scheduleEvent(m_syncInterestLifetime +
610 ndn::time::milliseconds(m_reexpressionJitter()),
611 bind(&Logic::sendSyncInterest, this));
612 m_scheduler.cancelEvent(m_reexpressingInterestId);
613 m_reexpressingInterestId = eventId;
614
615 Interest interest(interestName);
616 interest.setMustBeFresh(true);
617 interest.setInterestLifetime(m_syncInterestLifetime);
618
619 m_outstandingInterestId = m_face.expressInterest(interest,
620 bind(&Logic::onSyncData, this, _1, _2),
621 bind(&Logic::onSyncTimeout, this, _1));
622
623 _LOG_DEBUG_ID("Send interest: " << interest.getName());
624 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
625}
626
627void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800628Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700629{
630 _LOG_DEBUG_ID(">> Logic::sendSyncData");
631 shared_ptr<Data> syncReply = make_shared<Data>(name);
632 syncReply->setContent(state.wireEncode());
633 syncReply->setFreshnessPeriod(m_syncReplyFreshness);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800634 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
635 return;
636 if (m_nodeList[nodePrefix].signingId.empty())
Yingdi Yucd339022014-11-05 17:51:19 -0800637 m_keyChain.sign(*syncReply);
638 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800639 m_keyChain.signByIdentity(*syncReply, m_nodeList[nodePrefix].signingId);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700640
641 m_face.put(*syncReply);
642
643 // checking if our own interest got satisfied
644 if (m_outstandingInterestName == name) {
645 // remove outstanding interest
646 if (m_outstandingInterestId != 0) {
647 m_face.removePendingInterest(m_outstandingInterestId);
648 m_outstandingInterestId = 0;
649 }
650
651 // re-schedule sending Sync interest
652 time::milliseconds after(m_reexpressionJitter());
653 _LOG_DEBUG_ID("Satisfy our own interest");
654 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
655 EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
656 m_scheduler.cancelEvent(m_reexpressingInterestId);
657 m_reexpressingInterestId = eventId;
658 }
659 _LOG_DEBUG_ID("<< Logic::sendSyncData");
660}
661
662void
663Logic::cancelReset()
664{
665 _LOG_DEBUG_ID(">> Logic::cancelReset");
666 if (!m_isInReset)
667 return;
668
669 m_isInReset = false;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800670 for (const auto& node : m_nodeList) {
671 updateSeqNo(node.second.seqNo, node.first);
672 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700673 _LOG_DEBUG_ID("<< Logic::cancelReset");
674}
675
676void
677Logic::printDigest(ndn::ConstBufferPtr digest)
678{
679 using namespace CryptoPP;
680
681 std::string hash;
682 StringSource(digest->buf(), digest->size(), true,
683 new HexEncoder(new StringSink(hash), false));
684 _LOG_DEBUG_ID("Hash: " << hash);
685}
686
687} // namespace chronosync