blob: 48d6cfb97837b021fcc6e3cb53bcee966d806289 [file] [log] [blame]
Yingdi Yuf7ede412014-08-30 20:37:52 -07001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2012-2014 University of California, Los Angeles
4 *
5 * This file is part of ChronoSync, synchronization library for distributed realtime
6 * applications for NDN.
7 *
8 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation, either
10 * version 3 of the License, or (at your option) any later version.
11 *
12 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
20 * @author Chaoyi Bian <bcy@pku.edu.cn>
21 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
22 * @author Yingdi Yu <yingdi@cs.ucla.edu>
23 */
24
25#include "logic.hpp"
26#include "logger.hpp"
27
28INIT_LOGGER("Logic");
29
30#ifdef _DEBUG
31#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
32#else
33#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
34#endif
35
36namespace chronosync {
37
38using ndn::ConstBufferPtr;
39using ndn::EventId;
40
// SHA-256 digest of the empty input (e3b0c442...7852b855). These bytes are copied into
// Logic::EMPTY_DIGEST, the digest that is compared against incoming sync interests.
const uint8_t EMPTY_DIGEST_VALUE[] = {
  0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
  0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
  0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
  0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
};

// Logic::EMPTY_DIGEST is built from this table with a hard-coded length of 32 bytes;
// fail the build if the table ever stops matching that length.
static_assert(sizeof(EMPTY_DIGEST_VALUE) == 32,
              "EMPTY_DIGEST_VALUE must hold exactly one 32-byte SHA-256 digest");
47
48#ifdef _DEBUG
49int Logic::m_instanceCounter = 0;
50#endif
51
Yingdi Yucd339022014-11-05 17:51:19 -080052const ndn::Name Logic::DEFAULT_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080053const ndn::Name Logic::EMPTY_NAME;
Yingdi Yucd339022014-11-05 17:51:19 -080054const ndn::shared_ptr<ndn::Validator> Logic::DEFAULT_VALIDATOR;
Yingdi Yuf7ede412014-08-30 20:37:52 -070055const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
56const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
57const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
58const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
59const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
60
61const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
62const ndn::name::Component Logic::RESET_COMPONENT("reset");
63
64Logic::Logic(ndn::Face& face,
65 const Name& syncPrefix,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080066 const Name& defaultUserPrefix,
Yingdi Yuf7ede412014-08-30 20:37:52 -070067 const UpdateCallback& onUpdate,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080068 const Name& defaultSigningId,
Yingdi Yucd339022014-11-05 17:51:19 -080069 ndn::shared_ptr<ndn::Validator> validator,
Yingdi Yuf7ede412014-08-30 20:37:52 -070070 const time::steady_clock::Duration& resetTimer,
71 const time::steady_clock::Duration& cancelResetTimer,
72 const time::milliseconds& resetInterestLifetime,
73 const time::milliseconds& syncInterestLifetime,
74 const time::milliseconds& syncReplyFreshness)
75 : m_face(face)
76 , m_syncPrefix(syncPrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080077 , m_defaultUserPrefix(defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -070078 , m_interestTable(m_face.getIoService())
79 , m_outstandingInterestId(0)
80 , m_isInReset(false)
81 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
82 , m_onUpdate(onUpdate)
83 , m_scheduler(m_face.getIoService())
84 , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
85 , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
86 , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
87 , m_resetTimer(resetTimer)
88 , m_cancelResetTimer(cancelResetTimer)
89 , m_resetInterestLifetime(resetInterestLifetime)
90 , m_syncInterestLifetime(syncInterestLifetime)
91 , m_syncReplyFreshness(syncReplyFreshness)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080092 , m_defaultSigningId(defaultSigningId)
Yingdi Yucd339022014-11-05 17:51:19 -080093 , m_validator(validator)
Yingdi Yuf7ede412014-08-30 20:37:52 -070094{
95#ifdef _DEBUG
96 m_instanceId = m_instanceCounter++;
97#endif
98
99 _LOG_DEBUG_ID(">> Logic::Logic");
100
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800101 addUserNode(m_defaultUserPrefix, m_defaultSigningId);
102
103
Yingdi Yuf7ede412014-08-30 20:37:52 -0700104 m_syncReset = m_syncPrefix;
105 m_syncReset.append("reset");
106
107 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
108 m_syncRegisteredPrefixId =
109 m_face.setInterestFilter(m_syncPrefix,
110 bind(&Logic::onSyncInterest, this, _1, _2),
111 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
112
Qiuhan Dinge246b622014-12-03 21:57:48 -0800113 sendSyncInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700114 _LOG_DEBUG_ID("<< Logic::Logic");
115}
116
117Logic::~Logic()
118{
119 m_face.unsetInterestFilter(m_syncRegisteredPrefixId);
120 m_scheduler.cancelAllEvents();
121}
122
123void
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800124Logic::reset(bool isOnInterest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700125{
126 m_isInReset = true;
127
128 m_state.reset();
129 m_log.clear();
130
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800131 if (!isOnInterest)
132 sendResetInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700133
134 // reset outstanding interest name, so that data for previous interest will be dropped.
135 if (m_outstandingInterestId != 0) {
136 m_face.removePendingInterest(m_outstandingInterestId);
137 m_outstandingInterestId = 0;
138 }
139
140 sendSyncInterest();
141
142 if (static_cast<bool>(m_delayedInterestProcessingId))
143 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
144
145 m_delayedInterestProcessingId =
146 m_scheduler.scheduleEvent(m_cancelResetTimer,
147 bind(&Logic::cancelReset, this));
148}
149
150void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800151Logic::setDefaultUserPrefix(const Name& defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700152{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800153 if (defaultUserPrefix != EMPTY_NAME) {
154 if (m_nodeList.find(defaultUserPrefix) != m_nodeList.end()) {
155 m_defaultUserPrefix = defaultUserPrefix;
156 m_defaultSigningId = m_nodeList[defaultUserPrefix].signingId;
157 }
158 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700159}
160
161void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800162Logic::addUserNode(const Name& userPrefix, const Name& signingId)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700163{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800164 if (userPrefix == EMPTY_NAME)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700165 return;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800166 if (m_defaultUserPrefix == EMPTY_NAME) {
167 m_defaultUserPrefix = userPrefix;
168 m_defaultSigningId = signingId;
169 }
170 if (m_nodeList.find(userPrefix) == m_nodeList.end()) {
171 m_nodeList[userPrefix].userPrefix = userPrefix;
172 m_nodeList[userPrefix].signingId = signingId;
173 Name sessionName = userPrefix;
174 sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
175 m_nodeList[userPrefix].sessionName = sessionName;
176 m_nodeList[userPrefix].seqNo = 0;
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800177 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800178 }
179}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700180
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800181void
182Logic::removeUserNode(const Name& userPrefix)
183{
184 auto userNode = m_nodeList.find(userPrefix);
185 if (userNode != m_nodeList.end()) {
186 m_nodeList.erase(userNode);
187 if (m_defaultUserPrefix == userPrefix) {
188 if (!m_nodeList.empty()) {
189 m_defaultUserPrefix = m_nodeList.begin()->second.userPrefix;
190 m_defaultSigningId = m_nodeList.begin()->second.signingId;
191 }
192 else {
193 m_defaultUserPrefix = EMPTY_NAME;
194 m_defaultSigningId = DEFAULT_NAME;
195 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700196 }
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800197 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800198 }
199}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700200
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800201const Name&
202Logic::getSessionName(Name prefix)
203{
204 if (prefix == EMPTY_NAME)
205 prefix = m_defaultUserPrefix;
206 auto node = m_nodeList.find(prefix);
207 if (node != m_nodeList.end())
208 return node->second.sessionName;
209 else
210 throw Error("Refer to non-existent node:" + prefix.toUri());
211}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700212
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800213const SeqNo&
214Logic::getSeqNo(Name prefix)
215{
216 if (prefix == EMPTY_NAME)
217 prefix = m_defaultUserPrefix;
218 auto node = m_nodeList.find(prefix);
219 if (node != m_nodeList.end())
220 return node->second.seqNo;
221 else
222 throw Logic::Error("Refer to non-existent node:" + prefix.toUri());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700223
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800224}
225
226void
227Logic::updateSeqNo(const SeqNo& seqNo, const Name &updatePrefix)
228{
229 Name prefix;
230 if (updatePrefix == EMPTY_NAME) {
231 if (m_defaultUserPrefix == EMPTY_NAME)
232 return;
233 prefix = m_defaultUserPrefix;
234 }
235 else
236 prefix = updatePrefix;
237
238 auto it = m_nodeList.find(prefix);
239 if (it != m_nodeList.end()) {
240 NodeInfo& node = it->second;
241 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
242 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << node.seqNo);
243 if (seqNo < node.seqNo || seqNo == 0)
244 return;
245
246 node.seqNo = seqNo;
247 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << node.seqNo);
248
249 if (!m_isInReset) {
250 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
251 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
252 {
253 using namespace CryptoPP;
254
255 std::string hash;
256 StringSource(previousRoot->buf(), previousRoot->size(), true,
257 new HexEncoder(new StringSink(hash), false));
258 _LOG_DEBUG_ID("Hash: " << hash);
259 }
260
261 bool isInserted = false;
262 bool isUpdated = false;
263 SeqNo oldSeq;
264 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(node.sessionName,
265 node.seqNo);
266
267 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
268 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
269 if (isInserted || isUpdated) {
270 DiffStatePtr commit = make_shared<DiffState>();
271 commit->update(node.sessionName, node.seqNo);
272 commit->setRootDigest(m_state.getRootDigest());
273 insertToDiffLog(commit, previousRoot);
274
275 satisfyPendingSyncInterests(prefix, commit);
276 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700277 }
278 }
279}
280
281ConstBufferPtr
282Logic::getRootDigest() const
283{
284 return m_state.getRootDigest();
285}
286
287void
288Logic::printState(std::ostream& os) const
289{
290 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
291 {
292 os << *leaf << "\n";
293 }
294}
295
296std::set<Name>
297Logic::getSessionNames() const
298{
299 std::set<Name> sessionNames;
300
301 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
302 {
303 sessionNames.insert(leaf->getSessionName());
304 }
305
306 return sessionNames;
307}
308
309void
310Logic::onSyncInterest(const Name& prefix, const Interest& interest)
311{
312 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
313 Name name = interest.getName();
314
315 _LOG_DEBUG_ID("InterestName: " << name);
316
317 if (RESET_COMPONENT != name.get(-1)) {
318 // normal sync interest
319 processSyncInterest(interest.shared_from_this());
320 }
321 else
322 // reset interest
323 processResetInterest(interest);
324
325 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
326}
327
328void
329Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
330{
331 //Sync prefix registration failed
332 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
333}
334
335void
336Logic::onSyncData(const Interest& interest, Data& data)
337{
338 _LOG_DEBUG_ID(">> Logic::onSyncData");
Yingdi Yucd339022014-11-05 17:51:19 -0800339 if (static_cast<bool>(m_validator))
340 m_validator->validate(data,
341 bind(&Logic::onSyncDataValidated, this, _1),
342 bind(&Logic::onSyncDataValidationFailed, this, _1));
343 else
344 onSyncDataValidated(data.shared_from_this());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700345 _LOG_DEBUG_ID("<< Logic::onSyncData");
346}
347
348void
349Logic::onResetData(const Interest& interest, Data& data)
350{
351 // This should not happened, drop the received data.
352}
353
354void
355Logic::onSyncTimeout(const Interest& interest)
356{
357 // It is OK. Others will handle the time out situation.
358 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
359 _LOG_DEBUG_ID("Interest: " << interest.getName());
360 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
361}
362
363void
364Logic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
365{
366 // SyncReply cannot be validated.
367}
368
369void
370Logic::onSyncDataValidated(const shared_ptr<const Data>& data)
371{
372 Name name = data->getName();
373 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
374
375 processSyncData(name, digest, data->getContent().blockFromValue());
376}
377
378void
379Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
380 bool isTimedProcessing/*=false*/)
381{
382 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
383
384 const Name& name = interest->getName();
385 ConstBufferPtr digest =
386 make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
387
388 ConstBufferPtr rootDigest = m_state.getRootDigest();
389
390 // If the digest of the incoming interest is the same as root digest
391 // Put the interest into InterestTable
392 if (*rootDigest == *digest) {
393 _LOG_DEBUG_ID("Oh, we are in the same state");
394 m_interestTable.insert(interest, digest, false);
395
396 if (!m_isInReset)
397 return;
398
399 if (!isTimedProcessing) {
400 _LOG_DEBUG_ID("Non timed processing in reset");
401 // Still in reset, our own seq has not been put into state yet
402 // Do not hurry, some others may be also resetting and may send their reply
403 if (static_cast<bool>(m_delayedInterestProcessingId))
404 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
405
406 time::milliseconds after(m_rangeUniformRandom());
407 _LOG_DEBUG_ID("After: " << after);
408 m_delayedInterestProcessingId =
409 m_scheduler.scheduleEvent(after,
410 bind(&Logic::processSyncInterest, this, interest, true));
411 }
412 else {
413 _LOG_DEBUG_ID("Timed processing in reset");
414 // Now we can get out of reset state by putting our own stuff into m_state.
415 cancelReset();
416 }
417
418 return;
419 }
420
421 // If the digest of incoming interest is an "empty" digest
422 if (digest == EMPTY_DIGEST) {
423 _LOG_DEBUG_ID("Poor guy, he knows nothing");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800424 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700425 return;
426 }
427
428 DiffStateContainer::iterator stateIter = m_log.find(digest);
429 // If the digest of incoming interest can be found from the log
430 if (stateIter != m_log.end()) {
431 _LOG_DEBUG_ID("It is ok, you are so close");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800432 sendSyncData(m_defaultUserPrefix, name, *(*stateIter)->diff());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700433 return;
434 }
435
436 if (!isTimedProcessing) {
437 _LOG_DEBUG_ID("Let's wait, just wait for a while");
438 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
Yingdi Yu53f5f042015-01-31 16:33:25 -0800439 bool doesExist = m_interestTable.has(digest);
440 m_interestTable.insert(interest, digest, true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700441 if (doesExist)
442 // Original comment (not sure): somebody else replied, so restart random-game timer
443 // YY: Get the same SyncInterest again, refresh the timer
444 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
445
446 m_delayedInterestProcessingId =
447 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
448 bind(&Logic::processSyncInterest, this, interest, true));
449 }
450 else {
451 // OK, nobody is helping us, just tell the truth.
452 _LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth");
453 m_interestTable.erase(digest);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800454 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700455 }
456
457 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
458}
459
460void
461Logic::processResetInterest(const Interest& interest)
462{
463 _LOG_DEBUG_ID(">> Logic::processResetInterest");
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800464 reset(true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700465}
466
467void
468Logic::processSyncData(const Name& name,
469 ndn::ConstBufferPtr digest,
470 const Block& syncReplyBlock)
471{
472 _LOG_DEBUG_ID(">> Logic::processSyncData");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700473 DiffStatePtr commit = make_shared<DiffState>();
474 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
475
476 try {
477 m_interestTable.erase(digest); // Remove satisfied interest from PIT
478
479 State reply;
480 reply.wireDecode(syncReplyBlock);
481
482 std::vector<MissingDataInfo> v;
483 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
484 {
485 BOOST_ASSERT(leaf != 0);
486
487 const Name& info = leaf->getSessionName();
488 SeqNo seq = leaf->getSeq();
489
490 bool isInserted = false;
491 bool isUpdated = false;
492 SeqNo oldSeq;
493 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700494 if (isInserted || isUpdated) {
495 commit->update(info, seq);
496
497 oldSeq++;
498 MissingDataInfo mdi = {info, oldSeq, seq};
499 v.push_back(mdi);
500 }
501 }
502
503 if (!v.empty()) {
504 m_onUpdate(v);
505
506 commit->setRootDigest(m_state.getRootDigest());
507 insertToDiffLog(commit, previousRoot);
508 }
509 else {
510 _LOG_DEBUG_ID("What? nothing new");
511 }
512 }
513 catch (State::Error&) {
514 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
515 // Something really fishy happened during state decoding;
516 commit.reset();
517 return;
518 }
519
520 if (static_cast<bool>(commit) && !commit->getLeaves().empty()) {
521 // state changed and it is safe to express a new interest
522 time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
523 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
524 EventId eventId = m_scheduler.scheduleEvent(after,
525 bind(&Logic::sendSyncInterest, this));
526
527 m_scheduler.cancelEvent(m_reexpressingInterestId);
528 m_reexpressingInterestId = eventId;
529 }
530}
531
532void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800533Logic::satisfyPendingSyncInterests(const Name& updatedPrefix, ConstDiffStatePtr commit)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700534{
535 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
536 try {
537 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
538 for (InterestTable::const_iterator it = m_interestTable.begin();
539 it != m_interestTable.end(); it++) {
540 ConstUnsatisfiedInterestPtr request = *it;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700541 if (request->isUnknown)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800542 sendSyncData(updatedPrefix, request->interest->getName(), m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700543 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800544 sendSyncData(updatedPrefix, request->interest->getName(), *commit);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700545 }
546 m_interestTable.clear();
547 }
548 catch (InterestTable::Error&) {
549 // ok. not really an error
550 }
551 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
552}
553
554void
555Logic::insertToDiffLog(DiffStatePtr commit, ndn::ConstBufferPtr previousRoot)
556{
557 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
558 // Connect to the history
559 if (!m_log.empty())
560 (*m_log.find(previousRoot))->setNext(commit);
561
562 // Insert the commit
563 m_log.erase(commit->getRootDigest());
564 m_log.insert(commit);
565 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
566}
567
568void
569Logic::sendResetInterest()
570{
571 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
572
573 if (m_needPeriodReset) {
574 _LOG_DEBUG_ID("Need Period Reset");
575 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
576
577 EventId eventId =
578 m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
579 bind(&Logic::sendResetInterest, this));
580 m_scheduler.cancelEvent(m_resetInterestId);
581 m_resetInterestId = eventId;
582 }
583
584 Interest interest(m_syncReset);
585 interest.setMustBeFresh(true);
586 interest.setInterestLifetime(m_resetInterestLifetime);
587 m_face.expressInterest(interest,
588 bind(&Logic::onResetData, this, _1, _2),
589 bind(&Logic::onSyncTimeout, this, _1));
590
591 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
592}
593
594void
595Logic::sendSyncInterest()
596{
597 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
598
599 Name interestName;
600 interestName.append(m_syncPrefix)
601 .append(ndn::name::Component(*m_state.getRootDigest()));
602
603 m_outstandingInterestName = interestName;
604
605#ifdef _DEBUG
606 printDigest(m_state.getRootDigest());
607#endif
608
609 EventId eventId =
610 m_scheduler.scheduleEvent(m_syncInterestLifetime +
611 ndn::time::milliseconds(m_reexpressionJitter()),
612 bind(&Logic::sendSyncInterest, this));
613 m_scheduler.cancelEvent(m_reexpressingInterestId);
614 m_reexpressingInterestId = eventId;
615
616 Interest interest(interestName);
617 interest.setMustBeFresh(true);
618 interest.setInterestLifetime(m_syncInterestLifetime);
619
620 m_outstandingInterestId = m_face.expressInterest(interest,
621 bind(&Logic::onSyncData, this, _1, _2),
622 bind(&Logic::onSyncTimeout, this, _1));
623
624 _LOG_DEBUG_ID("Send interest: " << interest.getName());
625 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
626}
627
628void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800629Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700630{
631 _LOG_DEBUG_ID(">> Logic::sendSyncData");
632 shared_ptr<Data> syncReply = make_shared<Data>(name);
633 syncReply->setContent(state.wireEncode());
634 syncReply->setFreshnessPeriod(m_syncReplyFreshness);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800635 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
636 return;
637 if (m_nodeList[nodePrefix].signingId.empty())
Yingdi Yucd339022014-11-05 17:51:19 -0800638 m_keyChain.sign(*syncReply);
639 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800640 m_keyChain.signByIdentity(*syncReply, m_nodeList[nodePrefix].signingId);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700641
642 m_face.put(*syncReply);
643
644 // checking if our own interest got satisfied
645 if (m_outstandingInterestName == name) {
646 // remove outstanding interest
647 if (m_outstandingInterestId != 0) {
648 m_face.removePendingInterest(m_outstandingInterestId);
649 m_outstandingInterestId = 0;
650 }
651
652 // re-schedule sending Sync interest
653 time::milliseconds after(m_reexpressionJitter());
654 _LOG_DEBUG_ID("Satisfy our own interest");
655 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
656 EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
657 m_scheduler.cancelEvent(m_reexpressingInterestId);
658 m_reexpressingInterestId = eventId;
659 }
660 _LOG_DEBUG_ID("<< Logic::sendSyncData");
661}
662
663void
664Logic::cancelReset()
665{
666 _LOG_DEBUG_ID(">> Logic::cancelReset");
667 if (!m_isInReset)
668 return;
669
670 m_isInReset = false;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800671 for (const auto& node : m_nodeList) {
672 updateSeqNo(node.second.seqNo, node.first);
673 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700674 _LOG_DEBUG_ID("<< Logic::cancelReset");
675}
676
677void
678Logic::printDigest(ndn::ConstBufferPtr digest)
679{
680 using namespace CryptoPP;
681
682 std::string hash;
683 StringSource(digest->buf(), digest->size(), true,
684 new HexEncoder(new StringSink(hash), false));
685 _LOG_DEBUG_ID("Hash: " << hash);
686}
687
688} // namespace chronosync