/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2012-2017 University of California, Los Angeles
 *
 * This file is part of ChronoSync, synchronization library for distributed realtime
 * applications for NDN.
 *
 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation, either
 * version 3 of the License, or (at your option) any later version.
 *
 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 *
 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
 * @author Chaoyi Bian <bcy@pku.edu.cn>
 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
 * @author Yingdi Yu <yingdi@cs.ucla.edu>
 * @author Sonu Mishra <https://www.linkedin.com/in/mishrasonu>
 */
25
26#include "logic.hpp"
27#include "logger.hpp"
28
Alexander Afanasyev36eb3ed2017-01-11 12:35:58 -080029INIT_LOGGER(Logic);
Yingdi Yuf7ede412014-08-30 20:37:52 -070030
31#ifdef _DEBUG
32#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
33#else
34#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
35#endif
36
37namespace chronosync {
38
39using ndn::ConstBufferPtr;
40using ndn::EventId;
41
// Raw bytes of the digest that represents an empty sync state (32 bytes; the
// value matches the SHA-256 digest of empty input). Wrapped into
// Logic::EMPTY_DIGEST further down in this file.
const uint8_t EMPTY_DIGEST_VALUE[] = {
  0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
  0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
  0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
  0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
};
48
#if defined(_DEBUG)
// Debug-only counter; the constructor takes the next value as m_instanceId so
// that log lines from different Logic instances can be told apart.
int Logic::m_instanceCounter = 0;
#endif
52
Yingdi Yucd339022014-11-05 17:51:19 -080053const ndn::Name Logic::DEFAULT_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080054const ndn::Name Logic::EMPTY_NAME;
Yingdi Yucd339022014-11-05 17:51:19 -080055const ndn::shared_ptr<ndn::Validator> Logic::DEFAULT_VALIDATOR;
Yingdi Yuf7ede412014-08-30 20:37:52 -070056const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
57const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
58const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
59const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
60const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
61
62const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
63const ndn::name::Component Logic::RESET_COMPONENT("reset");
64
65Logic::Logic(ndn::Face& face,
66 const Name& syncPrefix,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080067 const Name& defaultUserPrefix,
Yingdi Yuf7ede412014-08-30 20:37:52 -070068 const UpdateCallback& onUpdate,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080069 const Name& defaultSigningId,
Yingdi Yucd339022014-11-05 17:51:19 -080070 ndn::shared_ptr<ndn::Validator> validator,
Yingdi Yuf7ede412014-08-30 20:37:52 -070071 const time::steady_clock::Duration& resetTimer,
72 const time::steady_clock::Duration& cancelResetTimer,
73 const time::milliseconds& resetInterestLifetime,
74 const time::milliseconds& syncInterestLifetime,
75 const time::milliseconds& syncReplyFreshness)
76 : m_face(face)
77 , m_syncPrefix(syncPrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080078 , m_defaultUserPrefix(defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -070079 , m_interestTable(m_face.getIoService())
80 , m_outstandingInterestId(0)
81 , m_isInReset(false)
82 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
83 , m_onUpdate(onUpdate)
84 , m_scheduler(m_face.getIoService())
85 , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
86 , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
87 , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
88 , m_resetTimer(resetTimer)
89 , m_cancelResetTimer(cancelResetTimer)
90 , m_resetInterestLifetime(resetInterestLifetime)
91 , m_syncInterestLifetime(syncInterestLifetime)
92 , m_syncReplyFreshness(syncReplyFreshness)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080093 , m_defaultSigningId(defaultSigningId)
Yingdi Yucd339022014-11-05 17:51:19 -080094 , m_validator(validator)
Yingdi Yuf7ede412014-08-30 20:37:52 -070095{
96#ifdef _DEBUG
97 m_instanceId = m_instanceCounter++;
98#endif
99
100 _LOG_DEBUG_ID(">> Logic::Logic");
101
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800102 addUserNode(m_defaultUserPrefix, m_defaultSigningId);
103
104
Yingdi Yuf7ede412014-08-30 20:37:52 -0700105 m_syncReset = m_syncPrefix;
106 m_syncReset.append("reset");
107
108 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
109 m_syncRegisteredPrefixId =
110 m_face.setInterestFilter(m_syncPrefix,
111 bind(&Logic::onSyncInterest, this, _1, _2),
112 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
113
Qiuhan Dinge246b622014-12-03 21:57:48 -0800114 sendSyncInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700115 _LOG_DEBUG_ID("<< Logic::Logic");
116}
117
118Logic::~Logic()
119{
Yingdi Yuf7ede412014-08-30 20:37:52 -0700120 m_scheduler.cancelAllEvents();
Yingdi Yu9d5679a2015-02-01 00:17:58 -0800121 m_interestTable.clear();
122 m_face.shutdown();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700123}
124
125void
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800126Logic::reset(bool isOnInterest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700127{
128 m_isInReset = true;
129
130 m_state.reset();
131 m_log.clear();
132
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800133 if (!isOnInterest)
134 sendResetInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700135
136 // reset outstanding interest name, so that data for previous interest will be dropped.
137 if (m_outstandingInterestId != 0) {
138 m_face.removePendingInterest(m_outstandingInterestId);
139 m_outstandingInterestId = 0;
140 }
141
142 sendSyncInterest();
143
144 if (static_cast<bool>(m_delayedInterestProcessingId))
145 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
146
147 m_delayedInterestProcessingId =
148 m_scheduler.scheduleEvent(m_cancelResetTimer,
149 bind(&Logic::cancelReset, this));
150}
151
152void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800153Logic::setDefaultUserPrefix(const Name& defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700154{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800155 if (defaultUserPrefix != EMPTY_NAME) {
156 if (m_nodeList.find(defaultUserPrefix) != m_nodeList.end()) {
157 m_defaultUserPrefix = defaultUserPrefix;
158 m_defaultSigningId = m_nodeList[defaultUserPrefix].signingId;
159 }
160 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700161}
162
163void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800164Logic::addUserNode(const Name& userPrefix, const Name& signingId)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700165{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800166 if (userPrefix == EMPTY_NAME)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700167 return;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800168 if (m_defaultUserPrefix == EMPTY_NAME) {
169 m_defaultUserPrefix = userPrefix;
170 m_defaultSigningId = signingId;
171 }
172 if (m_nodeList.find(userPrefix) == m_nodeList.end()) {
173 m_nodeList[userPrefix].userPrefix = userPrefix;
174 m_nodeList[userPrefix].signingId = signingId;
175 Name sessionName = userPrefix;
176 sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
177 m_nodeList[userPrefix].sessionName = sessionName;
178 m_nodeList[userPrefix].seqNo = 0;
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800179 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800180 }
181}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700182
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800183void
184Logic::removeUserNode(const Name& userPrefix)
185{
186 auto userNode = m_nodeList.find(userPrefix);
187 if (userNode != m_nodeList.end()) {
188 m_nodeList.erase(userNode);
189 if (m_defaultUserPrefix == userPrefix) {
190 if (!m_nodeList.empty()) {
191 m_defaultUserPrefix = m_nodeList.begin()->second.userPrefix;
192 m_defaultSigningId = m_nodeList.begin()->second.signingId;
193 }
194 else {
195 m_defaultUserPrefix = EMPTY_NAME;
196 m_defaultSigningId = DEFAULT_NAME;
197 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700198 }
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800199 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800200 }
201}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700202
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800203const Name&
204Logic::getSessionName(Name prefix)
205{
206 if (prefix == EMPTY_NAME)
207 prefix = m_defaultUserPrefix;
208 auto node = m_nodeList.find(prefix);
209 if (node != m_nodeList.end())
210 return node->second.sessionName;
211 else
212 throw Error("Refer to non-existent node:" + prefix.toUri());
213}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700214
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800215const SeqNo&
216Logic::getSeqNo(Name prefix)
217{
218 if (prefix == EMPTY_NAME)
219 prefix = m_defaultUserPrefix;
220 auto node = m_nodeList.find(prefix);
221 if (node != m_nodeList.end())
222 return node->second.seqNo;
223 else
224 throw Logic::Error("Refer to non-existent node:" + prefix.toUri());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700225
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800226}
227
228void
229Logic::updateSeqNo(const SeqNo& seqNo, const Name &updatePrefix)
230{
231 Name prefix;
232 if (updatePrefix == EMPTY_NAME) {
233 if (m_defaultUserPrefix == EMPTY_NAME)
234 return;
235 prefix = m_defaultUserPrefix;
236 }
237 else
238 prefix = updatePrefix;
239
240 auto it = m_nodeList.find(prefix);
241 if (it != m_nodeList.end()) {
242 NodeInfo& node = it->second;
243 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
244 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << node.seqNo);
245 if (seqNo < node.seqNo || seqNo == 0)
246 return;
247
248 node.seqNo = seqNo;
249 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << node.seqNo);
250
251 if (!m_isInReset) {
252 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
253 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
254 {
255 using namespace CryptoPP;
256
257 std::string hash;
258 StringSource(previousRoot->buf(), previousRoot->size(), true,
259 new HexEncoder(new StringSink(hash), false));
260 _LOG_DEBUG_ID("Hash: " << hash);
261 }
262
263 bool isInserted = false;
264 bool isUpdated = false;
265 SeqNo oldSeq;
266 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(node.sessionName,
267 node.seqNo);
268
269 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
270 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
271 if (isInserted || isUpdated) {
272 DiffStatePtr commit = make_shared<DiffState>();
273 commit->update(node.sessionName, node.seqNo);
274 commit->setRootDigest(m_state.getRootDigest());
275 insertToDiffLog(commit, previousRoot);
276
277 satisfyPendingSyncInterests(prefix, commit);
278 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700279 }
280 }
281}
282
283ConstBufferPtr
284Logic::getRootDigest() const
285{
286 return m_state.getRootDigest();
287}
288
289void
290Logic::printState(std::ostream& os) const
291{
292 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
293 {
294 os << *leaf << "\n";
295 }
296}
297
298std::set<Name>
299Logic::getSessionNames() const
300{
301 std::set<Name> sessionNames;
302
303 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
304 {
305 sessionNames.insert(leaf->getSessionName());
306 }
307
308 return sessionNames;
309}
310
311void
312Logic::onSyncInterest(const Name& prefix, const Interest& interest)
313{
314 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
315 Name name = interest.getName();
316
317 _LOG_DEBUG_ID("InterestName: " << name);
318
319 if (RESET_COMPONENT != name.get(-1)) {
320 // normal sync interest
321 processSyncInterest(interest.shared_from_this());
322 }
323 else
324 // reset interest
325 processResetInterest(interest);
326
327 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
328}
329
330void
331Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
332{
333 //Sync prefix registration failed
334 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
335}
336
337void
338Logic::onSyncData(const Interest& interest, Data& data)
339{
340 _LOG_DEBUG_ID(">> Logic::onSyncData");
Yingdi Yucd339022014-11-05 17:51:19 -0800341 if (static_cast<bool>(m_validator))
342 m_validator->validate(data,
343 bind(&Logic::onSyncDataValidated, this, _1),
344 bind(&Logic::onSyncDataValidationFailed, this, _1));
345 else
346 onSyncDataValidated(data.shared_from_this());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700347 _LOG_DEBUG_ID("<< Logic::onSyncData");
348}
349
350void
351Logic::onResetData(const Interest& interest, Data& data)
352{
353 // This should not happened, drop the received data.
354}
355
356void
357Logic::onSyncTimeout(const Interest& interest)
358{
359 // It is OK. Others will handle the time out situation.
360 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
361 _LOG_DEBUG_ID("Interest: " << interest.getName());
362 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
363}
364
365void
366Logic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
367{
368 // SyncReply cannot be validated.
369}
370
371void
372Logic::onSyncDataValidated(const shared_ptr<const Data>& data)
373{
374 Name name = data->getName();
375 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
376
377 processSyncData(name, digest, data->getContent().blockFromValue());
378}
379
380void
381Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
382 bool isTimedProcessing/*=false*/)
383{
384 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
385
386 const Name& name = interest->getName();
387 ConstBufferPtr digest =
388 make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
389
390 ConstBufferPtr rootDigest = m_state.getRootDigest();
391
392 // If the digest of the incoming interest is the same as root digest
393 // Put the interest into InterestTable
394 if (*rootDigest == *digest) {
395 _LOG_DEBUG_ID("Oh, we are in the same state");
396 m_interestTable.insert(interest, digest, false);
397
398 if (!m_isInReset)
399 return;
400
401 if (!isTimedProcessing) {
402 _LOG_DEBUG_ID("Non timed processing in reset");
403 // Still in reset, our own seq has not been put into state yet
404 // Do not hurry, some others may be also resetting and may send their reply
405 if (static_cast<bool>(m_delayedInterestProcessingId))
406 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
407
408 time::milliseconds after(m_rangeUniformRandom());
409 _LOG_DEBUG_ID("After: " << after);
410 m_delayedInterestProcessingId =
411 m_scheduler.scheduleEvent(after,
412 bind(&Logic::processSyncInterest, this, interest, true));
413 }
414 else {
415 _LOG_DEBUG_ID("Timed processing in reset");
416 // Now we can get out of reset state by putting our own stuff into m_state.
417 cancelReset();
418 }
419
420 return;
421 }
422
423 // If the digest of incoming interest is an "empty" digest
424 if (digest == EMPTY_DIGEST) {
425 _LOG_DEBUG_ID("Poor guy, he knows nothing");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800426 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700427 return;
428 }
429
430 DiffStateContainer::iterator stateIter = m_log.find(digest);
431 // If the digest of incoming interest can be found from the log
432 if (stateIter != m_log.end()) {
433 _LOG_DEBUG_ID("It is ok, you are so close");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800434 sendSyncData(m_defaultUserPrefix, name, *(*stateIter)->diff());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700435 return;
436 }
437
438 if (!isTimedProcessing) {
439 _LOG_DEBUG_ID("Let's wait, just wait for a while");
440 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
Yingdi Yu53f5f042015-01-31 16:33:25 -0800441 bool doesExist = m_interestTable.has(digest);
442 m_interestTable.insert(interest, digest, true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700443 if (doesExist)
444 // Original comment (not sure): somebody else replied, so restart random-game timer
445 // YY: Get the same SyncInterest again, refresh the timer
446 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
447
448 m_delayedInterestProcessingId =
449 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
450 bind(&Logic::processSyncInterest, this, interest, true));
451 }
452 else {
453 // OK, nobody is helping us, just tell the truth.
454 _LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth");
455 m_interestTable.erase(digest);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800456 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700457 }
458
459 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
460}
461
462void
463Logic::processResetInterest(const Interest& interest)
464{
465 _LOG_DEBUG_ID(">> Logic::processResetInterest");
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800466 reset(true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700467}
468
469void
470Logic::processSyncData(const Name& name,
471 ndn::ConstBufferPtr digest,
472 const Block& syncReplyBlock)
473{
474 _LOG_DEBUG_ID(">> Logic::processSyncData");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700475 DiffStatePtr commit = make_shared<DiffState>();
476 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
477
478 try {
479 m_interestTable.erase(digest); // Remove satisfied interest from PIT
480
481 State reply;
482 reply.wireDecode(syncReplyBlock);
483
484 std::vector<MissingDataInfo> v;
485 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
486 {
487 BOOST_ASSERT(leaf != 0);
488
489 const Name& info = leaf->getSessionName();
490 SeqNo seq = leaf->getSeq();
491
492 bool isInserted = false;
493 bool isUpdated = false;
494 SeqNo oldSeq;
495 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700496 if (isInserted || isUpdated) {
497 commit->update(info, seq);
498
499 oldSeq++;
500 MissingDataInfo mdi = {info, oldSeq, seq};
501 v.push_back(mdi);
502 }
503 }
504
505 if (!v.empty()) {
506 m_onUpdate(v);
507
508 commit->setRootDigest(m_state.getRootDigest());
509 insertToDiffLog(commit, previousRoot);
510 }
511 else {
512 _LOG_DEBUG_ID("What? nothing new");
513 }
514 }
515 catch (State::Error&) {
516 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
517 // Something really fishy happened during state decoding;
518 commit.reset();
519 return;
520 }
521
522 if (static_cast<bool>(commit) && !commit->getLeaves().empty()) {
523 // state changed and it is safe to express a new interest
524 time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
525 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
526 EventId eventId = m_scheduler.scheduleEvent(after,
527 bind(&Logic::sendSyncInterest, this));
528
529 m_scheduler.cancelEvent(m_reexpressingInterestId);
530 m_reexpressingInterestId = eventId;
531 }
532}
533
534void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800535Logic::satisfyPendingSyncInterests(const Name& updatedPrefix, ConstDiffStatePtr commit)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700536{
537 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
538 try {
539 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
540 for (InterestTable::const_iterator it = m_interestTable.begin();
541 it != m_interestTable.end(); it++) {
542 ConstUnsatisfiedInterestPtr request = *it;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700543 if (request->isUnknown)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800544 sendSyncData(updatedPrefix, request->interest->getName(), m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700545 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800546 sendSyncData(updatedPrefix, request->interest->getName(), *commit);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700547 }
548 m_interestTable.clear();
549 }
550 catch (InterestTable::Error&) {
551 // ok. not really an error
552 }
553 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
554}
555
556void
557Logic::insertToDiffLog(DiffStatePtr commit, ndn::ConstBufferPtr previousRoot)
558{
559 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
560 // Connect to the history
561 if (!m_log.empty())
562 (*m_log.find(previousRoot))->setNext(commit);
563
564 // Insert the commit
565 m_log.erase(commit->getRootDigest());
566 m_log.insert(commit);
567 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
568}
569
570void
571Logic::sendResetInterest()
572{
573 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
574
575 if (m_needPeriodReset) {
576 _LOG_DEBUG_ID("Need Period Reset");
577 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
578
579 EventId eventId =
580 m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
581 bind(&Logic::sendResetInterest, this));
582 m_scheduler.cancelEvent(m_resetInterestId);
583 m_resetInterestId = eventId;
584 }
585
586 Interest interest(m_syncReset);
587 interest.setMustBeFresh(true);
588 interest.setInterestLifetime(m_resetInterestLifetime);
589 m_face.expressInterest(interest,
590 bind(&Logic::onResetData, this, _1, _2),
591 bind(&Logic::onSyncTimeout, this, _1));
592
593 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
594}
595
596void
597Logic::sendSyncInterest()
598{
599 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
600
601 Name interestName;
602 interestName.append(m_syncPrefix)
603 .append(ndn::name::Component(*m_state.getRootDigest()));
604
605 m_outstandingInterestName = interestName;
606
607#ifdef _DEBUG
608 printDigest(m_state.getRootDigest());
609#endif
610
611 EventId eventId =
Sonu Mishra0dadc572016-12-12 23:59:41 -0800612 m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 +
Yingdi Yuf7ede412014-08-30 20:37:52 -0700613 ndn::time::milliseconds(m_reexpressionJitter()),
614 bind(&Logic::sendSyncInterest, this));
615 m_scheduler.cancelEvent(m_reexpressingInterestId);
616 m_reexpressingInterestId = eventId;
617
618 Interest interest(interestName);
619 interest.setMustBeFresh(true);
620 interest.setInterestLifetime(m_syncInterestLifetime);
621
622 m_outstandingInterestId = m_face.expressInterest(interest,
623 bind(&Logic::onSyncData, this, _1, _2),
624 bind(&Logic::onSyncTimeout, this, _1));
625
626 _LOG_DEBUG_ID("Send interest: " << interest.getName());
627 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
628}
629
630void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800631Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700632{
633 _LOG_DEBUG_ID(">> Logic::sendSyncData");
634 shared_ptr<Data> syncReply = make_shared<Data>(name);
635 syncReply->setContent(state.wireEncode());
636 syncReply->setFreshnessPeriod(m_syncReplyFreshness);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800637 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
638 return;
639 if (m_nodeList[nodePrefix].signingId.empty())
Yingdi Yucd339022014-11-05 17:51:19 -0800640 m_keyChain.sign(*syncReply);
641 else
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800642 m_keyChain.signByIdentity(*syncReply, m_nodeList[nodePrefix].signingId);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700643
644 m_face.put(*syncReply);
645
646 // checking if our own interest got satisfied
647 if (m_outstandingInterestName == name) {
648 // remove outstanding interest
649 if (m_outstandingInterestId != 0) {
650 m_face.removePendingInterest(m_outstandingInterestId);
651 m_outstandingInterestId = 0;
652 }
653
654 // re-schedule sending Sync interest
655 time::milliseconds after(m_reexpressionJitter());
656 _LOG_DEBUG_ID("Satisfy our own interest");
657 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
658 EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
659 m_scheduler.cancelEvent(m_reexpressingInterestId);
660 m_reexpressingInterestId = eventId;
661 }
662 _LOG_DEBUG_ID("<< Logic::sendSyncData");
663}
664
665void
666Logic::cancelReset()
667{
668 _LOG_DEBUG_ID(">> Logic::cancelReset");
669 if (!m_isInReset)
670 return;
671
672 m_isInReset = false;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800673 for (const auto& node : m_nodeList) {
674 updateSeqNo(node.second.seqNo, node.first);
675 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700676 _LOG_DEBUG_ID("<< Logic::cancelReset");
677}
678
679void
680Logic::printDigest(ndn::ConstBufferPtr digest)
681{
682 using namespace CryptoPP;
683
684 std::string hash;
685 StringSource(digest->buf(), digest->size(), true,
686 new HexEncoder(new StringSink(hash), false));
687 _LOG_DEBUG_ID("Hash: " << hash);
688}
689
690} // namespace chronosync