blob: 4c3fc5579921ef2a840ac88c2ece3b1b8067a783 [file] [log] [blame]
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2012-2014 University of California, Los Angeles
 *
 * This file is part of ChronoSync, synchronization library for distributed realtime
 * applications for NDN.
 *
 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation, either
 * version 3 of the License, or (at your option) any later version.
 *
 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 *
 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
 * @author Chaoyi Bian <bcy@pku.edu.cn>
 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
 * @author Yingdi Yu <yingdi@cs.ucla.edu>
 */
24
25#include "logic.hpp"
26#include "logger.hpp"
27
28INIT_LOGGER("Logic");
29
30#ifdef _DEBUG
31#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
32#else
33#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
34#endif
35
36namespace chronosync {
37
38using ndn::ConstBufferPtr;
39using ndn::EventId;
40
41const uint8_t EMPTY_DIGEST_VALUE[] = {
42 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
43 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
44 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
45 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
46};
47
48#ifdef _DEBUG
49int Logic::m_instanceCounter = 0;
50#endif
51
Yingdi Yucd339022014-11-05 17:51:19 -080052const ndn::Name Logic::DEFAULT_NAME;
53const ndn::shared_ptr<ndn::Validator> Logic::DEFAULT_VALIDATOR;
Yingdi Yuf7ede412014-08-30 20:37:52 -070054const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
55const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
56const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
57const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
58const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
59
60const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
61const ndn::name::Component Logic::RESET_COMPONENT("reset");
62
63Logic::Logic(ndn::Face& face,
64 const Name& syncPrefix,
65 const Name& userPrefix,
66 const UpdateCallback& onUpdate,
Yingdi Yucd339022014-11-05 17:51:19 -080067 const Name& signingId,
68 ndn::shared_ptr<ndn::Validator> validator,
Yingdi Yuf7ede412014-08-30 20:37:52 -070069 const time::steady_clock::Duration& resetTimer,
70 const time::steady_clock::Duration& cancelResetTimer,
71 const time::milliseconds& resetInterestLifetime,
72 const time::milliseconds& syncInterestLifetime,
73 const time::milliseconds& syncReplyFreshness)
74 : m_face(face)
75 , m_syncPrefix(syncPrefix)
76 , m_userPrefix(userPrefix)
77 , m_interestTable(m_face.getIoService())
78 , m_outstandingInterestId(0)
79 , m_isInReset(false)
80 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
81 , m_onUpdate(onUpdate)
82 , m_scheduler(m_face.getIoService())
83 , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
84 , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
85 , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
86 , m_resetTimer(resetTimer)
87 , m_cancelResetTimer(cancelResetTimer)
88 , m_resetInterestLifetime(resetInterestLifetime)
89 , m_syncInterestLifetime(syncInterestLifetime)
90 , m_syncReplyFreshness(syncReplyFreshness)
Yingdi Yucd339022014-11-05 17:51:19 -080091 , m_signingId(signingId)
92 , m_validator(validator)
Yingdi Yuf7ede412014-08-30 20:37:52 -070093{
94#ifdef _DEBUG
95 m_instanceId = m_instanceCounter++;
96#endif
97
98 _LOG_DEBUG_ID(">> Logic::Logic");
99
100 m_syncReset = m_syncPrefix;
101 m_syncReset.append("reset");
102
103 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
104 m_syncRegisteredPrefixId =
105 m_face.setInterestFilter(m_syncPrefix,
106 bind(&Logic::onSyncInterest, this, _1, _2),
107 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
108
109 setUserPrefix(m_userPrefix);
110
111 _LOG_DEBUG_ID("<< Logic::Logic");
112}
113
114Logic::~Logic()
115{
116 m_face.unsetInterestFilter(m_syncRegisteredPrefixId);
117 m_scheduler.cancelAllEvents();
118}
119
120void
121Logic::reset()
122{
123 m_isInReset = true;
124
125 m_state.reset();
126 m_log.clear();
127
128 sendResetInterest();
129
130 // reset outstanding interest name, so that data for previous interest will be dropped.
131 if (m_outstandingInterestId != 0) {
132 m_face.removePendingInterest(m_outstandingInterestId);
133 m_outstandingInterestId = 0;
134 }
135
136 sendSyncInterest();
137
138 if (static_cast<bool>(m_delayedInterestProcessingId))
139 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
140
141 m_delayedInterestProcessingId =
142 m_scheduler.scheduleEvent(m_cancelResetTimer,
143 bind(&Logic::cancelReset, this));
144}
145
146void
147Logic::setUserPrefix(const Name& userPrefix)
148{
149 m_userPrefix = userPrefix;
150
151 m_sessionName = m_userPrefix;
152 m_sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
153
154 m_seqNo = 0;
155
156 reset();
157}
158
159void
160Logic::updateSeqNo(const SeqNo& seqNo)
161{
162 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
163 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << m_seqNo);
164 if (seqNo < m_seqNo || seqNo == 0)
165 return;
166
167 m_seqNo = seqNo;
168
169 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << m_seqNo);
170
171 if (!m_isInReset) {
172 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
173 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
174 {
175 using namespace CryptoPP;
176
177 std::string hash;
178 StringSource(previousRoot->buf(), previousRoot->size(), true,
179 new HexEncoder(new StringSink(hash), false));
180 _LOG_DEBUG_ID("Hash: " << hash);
181 }
182
183 bool isInserted = false;
184 bool isUpdated = false;
185 SeqNo oldSeq;
186 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(m_sessionName, seqNo);
187
188 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
189 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
190 if (isInserted || isUpdated) {
191 DiffStatePtr commit = make_shared<DiffState>();
192 commit->update(m_sessionName, seqNo);
193 commit->setRootDigest(m_state.getRootDigest());
194 insertToDiffLog(commit, previousRoot);
195
196 satisfyPendingSyncInterests(commit);
197 }
198 }
199}
200
201ConstBufferPtr
202Logic::getRootDigest() const
203{
204 return m_state.getRootDigest();
205}
206
207void
208Logic::printState(std::ostream& os) const
209{
210 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
211 {
212 os << *leaf << "\n";
213 }
214}
215
216std::set<Name>
217Logic::getSessionNames() const
218{
219 std::set<Name> sessionNames;
220
221 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
222 {
223 sessionNames.insert(leaf->getSessionName());
224 }
225
226 return sessionNames;
227}
228
229void
230Logic::onSyncInterest(const Name& prefix, const Interest& interest)
231{
232 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
233 Name name = interest.getName();
234
235 _LOG_DEBUG_ID("InterestName: " << name);
236
237 if (RESET_COMPONENT != name.get(-1)) {
238 // normal sync interest
239 processSyncInterest(interest.shared_from_this());
240 }
241 else
242 // reset interest
243 processResetInterest(interest);
244
245 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
246}
247
248void
249Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
250{
251 //Sync prefix registration failed
252 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
253}
254
255void
256Logic::onSyncData(const Interest& interest, Data& data)
257{
258 _LOG_DEBUG_ID(">> Logic::onSyncData");
Yingdi Yucd339022014-11-05 17:51:19 -0800259 if (static_cast<bool>(m_validator))
260 m_validator->validate(data,
261 bind(&Logic::onSyncDataValidated, this, _1),
262 bind(&Logic::onSyncDataValidationFailed, this, _1));
263 else
264 onSyncDataValidated(data.shared_from_this());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700265 _LOG_DEBUG_ID("<< Logic::onSyncData");
266}
267
268void
269Logic::onResetData(const Interest& interest, Data& data)
270{
271 // This should not happened, drop the received data.
272}
273
274void
275Logic::onSyncTimeout(const Interest& interest)
276{
277 // It is OK. Others will handle the time out situation.
278 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
279 _LOG_DEBUG_ID("Interest: " << interest.getName());
280 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
281}
282
283void
284Logic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
285{
286 // SyncReply cannot be validated.
287}
288
289void
290Logic::onSyncDataValidated(const shared_ptr<const Data>& data)
291{
292 Name name = data->getName();
293 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
294
295 processSyncData(name, digest, data->getContent().blockFromValue());
296}
297
298void
299Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
300 bool isTimedProcessing/*=false*/)
301{
302 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
303
304 const Name& name = interest->getName();
305 ConstBufferPtr digest =
306 make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
307
308 ConstBufferPtr rootDigest = m_state.getRootDigest();
309
310 // If the digest of the incoming interest is the same as root digest
311 // Put the interest into InterestTable
312 if (*rootDigest == *digest) {
313 _LOG_DEBUG_ID("Oh, we are in the same state");
314 m_interestTable.insert(interest, digest, false);
315
316 if (!m_isInReset)
317 return;
318
319 if (!isTimedProcessing) {
320 _LOG_DEBUG_ID("Non timed processing in reset");
321 // Still in reset, our own seq has not been put into state yet
322 // Do not hurry, some others may be also resetting and may send their reply
323 if (static_cast<bool>(m_delayedInterestProcessingId))
324 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
325
326 time::milliseconds after(m_rangeUniformRandom());
327 _LOG_DEBUG_ID("After: " << after);
328 m_delayedInterestProcessingId =
329 m_scheduler.scheduleEvent(after,
330 bind(&Logic::processSyncInterest, this, interest, true));
331 }
332 else {
333 _LOG_DEBUG_ID("Timed processing in reset");
334 // Now we can get out of reset state by putting our own stuff into m_state.
335 cancelReset();
336 }
337
338 return;
339 }
340
341 // If the digest of incoming interest is an "empty" digest
342 if (digest == EMPTY_DIGEST) {
343 _LOG_DEBUG_ID("Poor guy, he knows nothing");
344 sendSyncData(name, m_state);
345 return;
346 }
347
348 DiffStateContainer::iterator stateIter = m_log.find(digest);
349 // If the digest of incoming interest can be found from the log
350 if (stateIter != m_log.end()) {
351 _LOG_DEBUG_ID("It is ok, you are so close");
352 sendSyncData(name, *(*stateIter)->diff());
353 return;
354 }
355
356 if (!isTimedProcessing) {
357 _LOG_DEBUG_ID("Let's wait, just wait for a while");
358 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
359 bool doesExist = m_interestTable.insert(interest, digest, true);
360 if (doesExist)
361 // Original comment (not sure): somebody else replied, so restart random-game timer
362 // YY: Get the same SyncInterest again, refresh the timer
363 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
364
365 m_delayedInterestProcessingId =
366 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
367 bind(&Logic::processSyncInterest, this, interest, true));
368 }
369 else {
370 // OK, nobody is helping us, just tell the truth.
371 _LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth");
372 m_interestTable.erase(digest);
373 sendSyncData(name, m_state);
374 }
375
376 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
377}
378
379void
380Logic::processResetInterest(const Interest& interest)
381{
382 _LOG_DEBUG_ID(">> Logic::processResetInterest");
383 reset();
384}
385
386void
387Logic::processSyncData(const Name& name,
388 ndn::ConstBufferPtr digest,
389 const Block& syncReplyBlock)
390{
391 _LOG_DEBUG_ID(">> Logic::processSyncData");
392
393 DiffStatePtr commit = make_shared<DiffState>();
394 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
395
396 try {
397 m_interestTable.erase(digest); // Remove satisfied interest from PIT
398
399 State reply;
400 reply.wireDecode(syncReplyBlock);
401
402 std::vector<MissingDataInfo> v;
403 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
404 {
405 BOOST_ASSERT(leaf != 0);
406
407 const Name& info = leaf->getSessionName();
408 SeqNo seq = leaf->getSeq();
409
410 bool isInserted = false;
411 bool isUpdated = false;
412 SeqNo oldSeq;
413 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
414
415 if (isInserted || isUpdated) {
416 commit->update(info, seq);
417
418 oldSeq++;
419 MissingDataInfo mdi = {info, oldSeq, seq};
420 v.push_back(mdi);
421 }
422 }
423
424 if (!v.empty()) {
425 m_onUpdate(v);
426
427 commit->setRootDigest(m_state.getRootDigest());
428 insertToDiffLog(commit, previousRoot);
429 }
430 else {
431 _LOG_DEBUG_ID("What? nothing new");
432 }
433 }
434 catch (State::Error&) {
435 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
436 // Something really fishy happened during state decoding;
437 commit.reset();
438 return;
439 }
440
441 if (static_cast<bool>(commit) && !commit->getLeaves().empty()) {
442 // state changed and it is safe to express a new interest
443 time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
444 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
445 EventId eventId = m_scheduler.scheduleEvent(after,
446 bind(&Logic::sendSyncInterest, this));
447
448 m_scheduler.cancelEvent(m_reexpressingInterestId);
449 m_reexpressingInterestId = eventId;
450 }
451}
452
453void
454Logic::satisfyPendingSyncInterests(ConstDiffStatePtr commit)
455{
456 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
457 try {
458 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
459 for (InterestTable::const_iterator it = m_interestTable.begin();
460 it != m_interestTable.end(); it++) {
461 ConstUnsatisfiedInterestPtr request = *it;
462
463 if (request->isUnknown)
464 sendSyncData(request->interest->getName(), m_state);
465 else
466 sendSyncData(request->interest->getName(), *commit);
467 }
468 m_interestTable.clear();
469 }
470 catch (InterestTable::Error&) {
471 // ok. not really an error
472 }
473 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
474}
475
476void
477Logic::insertToDiffLog(DiffStatePtr commit, ndn::ConstBufferPtr previousRoot)
478{
479 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
480 // Connect to the history
481 if (!m_log.empty())
482 (*m_log.find(previousRoot))->setNext(commit);
483
484 // Insert the commit
485 m_log.erase(commit->getRootDigest());
486 m_log.insert(commit);
487 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
488}
489
490void
491Logic::sendResetInterest()
492{
493 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
494
495 if (m_needPeriodReset) {
496 _LOG_DEBUG_ID("Need Period Reset");
497 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
498
499 EventId eventId =
500 m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
501 bind(&Logic::sendResetInterest, this));
502 m_scheduler.cancelEvent(m_resetInterestId);
503 m_resetInterestId = eventId;
504 }
505
506 Interest interest(m_syncReset);
507 interest.setMustBeFresh(true);
508 interest.setInterestLifetime(m_resetInterestLifetime);
509 m_face.expressInterest(interest,
510 bind(&Logic::onResetData, this, _1, _2),
511 bind(&Logic::onSyncTimeout, this, _1));
512
513 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
514}
515
516void
517Logic::sendSyncInterest()
518{
519 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
520
521 Name interestName;
522 interestName.append(m_syncPrefix)
523 .append(ndn::name::Component(*m_state.getRootDigest()));
524
525 m_outstandingInterestName = interestName;
526
527#ifdef _DEBUG
528 printDigest(m_state.getRootDigest());
529#endif
530
531 EventId eventId =
532 m_scheduler.scheduleEvent(m_syncInterestLifetime +
533 ndn::time::milliseconds(m_reexpressionJitter()),
534 bind(&Logic::sendSyncInterest, this));
535 m_scheduler.cancelEvent(m_reexpressingInterestId);
536 m_reexpressingInterestId = eventId;
537
538 Interest interest(interestName);
539 interest.setMustBeFresh(true);
540 interest.setInterestLifetime(m_syncInterestLifetime);
541
542 m_outstandingInterestId = m_face.expressInterest(interest,
543 bind(&Logic::onSyncData, this, _1, _2),
544 bind(&Logic::onSyncTimeout, this, _1));
545
546 _LOG_DEBUG_ID("Send interest: " << interest.getName());
547 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
548}
549
550void
551Logic::sendSyncData(const Name& name, const State& state)
552{
553 _LOG_DEBUG_ID(">> Logic::sendSyncData");
554 shared_ptr<Data> syncReply = make_shared<Data>(name);
555 syncReply->setContent(state.wireEncode());
556 syncReply->setFreshnessPeriod(m_syncReplyFreshness);
Yingdi Yucd339022014-11-05 17:51:19 -0800557
558 if (m_signingId.empty())
559 m_keyChain.sign(*syncReply);
560 else
561 m_keyChain.signByIdentity(*syncReply, m_signingId);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700562
563 m_face.put(*syncReply);
564
565 // checking if our own interest got satisfied
566 if (m_outstandingInterestName == name) {
567 // remove outstanding interest
568 if (m_outstandingInterestId != 0) {
569 m_face.removePendingInterest(m_outstandingInterestId);
570 m_outstandingInterestId = 0;
571 }
572
573 // re-schedule sending Sync interest
574 time::milliseconds after(m_reexpressionJitter());
575 _LOG_DEBUG_ID("Satisfy our own interest");
576 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
577 EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
578 m_scheduler.cancelEvent(m_reexpressingInterestId);
579 m_reexpressingInterestId = eventId;
580 }
581 _LOG_DEBUG_ID("<< Logic::sendSyncData");
582}
583
584void
585Logic::cancelReset()
586{
587 _LOG_DEBUG_ID(">> Logic::cancelReset");
588 if (!m_isInReset)
589 return;
590
591 m_isInReset = false;
592 updateSeqNo(m_seqNo);
593 _LOG_DEBUG_ID("<< Logic::cancelReset");
594}
595
596void
597Logic::printDigest(ndn::ConstBufferPtr digest)
598{
599 using namespace CryptoPP;
600
601 std::string hash;
602 StringSource(digest->buf(), digest->size(), true,
603 new HexEncoder(new StringSink(hash), false));
604 _LOG_DEBUG_ID("Hash: " << hash);
605}
606
607} // namespace chronosync