blob: f584da5b1e5d6ff92236336bb2a7cb757384571e [file] [log] [blame]
Yingdi Yuf7ede412014-08-30 20:37:52 -07001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2012-2014 University of California, Los Angeles
4 *
5 * This file is part of ChronoSync, synchronization library for distributed realtime
6 * applications for NDN.
7 *
8 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation, either
10 * version 3 of the License, or (at your option) any later version.
11 *
12 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
20 * @author Chaoyi Bian <bcy@pku.edu.cn>
21 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
22 * @author Yingdi Yu <yingdi@cs.ucla.edu>
23 */
24
25#include "logic.hpp"
26#include "logger.hpp"
27
28INIT_LOGGER("Logic");
29
30#ifdef _DEBUG
31#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
32#else
33#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
34#endif
35
36namespace chronosync {
37
38using ndn::ConstBufferPtr;
39using ndn::EventId;
40
41const uint8_t EMPTY_DIGEST_VALUE[] = {
42 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
43 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
44 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
45 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
46};
47
48#ifdef _DEBUG
49int Logic::m_instanceCounter = 0;
50#endif
51
52const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
53const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
54const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
55const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
56const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
57
58const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
59const ndn::name::Component Logic::RESET_COMPONENT("reset");
60
61Logic::Logic(ndn::Face& face,
62 const Name& syncPrefix,
63 const Name& userPrefix,
64 const UpdateCallback& onUpdate,
65 const time::steady_clock::Duration& resetTimer,
66 const time::steady_clock::Duration& cancelResetTimer,
67 const time::milliseconds& resetInterestLifetime,
68 const time::milliseconds& syncInterestLifetime,
69 const time::milliseconds& syncReplyFreshness)
70 : m_face(face)
71 , m_syncPrefix(syncPrefix)
72 , m_userPrefix(userPrefix)
73 , m_interestTable(m_face.getIoService())
74 , m_outstandingInterestId(0)
75 , m_isInReset(false)
76 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
77 , m_onUpdate(onUpdate)
78 , m_scheduler(m_face.getIoService())
79 , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
80 , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
81 , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
82 , m_resetTimer(resetTimer)
83 , m_cancelResetTimer(cancelResetTimer)
84 , m_resetInterestLifetime(resetInterestLifetime)
85 , m_syncInterestLifetime(syncInterestLifetime)
86 , m_syncReplyFreshness(syncReplyFreshness)
87{
88#ifdef _DEBUG
89 m_instanceId = m_instanceCounter++;
90#endif
91
92 _LOG_DEBUG_ID(">> Logic::Logic");
93
94 m_syncReset = m_syncPrefix;
95 m_syncReset.append("reset");
96
97 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
98 m_syncRegisteredPrefixId =
99 m_face.setInterestFilter(m_syncPrefix,
100 bind(&Logic::onSyncInterest, this, _1, _2),
101 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
102
103 setUserPrefix(m_userPrefix);
104
105 _LOG_DEBUG_ID("<< Logic::Logic");
106}
107
108Logic::~Logic()
109{
110 m_face.unsetInterestFilter(m_syncRegisteredPrefixId);
111 m_scheduler.cancelAllEvents();
112}
113
114void
115Logic::reset()
116{
117 m_isInReset = true;
118
119 m_state.reset();
120 m_log.clear();
121
122 sendResetInterest();
123
124 // reset outstanding interest name, so that data for previous interest will be dropped.
125 if (m_outstandingInterestId != 0) {
126 m_face.removePendingInterest(m_outstandingInterestId);
127 m_outstandingInterestId = 0;
128 }
129
130 sendSyncInterest();
131
132 if (static_cast<bool>(m_delayedInterestProcessingId))
133 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
134
135 m_delayedInterestProcessingId =
136 m_scheduler.scheduleEvent(m_cancelResetTimer,
137 bind(&Logic::cancelReset, this));
138}
139
140void
141Logic::setUserPrefix(const Name& userPrefix)
142{
143 m_userPrefix = userPrefix;
144
145 m_sessionName = m_userPrefix;
146 m_sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
147
148 m_seqNo = 0;
149
150 reset();
151}
152
153void
154Logic::updateSeqNo(const SeqNo& seqNo)
155{
156 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
157 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << m_seqNo);
158 if (seqNo < m_seqNo || seqNo == 0)
159 return;
160
161 m_seqNo = seqNo;
162
163 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << m_seqNo);
164
165 if (!m_isInReset) {
166 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
167 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
168 {
169 using namespace CryptoPP;
170
171 std::string hash;
172 StringSource(previousRoot->buf(), previousRoot->size(), true,
173 new HexEncoder(new StringSink(hash), false));
174 _LOG_DEBUG_ID("Hash: " << hash);
175 }
176
177 bool isInserted = false;
178 bool isUpdated = false;
179 SeqNo oldSeq;
180 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(m_sessionName, seqNo);
181
182 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
183 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
184 if (isInserted || isUpdated) {
185 DiffStatePtr commit = make_shared<DiffState>();
186 commit->update(m_sessionName, seqNo);
187 commit->setRootDigest(m_state.getRootDigest());
188 insertToDiffLog(commit, previousRoot);
189
190 satisfyPendingSyncInterests(commit);
191 }
192 }
193}
194
195ConstBufferPtr
196Logic::getRootDigest() const
197{
198 return m_state.getRootDigest();
199}
200
201void
202Logic::printState(std::ostream& os) const
203{
204 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
205 {
206 os << *leaf << "\n";
207 }
208}
209
210std::set<Name>
211Logic::getSessionNames() const
212{
213 std::set<Name> sessionNames;
214
215 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
216 {
217 sessionNames.insert(leaf->getSessionName());
218 }
219
220 return sessionNames;
221}
222
223void
224Logic::onSyncInterest(const Name& prefix, const Interest& interest)
225{
226 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
227 Name name = interest.getName();
228
229 _LOG_DEBUG_ID("InterestName: " << name);
230
231 if (RESET_COMPONENT != name.get(-1)) {
232 // normal sync interest
233 processSyncInterest(interest.shared_from_this());
234 }
235 else
236 // reset interest
237 processResetInterest(interest);
238
239 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
240}
241
242void
243Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
244{
245 //Sync prefix registration failed
246 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
247}
248
249void
250Logic::onSyncData(const Interest& interest, Data& data)
251{
252 _LOG_DEBUG_ID(">> Logic::onSyncData");
253 // Place holder for validator.
254 onSyncDataValidated(data.shared_from_this());
255 _LOG_DEBUG_ID("<< Logic::onSyncData");
256}
257
258void
259Logic::onResetData(const Interest& interest, Data& data)
260{
261 // This should not happened, drop the received data.
262}
263
264void
265Logic::onSyncTimeout(const Interest& interest)
266{
267 // It is OK. Others will handle the time out situation.
268 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
269 _LOG_DEBUG_ID("Interest: " << interest.getName());
270 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
271}
272
273void
274Logic::onSyncDataValidationFailed(const shared_ptr<const Data>& data)
275{
276 // SyncReply cannot be validated.
277}
278
279void
280Logic::onSyncDataValidated(const shared_ptr<const Data>& data)
281{
282 Name name = data->getName();
283 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
284
285 processSyncData(name, digest, data->getContent().blockFromValue());
286}
287
288void
289Logic::processSyncInterest(const shared_ptr<const Interest>& interest,
290 bool isTimedProcessing/*=false*/)
291{
292 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
293
294 const Name& name = interest->getName();
295 ConstBufferPtr digest =
296 make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
297
298 ConstBufferPtr rootDigest = m_state.getRootDigest();
299
300 // If the digest of the incoming interest is the same as root digest
301 // Put the interest into InterestTable
302 if (*rootDigest == *digest) {
303 _LOG_DEBUG_ID("Oh, we are in the same state");
304 m_interestTable.insert(interest, digest, false);
305
306 if (!m_isInReset)
307 return;
308
309 if (!isTimedProcessing) {
310 _LOG_DEBUG_ID("Non timed processing in reset");
311 // Still in reset, our own seq has not been put into state yet
312 // Do not hurry, some others may be also resetting and may send their reply
313 if (static_cast<bool>(m_delayedInterestProcessingId))
314 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
315
316 time::milliseconds after(m_rangeUniformRandom());
317 _LOG_DEBUG_ID("After: " << after);
318 m_delayedInterestProcessingId =
319 m_scheduler.scheduleEvent(after,
320 bind(&Logic::processSyncInterest, this, interest, true));
321 }
322 else {
323 _LOG_DEBUG_ID("Timed processing in reset");
324 // Now we can get out of reset state by putting our own stuff into m_state.
325 cancelReset();
326 }
327
328 return;
329 }
330
331 // If the digest of incoming interest is an "empty" digest
332 if (digest == EMPTY_DIGEST) {
333 _LOG_DEBUG_ID("Poor guy, he knows nothing");
334 sendSyncData(name, m_state);
335 return;
336 }
337
338 DiffStateContainer::iterator stateIter = m_log.find(digest);
339 // If the digest of incoming interest can be found from the log
340 if (stateIter != m_log.end()) {
341 _LOG_DEBUG_ID("It is ok, you are so close");
342 sendSyncData(name, *(*stateIter)->diff());
343 return;
344 }
345
346 if (!isTimedProcessing) {
347 _LOG_DEBUG_ID("Let's wait, just wait for a while");
348 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
349 bool doesExist = m_interestTable.insert(interest, digest, true);
350 if (doesExist)
351 // Original comment (not sure): somebody else replied, so restart random-game timer
352 // YY: Get the same SyncInterest again, refresh the timer
353 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
354
355 m_delayedInterestProcessingId =
356 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
357 bind(&Logic::processSyncInterest, this, interest, true));
358 }
359 else {
360 // OK, nobody is helping us, just tell the truth.
361 _LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth");
362 m_interestTable.erase(digest);
363 sendSyncData(name, m_state);
364 }
365
366 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
367}
368
369void
370Logic::processResetInterest(const Interest& interest)
371{
372 _LOG_DEBUG_ID(">> Logic::processResetInterest");
373 reset();
374}
375
376void
377Logic::processSyncData(const Name& name,
378 ndn::ConstBufferPtr digest,
379 const Block& syncReplyBlock)
380{
381 _LOG_DEBUG_ID(">> Logic::processSyncData");
382
383 DiffStatePtr commit = make_shared<DiffState>();
384 ndn::ConstBufferPtr previousRoot = m_state.getRootDigest();
385
386 try {
387 m_interestTable.erase(digest); // Remove satisfied interest from PIT
388
389 State reply;
390 reply.wireDecode(syncReplyBlock);
391
392 std::vector<MissingDataInfo> v;
393 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
394 {
395 BOOST_ASSERT(leaf != 0);
396
397 const Name& info = leaf->getSessionName();
398 SeqNo seq = leaf->getSeq();
399
400 bool isInserted = false;
401 bool isUpdated = false;
402 SeqNo oldSeq;
403 boost::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
404
405 if (isInserted || isUpdated) {
406 commit->update(info, seq);
407
408 oldSeq++;
409 MissingDataInfo mdi = {info, oldSeq, seq};
410 v.push_back(mdi);
411 }
412 }
413
414 if (!v.empty()) {
415 m_onUpdate(v);
416
417 commit->setRootDigest(m_state.getRootDigest());
418 insertToDiffLog(commit, previousRoot);
419 }
420 else {
421 _LOG_DEBUG_ID("What? nothing new");
422 }
423 }
424 catch (State::Error&) {
425 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
426 // Something really fishy happened during state decoding;
427 commit.reset();
428 return;
429 }
430
431 if (static_cast<bool>(commit) && !commit->getLeaves().empty()) {
432 // state changed and it is safe to express a new interest
433 time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
434 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
435 EventId eventId = m_scheduler.scheduleEvent(after,
436 bind(&Logic::sendSyncInterest, this));
437
438 m_scheduler.cancelEvent(m_reexpressingInterestId);
439 m_reexpressingInterestId = eventId;
440 }
441}
442
443void
444Logic::satisfyPendingSyncInterests(ConstDiffStatePtr commit)
445{
446 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
447 try {
448 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
449 for (InterestTable::const_iterator it = m_interestTable.begin();
450 it != m_interestTable.end(); it++) {
451 ConstUnsatisfiedInterestPtr request = *it;
452
453 if (request->isUnknown)
454 sendSyncData(request->interest->getName(), m_state);
455 else
456 sendSyncData(request->interest->getName(), *commit);
457 }
458 m_interestTable.clear();
459 }
460 catch (InterestTable::Error&) {
461 // ok. not really an error
462 }
463 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
464}
465
466void
467Logic::insertToDiffLog(DiffStatePtr commit, ndn::ConstBufferPtr previousRoot)
468{
469 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
470 // Connect to the history
471 if (!m_log.empty())
472 (*m_log.find(previousRoot))->setNext(commit);
473
474 // Insert the commit
475 m_log.erase(commit->getRootDigest());
476 m_log.insert(commit);
477 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
478}
479
480void
481Logic::sendResetInterest()
482{
483 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
484
485 if (m_needPeriodReset) {
486 _LOG_DEBUG_ID("Need Period Reset");
487 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
488
489 EventId eventId =
490 m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
491 bind(&Logic::sendResetInterest, this));
492 m_scheduler.cancelEvent(m_resetInterestId);
493 m_resetInterestId = eventId;
494 }
495
496 Interest interest(m_syncReset);
497 interest.setMustBeFresh(true);
498 interest.setInterestLifetime(m_resetInterestLifetime);
499 m_face.expressInterest(interest,
500 bind(&Logic::onResetData, this, _1, _2),
501 bind(&Logic::onSyncTimeout, this, _1));
502
503 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
504}
505
506void
507Logic::sendSyncInterest()
508{
509 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
510
511 Name interestName;
512 interestName.append(m_syncPrefix)
513 .append(ndn::name::Component(*m_state.getRootDigest()));
514
515 m_outstandingInterestName = interestName;
516
517#ifdef _DEBUG
518 printDigest(m_state.getRootDigest());
519#endif
520
521 EventId eventId =
522 m_scheduler.scheduleEvent(m_syncInterestLifetime +
523 ndn::time::milliseconds(m_reexpressionJitter()),
524 bind(&Logic::sendSyncInterest, this));
525 m_scheduler.cancelEvent(m_reexpressingInterestId);
526 m_reexpressingInterestId = eventId;
527
528 Interest interest(interestName);
529 interest.setMustBeFresh(true);
530 interest.setInterestLifetime(m_syncInterestLifetime);
531
532 m_outstandingInterestId = m_face.expressInterest(interest,
533 bind(&Logic::onSyncData, this, _1, _2),
534 bind(&Logic::onSyncTimeout, this, _1));
535
536 _LOG_DEBUG_ID("Send interest: " << interest.getName());
537 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
538}
539
540void
541Logic::sendSyncData(const Name& name, const State& state)
542{
543 _LOG_DEBUG_ID(">> Logic::sendSyncData");
544 shared_ptr<Data> syncReply = make_shared<Data>(name);
545 syncReply->setContent(state.wireEncode());
546 syncReply->setFreshnessPeriod(m_syncReplyFreshness);
547 m_keyChain.sign(*syncReply);
548
549 m_face.put(*syncReply);
550
551 // checking if our own interest got satisfied
552 if (m_outstandingInterestName == name) {
553 // remove outstanding interest
554 if (m_outstandingInterestId != 0) {
555 m_face.removePendingInterest(m_outstandingInterestId);
556 m_outstandingInterestId = 0;
557 }
558
559 // re-schedule sending Sync interest
560 time::milliseconds after(m_reexpressionJitter());
561 _LOG_DEBUG_ID("Satisfy our own interest");
562 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
563 EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
564 m_scheduler.cancelEvent(m_reexpressingInterestId);
565 m_reexpressingInterestId = eventId;
566 }
567 _LOG_DEBUG_ID("<< Logic::sendSyncData");
568}
569
570void
571Logic::cancelReset()
572{
573 _LOG_DEBUG_ID(">> Logic::cancelReset");
574 if (!m_isInReset)
575 return;
576
577 m_isInReset = false;
578 updateSeqNo(m_seqNo);
579 _LOG_DEBUG_ID("<< Logic::cancelReset");
580}
581
582void
583Logic::printDigest(ndn::ConstBufferPtr digest)
584{
585 using namespace CryptoPP;
586
587 std::string hash;
588 StringSource(digest->buf(), digest->size(), true,
589 new HexEncoder(new StringSink(hash), false));
590 _LOG_DEBUG_ID("Hash: " << hash);
591}
592
593} // namespace chronosync