blob: 25f2abbd385535235850517cbd51aafe9df31802 [file] [log] [blame]
Yingdi Yuf7ede412014-08-30 20:37:52 -07001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2012-2014 University of California, Los Angeles
4 *
5 * This file is part of ChronoSync, synchronization library for distributed realtime
6 * applications for NDN.
7 *
8 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation, either
10 * version 3 of the License, or (at your option) any later version.
11 *
12 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
20 * @author Chaoyi Bian <bcy@pku.edu.cn>
21 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
22 * @author Yingdi Yu <yingdi@cs.ucla.edu>
23 */
24
25#include "logic.hpp"
26#include "logger.hpp"
27
28INIT_LOGGER("Logic");
29
30#ifdef _DEBUG
31#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
32#else
33#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
34#endif
35
36namespace chronosync {
37
38using ndn::ConstBufferPtr;
39using ndn::EventId;
40
41const uint8_t EMPTY_DIGEST_VALUE[] = {
42 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
43 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
44 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
45 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
46};
47
48#ifdef _DEBUG
49int Logic::m_instanceCounter = 0;
50#endif
51
Yingdi Yucd339022014-11-05 17:51:19 -080052const ndn::Name Logic::DEFAULT_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080053const ndn::Name Logic::EMPTY_NAME;
Yingdi Yucd339022014-11-05 17:51:19 -080054const ndn::shared_ptr<ndn::Validator> Logic::DEFAULT_VALIDATOR;
Yingdi Yuf7ede412014-08-30 20:37:52 -070055const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
56const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
57const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
58const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
59const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
60
61const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
62const ndn::name::Component Logic::RESET_COMPONENT("reset");
63
64Logic::Logic(ndn::Face& face,
65 const Name& syncPrefix,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080066 const Name& defaultUserPrefix,
Yingdi Yuf7ede412014-08-30 20:37:52 -070067 const UpdateCallback& onUpdate,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080068 const Name& defaultSigningId,
Yingdi Yucd339022014-11-05 17:51:19 -080069 ndn::shared_ptr<ndn::Validator> validator,
Yingdi Yuf7ede412014-08-30 20:37:52 -070070 const time::steady_clock::Duration& resetTimer,
71 const time::steady_clock::Duration& cancelResetTimer,
72 const time::milliseconds& resetInterestLifetime,
73 const time::milliseconds& syncInterestLifetime,
74 const time::milliseconds& syncReplyFreshness)
75 : m_face(face)
76 , m_syncPrefix(syncPrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080077 , m_defaultUserPrefix(defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -070078 , m_interestTable(m_face.getIoService())
79 , m_outstandingInterestId(0)
80 , m_isInReset(false)
81 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
82 , m_onUpdate(onUpdate)
83 , m_scheduler(m_face.getIoService())
84 , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
85 , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
86 , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
87 , m_resetTimer(resetTimer)
88 , m_cancelResetTimer(cancelResetTimer)
89 , m_resetInterestLifetime(resetInterestLifetime)
90 , m_syncInterestLifetime(syncInterestLifetime)
91 , m_syncReplyFreshness(syncReplyFreshness)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080092 , m_defaultSigningId(defaultSigningId)
Yingdi Yucd339022014-11-05 17:51:19 -080093 , m_validator(validator)
Yingdi Yuf7ede412014-08-30 20:37:52 -070094{
95#ifdef _DEBUG
96 m_instanceId = m_instanceCounter++;
97#endif
98
99 _LOG_DEBUG_ID(">> Logic::Logic");
100
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800101 addUserNode(m_defaultUserPrefix, m_defaultSigningId);
102
103
Yingdi Yuf7ede412014-08-30 20:37:52 -0700104 m_syncReset = m_syncPrefix;
105 m_syncReset.append("reset");
106
107 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
108 m_syncRegisteredPrefixId =
109 m_face.setInterestFilter(m_syncPrefix,
110 bind(&Logic::onSyncInterest, this, _1, _2),
111 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
112
Qiuhan Dinge246b622014-12-03 21:57:48 -0800113 sendSyncInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700114 _LOG_DEBUG_ID("<< Logic::Logic");
115}
116
117Logic::~Logic()
118{
Yingdi Yuf7ede412014-08-30 20:37:52 -0700119 m_scheduler.cancelAllEvents();
Yingdi Yu9d5679a2015-02-01 00:17:58 -0800120 m_interestTable.clear();
121 m_face.shutdown();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700122}
123
124void
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800125Logic::reset(bool isOnInterest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700126{
127 m_isInReset = true;
128
129 m_state.reset();
130 m_log.clear();
131
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800132 if (!isOnInterest)
133 sendResetInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700134
135 // reset outstanding interest name, so that data for previous interest will be dropped.
136 if (m_outstandingInterestId != 0) {
137 m_face.removePendingInterest(m_outstandingInterestId);
138 m_outstandingInterestId = 0;
139 }
140
141 sendSyncInterest();
142
143 if (static_cast<bool>(m_delayedInterestProcessingId))
144 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
145
146 m_delayedInterestProcessingId =
147 m_scheduler.scheduleEvent(m_cancelResetTimer,
148 bind(&Logic::cancelReset, this));
149}
150
151void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800152Logic::setDefaultUserPrefix(const Name& defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700153{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800154 if (defaultUserPrefix != EMPTY_NAME) {
155 if (m_nodeList.find(defaultUserPrefix) != m_nodeList.end()) {
156 m_defaultUserPrefix = defaultUserPrefix;
157 m_defaultSigningId = m_nodeList[defaultUserPrefix].signingId;
158 }
159 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700160}
161
162void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800163Logic::addUserNode(const Name& userPrefix, const Name& signingId)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700164{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800165 if (userPrefix == EMPTY_NAME)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700166 return;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800167 if (m_defaultUserPrefix == EMPTY_NAME) {
168 m_defaultUserPrefix = userPrefix;
169 m_defaultSigningId = signingId;
170 }
171 if (m_nodeList.find(userPrefix) == m_nodeList.end()) {
172 m_nodeList[userPrefix].userPrefix = userPrefix;
173 m_nodeList[userPrefix].signingId = signingId;
174 Name sessionName = userPrefix;
175 sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
176 m_nodeList[userPrefix].sessionName = sessionName;
177 m_nodeList[userPrefix].seqNo = 0;
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800178 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800179 }
180}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700181
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800182void
183Logic::removeUserNode(const Name& userPrefix)
184{
185 auto userNode = m_nodeList.find(userPrefix);
186 if (userNode != m_nodeList.end()) {
187 m_nodeList.erase(userNode);
188 if (m_defaultUserPrefix == userPrefix) {
189 if (!m_nodeList.empty()) {
190 m_defaultUserPrefix = m_nodeList.begin()->second.userPrefix;
191 m_defaultSigningId = m_nodeList.begin()->second.signingId;
192 }
193 else {
194 m_defaultUserPrefix = EMPTY_NAME;
195 m_defaultSigningId = DEFAULT_NAME;
196 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700197 }
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800198 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800199 }
200}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700201
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800202const Name&
203Logic::getSessionName(Name prefix)
204{
205 if (prefix == EMPTY_NAME)
206 prefix = m_defaultUserPrefix;
207 auto node = m_nodeList.find(prefix);
208 if (node != m_nodeList.end())
209 return node->second.sessionName;
210 else
211 throw Error("Refer to non-existent node:" + prefix.toUri());
212}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700213
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800214const SeqNo&
215Logic::getSeqNo(Name prefix)
216{
217 if (prefix == EMPTY_NAME)
218 prefix = m_defaultUserPrefix;
219 auto node = m_nodeList.find(prefix);
220 if (node != m_nodeList.end())
221 return node->second.seqNo;
222 else
223 throw Logic::Error("Refer to non-existent node:" + prefix.toUri());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700224
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800225}
226
227void
228Logic::updateSeqNo(const SeqNo& seqNo, const Name &updatePrefix)
229{
230 Name prefix;
231 if (updatePrefix == EMPTY_NAME) {
232 if (m_defaultUserPrefix == EMPTY_NAME)
233 return;
234 prefix = m_defaultUserPrefix;
235 }
236 else
237 prefix = updatePrefix;
238
239 auto it = m_nodeList.find(prefix);
240 if (it != m_nodeList.end()) {
241 NodeInfo& node = it->second;
242 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
243 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << node.seqNo);
244 if (seqNo < node.seqNo || seqNo == 0)
245 return;
246
247 node.seqNo = seqNo;
248 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << node.seqNo);
249
250 if (!m_isInReset) {
251 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
252 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
253 {
254 using namespace CryptoPP;
255
256 std::string hash;
257 StringSource(previousRoot->buf(), previousRoot->size(), true,
258 new HexEncoder(new StringSink(hash), false));
259 _LOG_DEBUG_ID("Hash: " << hash);
260 }
261
262 bool isInserted = false;
263 bool isUpdated = false;
264 SeqNo oldSeq;
265 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(node.sessionName,
266 node.seqNo);
267
268 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
269 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
270 if (isInserted || isUpdated) {
271 DiffStatePtr commit = make_shared<DiffState>();
272 commit->update(node.sessionName, node.seqNo);
273 commit->setRootDigest(m_state.getRootDigest());
274 insertToDiffLog(commit, previousRoot);
275
276 satisfyPendingSyncInterests(prefix, commit);
277 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700278 }
279 }
280}
281
282ConstBufferPtr
283Logic::getRootDigest() const
284{
285 return m_state.getRootDigest();
286}
287
288void
289Logic::printState(std::ostream& os) const
290{
291 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
292 {
293 os << *leaf << "\n";
294 }
295}
296
297std::set<Name>
298Logic::getSessionNames() const
299{
300 std::set<Name> sessionNames;
301
302 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
303 {
304 sessionNames.insert(leaf->getSessionName());
305 }
306
307 return sessionNames;
308}
309
310void
311Logic::onSyncInterest(const Name& prefix, const Interest& interest)
312{
313 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
314 Name name = interest.getName();
315
316 _LOG_DEBUG_ID("InterestName: " << name);
317
318 if (RESET_COMPONENT != name.get(-1)) {
319 // normal sync interest
320 processSyncInterest(interest.shared_from_this());
321 }
322 else
323 // reset interest
324 processResetInterest(interest);
325
326 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
327}
328
329void
330Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
331{
332 //Sync prefix registration failed
333 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
334}
335
336void
337Logic::onSyncData(const Interest& interest, Data& data)
338{
339 _LOG_DEBUG_ID(">> Logic::onSyncData");
Yingdi Yucd339022014-11-05 17:51:19 -0800340 if (static_cast<bool>(m_validator))
341 m_validator->validate(data,
342 bind(&Logic::onSyncDataValidated, this, _1),
343 bind(&Logic::onSyncDataValidationFailed, this, _1));
344 else
345 onSyncDataValidated(data.shared_from_this());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700346 _LOG_DEBUG_ID("<< Logic::onSyncData");
347}
348
349void
350Logic::onResetData(const Interest& interest, Data& data)
351{
352 // This should not happened, drop the received data.
353}
354
355void
356Logic::onSyncTimeout(const Interest& interest)
357{
358 // It is OK. Others will handle the time out situation.
359 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
360 _LOG_DEBUG_ID("Interest: " << interest.getName());
361 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
362}
363
364void
365Logic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
366{
367 // SyncReply cannot be validated.
368}
369
370void
371Logic::onSyncDataValidated(const shared_ptr<const Data>& data)
372{
373 Name name = data->getName();
374 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
375
376 processSyncData(name, digest, data->getContent().blockFromValue());
377}
378
379void
380Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
381 bool isTimedProcessing/*=false*/)
382{
383 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
384
385 const Name& name = interest->getName();
386 ConstBufferPtr digest =
387 make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
388
389 ConstBufferPtr rootDigest = m_state.getRootDigest();
390
391 // If the digest of the incoming interest is the same as root digest
392 // Put the interest into InterestTable
393 if (*rootDigest == *digest) {
394 _LOG_DEBUG_ID("Oh, we are in the same state");
395 m_interestTable.insert(interest, digest, false);
396
397 if (!m_isInReset)
398 return;
399
400 if (!isTimedProcessing) {
401 _LOG_DEBUG_ID("Non timed processing in reset");
402 // Still in reset, our own seq has not been put into state yet
403 // Do not hurry, some others may be also resetting and may send their reply
404 if (static_cast<bool>(m_delayedInterestProcessingId))
405 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
406
407 time::milliseconds after(m_rangeUniformRandom());
408 _LOG_DEBUG_ID("After: " << after);
409 m_delayedInterestProcessingId =
410 m_scheduler.scheduleEvent(after,
411 bind(&Logic::processSyncInterest, this, interest, true));
412 }
413 else {
414 _LOG_DEBUG_ID("Timed processing in reset");
415 // Now we can get out of reset state by putting our own stuff into m_state.
416 cancelReset();
417 }
418
419 return;
420 }
421
422 // If the digest of incoming interest is an "empty" digest
423 if (digest == EMPTY_DIGEST) {
424 _LOG_DEBUG_ID("Poor guy, he knows nothing");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800425 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700426 return;
427 }
428
429 DiffStateContainer::iterator stateIter = m_log.find(digest);
430 // If the digest of incoming interest can be found from the log
431 if (stateIter != m_log.end()) {
432 _LOG_DEBUG_ID("It is ok, you are so close");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800433 sendSyncData(m_defaultUserPrefix, name, *(*stateIter)->diff());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700434 return;
435 }
436
437 if (!isTimedProcessing) {
438 _LOG_DEBUG_ID("Let's wait, just wait for a while");
439 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
Yingdi Yu53f5f042015-01-31 16:33:25 -0800440 bool doesExist = m_interestTable.has(digest);
441 m_interestTable.insert(interest, digest, true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700442 if (doesExist)
443 // Original comment (not sure): somebody else replied, so restart random-game timer
444 // YY: Get the same SyncInterest again, refresh the timer
445 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
446
447 m_delayedInterestProcessingId =
448 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
449 bind(&Logic::processSyncInterest, this, interest, true));
450 }
451 else {
452 // OK, nobody is helping us, just tell the truth.
453 _LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth");
454 m_interestTable.erase(digest);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800455 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700456 }
457
458 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
459}
460
461void
462Logic::processResetInterest(const Interest& interest)
463{
464 _LOG_DEBUG_ID(">> Logic::processResetInterest");
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800465 reset(true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700466}
467
468void
469Logic::processSyncData(const Name& name,
470 ndn::ConstBufferPtr digest,
471 const Block& syncReplyBlock)
472{
473 _LOG_DEBUG_ID(">> Logic::processSyncData");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700474 DiffStatePtr commit = make_shared<DiffState>();
475 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
476
477 try {
478 m_interestTable.erase(digest); // Remove satisfied interest from PIT
479
480 State reply;
481 reply.wireDecode(syncReplyBlock);
482
483 std::vector<MissingDataInfo> v;
484 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
485 {
486 BOOST_ASSERT(leaf != 0);
487
488 const Name& info = leaf->getSessionName();
489 SeqNo seq = leaf->getSeq();
490
491 bool isInserted = false;
492 bool isUpdated = false;
493 SeqNo oldSeq;
494 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700495 if (isInserted || isUpdated) {
496 commit->update(info, seq);
497
498 oldSeq++;
499 MissingDataInfo mdi = {info, oldSeq, seq};
500 v.push_back(mdi);
501 }
502 }
503
504 if (!v.empty()) {
505 m_onUpdate(v);
506
507 commit->setRootDigest(m_state.getRootDigest());
508 insertToDiffLog(commit, previousRoot);
509 }
510 else {
511 _LOG_DEBUG_ID("What? nothing new");
512 }
513 }
514 catch (State::Error&) {
515 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
516 // Something really fishy happened during state decoding;
517 commit.reset();
518 return;
519 }
520
521 if (static_cast<bool>(commit) && !commit->getLeaves().empty()) {
522 // state changed and it is safe to express a new interest
523 time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
524 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
525 EventId eventId = m_scheduler.scheduleEvent(after,
526 bind(&Logic::sendSyncInterest, this));
527
528 m_scheduler.cancelEvent(m_reexpressingInterestId);
529 m_reexpressingInterestId = eventId;
530 }
531}
532
533void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800534Logic::satisfyPendingSyncInterests(const Name& updatedPrefix, ConstDiffStatePtr commit)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700535{
536 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
537 try {
538 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
539 for (InterestTable::const_iterator it = m_interestTable.begin();
540 it != m_interestTable.end(); it++) {
541 ConstUnsatisfiedInterestPtr request = *it;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700542 if (request->isUnknown)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800543 sendSyncData(updatedPrefix, request->interest->getName(), m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700544 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800545 sendSyncData(updatedPrefix, request->interest->getName(), *commit);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700546 }
547 m_interestTable.clear();
548 }
549 catch (InterestTable::Error&) {
550 // ok. not really an error
551 }
552 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
553}
554
555void
556Logic::insertToDiffLog(DiffStatePtr commit, ndn::ConstBufferPtr previousRoot)
557{
558 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
559 // Connect to the history
560 if (!m_log.empty())
561 (*m_log.find(previousRoot))->setNext(commit);
562
563 // Insert the commit
564 m_log.erase(commit->getRootDigest());
565 m_log.insert(commit);
566 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
567}
568
569void
570Logic::sendResetInterest()
571{
572 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
573
574 if (m_needPeriodReset) {
575 _LOG_DEBUG_ID("Need Period Reset");
576 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
577
578 EventId eventId =
579 m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
580 bind(&Logic::sendResetInterest, this));
581 m_scheduler.cancelEvent(m_resetInterestId);
582 m_resetInterestId = eventId;
583 }
584
585 Interest interest(m_syncReset);
586 interest.setMustBeFresh(true);
587 interest.setInterestLifetime(m_resetInterestLifetime);
588 m_face.expressInterest(interest,
589 bind(&Logic::onResetData, this, _1, _2),
590 bind(&Logic::onSyncTimeout, this, _1));
591
592 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
593}
594
595void
596Logic::sendSyncInterest()
597{
598 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
599
600 Name interestName;
601 interestName.append(m_syncPrefix)
602 .append(ndn::name::Component(*m_state.getRootDigest()));
603
604 m_outstandingInterestName = interestName;
605
606#ifdef _DEBUG
607 printDigest(m_state.getRootDigest());
608#endif
609
610 EventId eventId =
611 m_scheduler.scheduleEvent(m_syncInterestLifetime +
612 ndn::time::milliseconds(m_reexpressionJitter()),
613 bind(&Logic::sendSyncInterest, this));
614 m_scheduler.cancelEvent(m_reexpressingInterestId);
615 m_reexpressingInterestId = eventId;
616
617 Interest interest(interestName);
618 interest.setMustBeFresh(true);
619 interest.setInterestLifetime(m_syncInterestLifetime);
620
621 m_outstandingInterestId = m_face.expressInterest(interest,
622 bind(&Logic::onSyncData, this, _1, _2),
623 bind(&Logic::onSyncTimeout, this, _1));
624
625 _LOG_DEBUG_ID("Send interest: " << interest.getName());
626 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
627}
628
629void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800630Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700631{
632 _LOG_DEBUG_ID(">> Logic::sendSyncData");
633 shared_ptr<Data> syncReply = make_shared<Data>(name);
634 syncReply->setContent(state.wireEncode());
635 syncReply->setFreshnessPeriod(m_syncReplyFreshness);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800636 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
637 return;
638 if (m_nodeList[nodePrefix].signingId.empty())
Yingdi Yucd339022014-11-05 17:51:19 -0800639 m_keyChain.sign(*syncReply);
640 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800641 m_keyChain.signByIdentity(*syncReply, m_nodeList[nodePrefix].signingId);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700642
643 m_face.put(*syncReply);
644
645 // checking if our own interest got satisfied
646 if (m_outstandingInterestName == name) {
647 // remove outstanding interest
648 if (m_outstandingInterestId != 0) {
649 m_face.removePendingInterest(m_outstandingInterestId);
650 m_outstandingInterestId = 0;
651 }
652
653 // re-schedule sending Sync interest
654 time::milliseconds after(m_reexpressionJitter());
655 _LOG_DEBUG_ID("Satisfy our own interest");
656 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
657 EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
658 m_scheduler.cancelEvent(m_reexpressingInterestId);
659 m_reexpressingInterestId = eventId;
660 }
661 _LOG_DEBUG_ID("<< Logic::sendSyncData");
662}
663
664void
665Logic::cancelReset()
666{
667 _LOG_DEBUG_ID(">> Logic::cancelReset");
668 if (!m_isInReset)
669 return;
670
671 m_isInReset = false;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800672 for (const auto& node : m_nodeList) {
673 updateSeqNo(node.second.seqNo, node.first);
674 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700675 _LOG_DEBUG_ID("<< Logic::cancelReset");
676}
677
678void
679Logic::printDigest(ndn::ConstBufferPtr digest)
680{
681 using namespace CryptoPP;
682
683 std::string hash;
684 StringSource(digest->buf(), digest->size(), true,
685 new HexEncoder(new StringSink(hash), false));
686 _LOG_DEBUG_ID("Hash: " << hash);
687}
688
689} // namespace chronosync