blob: 84cf5bee3e9908025fb1a82cbc6bb34f900a5261 [file] [log] [blame]
Yingdi Yuf7ede412014-08-30 20:37:52 -07001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
Ashlesh Gawande04e8d492018-02-04 13:08:15 -06003 * Copyright (c) 2012-2018 University of California, Los Angeles
Yingdi Yuf7ede412014-08-30 20:37:52 -07004 *
5 * This file is part of ChronoSync, synchronization library for distributed realtime
6 * applications for NDN.
7 *
8 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation, either
10 * version 3 of the License, or (at your option) any later version.
11 *
12 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
20 * @author Chaoyi Bian <bcy@pku.edu.cn>
21 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
22 * @author Yingdi Yu <yingdi@cs.ucla.edu>
Sonu Mishra0dadc572016-12-12 23:59:41 -080023 * @author Sonu Mishra <https://www.linkedin.com/in/mishrasonu>
Yingdi Yuf7ede412014-08-30 20:37:52 -070024 */
25
26#include "logic.hpp"
27#include "logger.hpp"
28
Ashlesh Gawande687cf922017-05-30 15:04:16 -050029#include <ndn-cxx/util/string-helper.hpp>
30
Alexander Afanasyev36eb3ed2017-01-11 12:35:58 -080031INIT_LOGGER(Logic);
Yingdi Yuf7ede412014-08-30 20:37:52 -070032
Yingdi Yuf7ede412014-08-30 20:37:52 -070033#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
Yingdi Yuf7ede412014-08-30 20:37:52 -070034
35namespace chronosync {
36
Yingdi Yuf7ede412014-08-30 20:37:52 -070037using ndn::EventId;
38
39const uint8_t EMPTY_DIGEST_VALUE[] = {
40 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
41 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
42 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
43 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
44};
45
Ashlesh Gawande08784d42017-09-06 23:40:21 -050046int Logic::s_instanceCounter = 0;
Yingdi Yuf7ede412014-08-30 20:37:52 -070047
Yingdi Yucd339022014-11-05 17:51:19 -080048const ndn::Name Logic::DEFAULT_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080049const ndn::Name Logic::EMPTY_NAME;
Ashlesh Gawande08784d42017-09-06 23:40:21 -050050const std::shared_ptr<Validator> Logic::DEFAULT_VALIDATOR;
Yingdi Yuf7ede412014-08-30 20:37:52 -070051const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
52const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
53const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
54const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
55const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080056const time::milliseconds Logic::DEFAULT_RECOVERY_INTEREST_LIFETIME(1000);
Yingdi Yuf7ede412014-08-30 20:37:52 -070057
Ashlesh Gawande08784d42017-09-06 23:40:21 -050058const ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
Yingdi Yuf7ede412014-08-30 20:37:52 -070059const ndn::name::Component Logic::RESET_COMPONENT("reset");
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080060const ndn::name::Component Logic::RECOVERY_COMPONENT("recovery");
Yingdi Yuf7ede412014-08-30 20:37:52 -070061
62Logic::Logic(ndn::Face& face,
63 const Name& syncPrefix,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080064 const Name& defaultUserPrefix,
Yingdi Yuf7ede412014-08-30 20:37:52 -070065 const UpdateCallback& onUpdate,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080066 const Name& defaultSigningId,
Ashlesh Gawande08784d42017-09-06 23:40:21 -050067 std::shared_ptr<Validator> validator,
Yingdi Yuf7ede412014-08-30 20:37:52 -070068 const time::steady_clock::Duration& resetTimer,
69 const time::steady_clock::Duration& cancelResetTimer,
70 const time::milliseconds& resetInterestLifetime,
71 const time::milliseconds& syncInterestLifetime,
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080072 const time::milliseconds& syncReplyFreshness,
73 const time::milliseconds& recoveryInterestLifetime)
Yingdi Yuf7ede412014-08-30 20:37:52 -070074 : m_face(face)
75 , m_syncPrefix(syncPrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080076 , m_defaultUserPrefix(defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -070077 , m_interestTable(m_face.getIoService())
78 , m_outstandingInterestId(0)
79 , m_isInReset(false)
80 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
81 , m_onUpdate(onUpdate)
82 , m_scheduler(m_face.getIoService())
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -060083 , m_rng(std::random_device{}())
84 , m_rangeUniformRandom(100, 500)
85 , m_reexpressionJitter(100, 500)
Yingdi Yuf7ede412014-08-30 20:37:52 -070086 , m_resetTimer(resetTimer)
87 , m_cancelResetTimer(cancelResetTimer)
88 , m_resetInterestLifetime(resetInterestLifetime)
89 , m_syncInterestLifetime(syncInterestLifetime)
90 , m_syncReplyFreshness(syncReplyFreshness)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080091 , m_recoveryInterestLifetime(recoveryInterestLifetime)
Yingdi Yucd339022014-11-05 17:51:19 -080092 , m_validator(validator)
Alexander Afanasyev90587b82018-02-11 20:36:53 -050093 , m_instanceId(s_instanceCounter++)
Yingdi Yuf7ede412014-08-30 20:37:52 -070094{
Yingdi Yuf7ede412014-08-30 20:37:52 -070095 _LOG_DEBUG_ID(">> Logic::Logic");
96
Ashlesh Gawande687cf922017-05-30 15:04:16 -050097 addUserNode(m_defaultUserPrefix, defaultSigningId);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080098
Yingdi Yuf7ede412014-08-30 20:37:52 -070099 m_syncReset = m_syncPrefix;
100 m_syncReset.append("reset");
101
102 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
103 m_syncRegisteredPrefixId =
Junxiao Shi4e010bc2017-11-16 14:11:00 +0000104 m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
Yingdi Yuf7ede412014-08-30 20:37:52 -0700105 bind(&Logic::onSyncInterest, this, _1, _2),
106 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
107
Qiuhan Dinge246b622014-12-03 21:57:48 -0800108 sendSyncInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700109 _LOG_DEBUG_ID("<< Logic::Logic");
110}
111
112Logic::~Logic()
113{
Yingdi Yuf7ede412014-08-30 20:37:52 -0700114 m_scheduler.cancelAllEvents();
Yingdi Yu9d5679a2015-02-01 00:17:58 -0800115 m_interestTable.clear();
116 m_face.shutdown();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700117}
118
119void
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800120Logic::reset(bool isOnInterest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700121{
122 m_isInReset = true;
123
124 m_state.reset();
125 m_log.clear();
126
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800127 if (!isOnInterest)
128 sendResetInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700129
130 // reset outstanding interest name, so that data for previous interest will be dropped.
131 if (m_outstandingInterestId != 0) {
132 m_face.removePendingInterest(m_outstandingInterestId);
133 m_outstandingInterestId = 0;
134 }
135
136 sendSyncInterest();
137
138 if (static_cast<bool>(m_delayedInterestProcessingId))
139 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
140
141 m_delayedInterestProcessingId =
142 m_scheduler.scheduleEvent(m_cancelResetTimer,
143 bind(&Logic::cancelReset, this));
144}
145
146void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800147Logic::setDefaultUserPrefix(const Name& defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700148{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800149 if (defaultUserPrefix != EMPTY_NAME) {
150 if (m_nodeList.find(defaultUserPrefix) != m_nodeList.end()) {
151 m_defaultUserPrefix = defaultUserPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800152 }
153 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700154}
155
156void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800157Logic::addUserNode(const Name& userPrefix, const Name& signingId)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700158{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800159 if (userPrefix == EMPTY_NAME)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700160 return;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800161 if (m_defaultUserPrefix == EMPTY_NAME) {
162 m_defaultUserPrefix = userPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800163 }
164 if (m_nodeList.find(userPrefix) == m_nodeList.end()) {
165 m_nodeList[userPrefix].userPrefix = userPrefix;
166 m_nodeList[userPrefix].signingId = signingId;
167 Name sessionName = userPrefix;
168 sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
169 m_nodeList[userPrefix].sessionName = sessionName;
170 m_nodeList[userPrefix].seqNo = 0;
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800171 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800172 }
173}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700174
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800175void
176Logic::removeUserNode(const Name& userPrefix)
177{
178 auto userNode = m_nodeList.find(userPrefix);
179 if (userNode != m_nodeList.end()) {
180 m_nodeList.erase(userNode);
181 if (m_defaultUserPrefix == userPrefix) {
182 if (!m_nodeList.empty()) {
183 m_defaultUserPrefix = m_nodeList.begin()->second.userPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800184 }
185 else {
186 m_defaultUserPrefix = EMPTY_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800187 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700188 }
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800189 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800190 }
191}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700192
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800193const Name&
194Logic::getSessionName(Name prefix)
195{
196 if (prefix == EMPTY_NAME)
197 prefix = m_defaultUserPrefix;
198 auto node = m_nodeList.find(prefix);
199 if (node != m_nodeList.end())
200 return node->second.sessionName;
201 else
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800202 BOOST_THROW_EXCEPTION(Error("Refer to non-existent node:" + prefix.toUri()));
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800203}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700204
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800205const SeqNo&
206Logic::getSeqNo(Name prefix)
207{
208 if (prefix == EMPTY_NAME)
209 prefix = m_defaultUserPrefix;
210 auto node = m_nodeList.find(prefix);
211 if (node != m_nodeList.end())
212 return node->second.seqNo;
213 else
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800214 BOOST_THROW_EXCEPTION(Logic::Error("Refer to non-existent node:" + prefix.toUri()));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700215
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800216}
217
218void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800219Logic::updateSeqNo(const SeqNo& seqNo, const Name& updatePrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800220{
221 Name prefix;
222 if (updatePrefix == EMPTY_NAME) {
223 if (m_defaultUserPrefix == EMPTY_NAME)
224 return;
225 prefix = m_defaultUserPrefix;
226 }
227 else
228 prefix = updatePrefix;
229
230 auto it = m_nodeList.find(prefix);
231 if (it != m_nodeList.end()) {
232 NodeInfo& node = it->second;
233 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
234 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << node.seqNo);
235 if (seqNo < node.seqNo || seqNo == 0)
236 return;
237
238 node.seqNo = seqNo;
239 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << node.seqNo);
240
241 if (!m_isInReset) {
242 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500243 ConstBufferPtr previousRoot = m_state.getRootDigest();
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800244 {
Davide Pesavento5473abe2017-10-09 01:35:33 -0400245 std::string hash = ndn::toHex(previousRoot->data(), previousRoot->size(), false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800246 _LOG_DEBUG_ID("Hash: " << hash);
247 }
248
249 bool isInserted = false;
250 bool isUpdated = false;
251 SeqNo oldSeq;
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500252 std::tie(isInserted, isUpdated, oldSeq) = m_state.update(node.sessionName, node.seqNo);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800253
254 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
255 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
256 if (isInserted || isUpdated) {
257 DiffStatePtr commit = make_shared<DiffState>();
258 commit->update(node.sessionName, node.seqNo);
259 commit->setRootDigest(m_state.getRootDigest());
260 insertToDiffLog(commit, previousRoot);
261
262 satisfyPendingSyncInterests(prefix, commit);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800263 formAndSendExcludeInterest(prefix, *commit, previousRoot);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800264 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700265 }
266 }
267}
268
269ConstBufferPtr
270Logic::getRootDigest() const
271{
272 return m_state.getRootDigest();
273}
274
275void
276Logic::printState(std::ostream& os) const
277{
278 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
279 {
280 os << *leaf << "\n";
281 }
282}
283
284std::set<Name>
285Logic::getSessionNames() const
286{
287 std::set<Name> sessionNames;
288
289 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
290 {
291 sessionNames.insert(leaf->getSessionName());
292 }
293
294 return sessionNames;
295}
296
297void
298Logic::onSyncInterest(const Name& prefix, const Interest& interest)
299{
300 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
301 Name name = interest.getName();
302
303 _LOG_DEBUG_ID("InterestName: " << name);
304
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800305 if (name.size() >= 1 && RESET_COMPONENT == name.get(-1)) {
306 processResetInterest(interest);
307 }
308 else if (name.size() >= 2 && RECOVERY_COMPONENT == name.get(-2)) {
309 processRecoveryInterest(interest);
310 }
Ashlesh Gawande8ba7d5a2017-07-24 14:43:12 -0500311 // Do not process exclude interests, they should be answered by CS
312 else if (interest.getExclude().empty()) {
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500313 processSyncInterest(interest);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700314 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700315
316 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
317}
318
319void
320Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
321{
322 //Sync prefix registration failed
323 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
324}
325
326void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800327Logic::onSyncData(const Interest& interest, const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700328{
329 _LOG_DEBUG_ID(">> Logic::onSyncData");
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800330 // if (static_cast<bool>(m_validator))
331 // m_validator->validate(data,
332 // bind(&Logic::onSyncDataValidated, this, _1),
333 // bind(&Logic::onSyncDataValidationFailed, this, _1));
334 // else
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500335 // onSyncDataValidated(data);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800336
337 if (interest.getExclude().empty()) {
338 _LOG_DEBUG_ID("First data");
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500339 onSyncDataValidated(data);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800340 }
341 else {
342 _LOG_DEBUG_ID("Data obtained using exclude filter");
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500343 onSyncDataValidated(data, false);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800344 }
345 sendExcludeInterest(interest, data);
346
Yingdi Yuf7ede412014-08-30 20:37:52 -0700347 _LOG_DEBUG_ID("<< Logic::onSyncData");
348}
349
350void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800351Logic::onResetData(const Interest& interest, const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700352{
353 // This should not happened, drop the received data.
354}
355
356void
357Logic::onSyncTimeout(const Interest& interest)
358{
359 // It is OK. Others will handle the time out situation.
360 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
361 _LOG_DEBUG_ID("Interest: " << interest.getName());
362 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
363}
364
365void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500366Logic::onSyncDataValidationFailed(const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700367{
368 // SyncReply cannot be validated.
369}
370
371void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500372Logic::onSyncDataValidated(const Data& data, bool firstData)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700373{
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500374 Name name = data.getName();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700375 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
376
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500377 processSyncData(name, digest, data.getContent().blockFromValue(), firstData);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700378}
379
380void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500381Logic::processSyncInterest(const Interest& interest, bool isTimedProcessing/*=false*/)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700382{
383 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
384
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500385 Name name = interest.getName();
386 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700387
388 ConstBufferPtr rootDigest = m_state.getRootDigest();
389
390 // If the digest of the incoming interest is the same as root digest
391 // Put the interest into InterestTable
392 if (*rootDigest == *digest) {
393 _LOG_DEBUG_ID("Oh, we are in the same state");
394 m_interestTable.insert(interest, digest, false);
395
396 if (!m_isInReset)
397 return;
398
399 if (!isTimedProcessing) {
400 _LOG_DEBUG_ID("Non timed processing in reset");
401 // Still in reset, our own seq has not been put into state yet
402 // Do not hurry, some others may be also resetting and may send their reply
403 if (static_cast<bool>(m_delayedInterestProcessingId))
404 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
405
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600406 time::milliseconds after(m_rangeUniformRandom(m_rng));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700407 _LOG_DEBUG_ID("After: " << after);
408 m_delayedInterestProcessingId =
409 m_scheduler.scheduleEvent(after,
410 bind(&Logic::processSyncInterest, this, interest, true));
411 }
412 else {
413 _LOG_DEBUG_ID("Timed processing in reset");
414 // Now we can get out of reset state by putting our own stuff into m_state.
415 cancelReset();
416 }
417
418 return;
419 }
420
421 // If the digest of incoming interest is an "empty" digest
Sonu Mishrae10acbc2017-01-18 14:14:05 -0800422 if (*digest == *EMPTY_DIGEST) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700423 _LOG_DEBUG_ID("Poor guy, he knows nothing");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800424 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700425 return;
426 }
427
428 DiffStateContainer::iterator stateIter = m_log.find(digest);
429 // If the digest of incoming interest can be found from the log
430 if (stateIter != m_log.end()) {
431 _LOG_DEBUG_ID("It is ok, you are so close");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800432 sendSyncData(m_defaultUserPrefix, name, *(*stateIter)->diff());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700433 return;
434 }
435
436 if (!isTimedProcessing) {
437 _LOG_DEBUG_ID("Let's wait, just wait for a while");
438 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
Yingdi Yu53f5f042015-01-31 16:33:25 -0800439 bool doesExist = m_interestTable.has(digest);
440 m_interestTable.insert(interest, digest, true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700441 if (doesExist)
442 // Original comment (not sure): somebody else replied, so restart random-game timer
443 // YY: Get the same SyncInterest again, refresh the timer
444 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
445
446 m_delayedInterestProcessingId =
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600447 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom(m_rng)),
Yingdi Yuf7ede412014-08-30 20:37:52 -0700448 bind(&Logic::processSyncInterest, this, interest, true));
449 }
450 else {
451 // OK, nobody is helping us, just tell the truth.
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800452 _LOG_DEBUG_ID("OK, nobody is helping us, let us try to recover");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700453 m_interestTable.erase(digest);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800454 sendRecoveryInterest(digest);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700455 }
456
457 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
458}
459
460void
461Logic::processResetInterest(const Interest& interest)
462{
463 _LOG_DEBUG_ID(">> Logic::processResetInterest");
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800464 reset(true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700465}
466
467void
468Logic::processSyncData(const Name& name,
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500469 ConstBufferPtr digest,
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800470 const Block& syncReplyBlock,
471 bool firstData)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700472{
473 _LOG_DEBUG_ID(">> Logic::processSyncData");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700474 DiffStatePtr commit = make_shared<DiffState>();
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500475 ConstBufferPtr previousRoot = m_state.getRootDigest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700476
477 try {
478 m_interestTable.erase(digest); // Remove satisfied interest from PIT
479
480 State reply;
481 reply.wireDecode(syncReplyBlock);
482
483 std::vector<MissingDataInfo> v;
484 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
485 {
486 BOOST_ASSERT(leaf != 0);
487
488 const Name& info = leaf->getSessionName();
489 SeqNo seq = leaf->getSeq();
490
491 bool isInserted = false;
492 bool isUpdated = false;
493 SeqNo oldSeq;
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500494 std::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700495 if (isInserted || isUpdated) {
496 commit->update(info, seq);
497
498 oldSeq++;
499 MissingDataInfo mdi = {info, oldSeq, seq};
500 v.push_back(mdi);
501 }
502 }
503
504 if (!v.empty()) {
505 m_onUpdate(v);
506
507 commit->setRootDigest(m_state.getRootDigest());
508 insertToDiffLog(commit, previousRoot);
509 }
510 else {
511 _LOG_DEBUG_ID("What? nothing new");
512 }
513 }
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800514 catch (const State::Error&) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700515 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
516 // Something really fishy happened during state decoding;
517 commit.reset();
518 return;
519 }
520
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800521 if (static_cast<bool>(commit) && !commit->getLeaves().empty() && firstData) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700522 // state changed and it is safe to express a new interest
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600523 time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter(m_rng));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700524 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
525 EventId eventId = m_scheduler.scheduleEvent(after,
526 bind(&Logic::sendSyncInterest, this));
527
528 m_scheduler.cancelEvent(m_reexpressingInterestId);
529 m_reexpressingInterestId = eventId;
530 }
531}
532
533void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800534Logic::satisfyPendingSyncInterests(const Name& updatedPrefix, ConstDiffStatePtr commit)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700535{
536 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
537 try {
538 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500539 auto it = m_interestTable.begin();
540 while (it != m_interestTable.end()) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700541 ConstUnsatisfiedInterestPtr request = *it;
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500542 ++it;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700543 if (request->isUnknown)
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500544 sendSyncData(updatedPrefix, request->interest.getName(), m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700545 else
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500546 sendSyncData(updatedPrefix, request->interest.getName(), *commit);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700547 }
548 m_interestTable.clear();
549 }
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800550 catch (const InterestTable::Error&) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700551 // ok. not really an error
552 }
553 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
554}
555
556void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500557Logic::insertToDiffLog(DiffStatePtr commit, ConstBufferPtr previousRoot)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700558{
559 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
560 // Connect to the history
561 if (!m_log.empty())
562 (*m_log.find(previousRoot))->setNext(commit);
563
564 // Insert the commit
565 m_log.erase(commit->getRootDigest());
566 m_log.insert(commit);
567 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
568}
569
570void
571Logic::sendResetInterest()
572{
573 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
574
575 if (m_needPeriodReset) {
576 _LOG_DEBUG_ID("Need Period Reset");
577 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
578
579 EventId eventId =
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600580 m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter(m_rng)),
Yingdi Yuf7ede412014-08-30 20:37:52 -0700581 bind(&Logic::sendResetInterest, this));
582 m_scheduler.cancelEvent(m_resetInterestId);
583 m_resetInterestId = eventId;
584 }
585
586 Interest interest(m_syncReset);
587 interest.setMustBeFresh(true);
588 interest.setInterestLifetime(m_resetInterestLifetime);
589 m_face.expressInterest(interest,
590 bind(&Logic::onResetData, this, _1, _2),
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800591 bind(&Logic::onSyncTimeout, this, _1), // Nack
Yingdi Yuf7ede412014-08-30 20:37:52 -0700592 bind(&Logic::onSyncTimeout, this, _1));
593
594 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
595}
596
597void
598Logic::sendSyncInterest()
599{
600 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
601
602 Name interestName;
603 interestName.append(m_syncPrefix)
604 .append(ndn::name::Component(*m_state.getRootDigest()));
605
606 m_outstandingInterestName = interestName;
607
608#ifdef _DEBUG
609 printDigest(m_state.getRootDigest());
610#endif
611
612 EventId eventId =
Sonu Mishra0dadc572016-12-12 23:59:41 -0800613 m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 +
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600614 ndn::time::milliseconds(m_reexpressionJitter(m_rng)),
Yingdi Yuf7ede412014-08-30 20:37:52 -0700615 bind(&Logic::sendSyncInterest, this));
616 m_scheduler.cancelEvent(m_reexpressingInterestId);
617 m_reexpressingInterestId = eventId;
618
619 Interest interest(interestName);
620 interest.setMustBeFresh(true);
621 interest.setInterestLifetime(m_syncInterestLifetime);
622
623 m_outstandingInterestId = m_face.expressInterest(interest,
624 bind(&Logic::onSyncData, this, _1, _2),
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800625 bind(&Logic::onSyncTimeout, this, _1), // Nack
Yingdi Yuf7ede412014-08-30 20:37:52 -0700626 bind(&Logic::onSyncTimeout, this, _1));
627
628 _LOG_DEBUG_ID("Send interest: " << interest.getName());
629 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
630}
631
632void
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600633Logic::trimState(State& partialState, const State& state, size_t maxSize)
634{
635 partialState.reset();
636 State tmp;
637 std::vector<ConstLeafPtr> leaves;
638 for (const ConstLeafPtr& leaf : state.getLeaves()) {
639 leaves.push_back(leaf);
640 }
641
642 std::shuffle(leaves.begin(), leaves.end(), m_rng);
643
644 for (const auto& constLeafPtr : leaves) {
645 tmp.update(constLeafPtr->getSessionName(), constLeafPtr->getSeq());
646 if (tmp.wireEncode().size() >= maxSize) {
647 break;
648 }
649 partialState.update(constLeafPtr->getSessionName(), constLeafPtr->getSeq());
650 }
651}
652
653void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800654Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700655{
656 _LOG_DEBUG_ID(">> Logic::sendSyncData");
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600657 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
658 return;
659
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500660 Data syncReply(name);
661 syncReply.setContent(state.wireEncode());
662 syncReply.setFreshnessPeriod(m_syncReplyFreshness);
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600663
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800664 if (m_nodeList[nodePrefix].signingId.empty())
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500665 m_keyChain.sign(syncReply);
Yingdi Yucd339022014-11-05 17:51:19 -0800666 else
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500667 m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700668
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600669 if (syncReply.wireEncode().size() > ndn::MAX_NDN_PACKET_SIZE) {
670 _LOG_DEBUG("Sync reply size exceeded MAX_NDN_PACKET_SIZE");
671 auto maxContentSize = ndn::MAX_NDN_PACKET_SIZE - (syncReply.wireEncode().size() - state.wireEncode().size());
672 State partialState;
673 trimState(partialState, state, maxContentSize);
674 syncReply.setContent(partialState.wireEncode());
675
676 if (m_nodeList[nodePrefix].signingId.empty())
677 m_keyChain.sign(syncReply);
678 else
679 m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
680 }
681
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500682 m_face.put(syncReply);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700683
684 // checking if our own interest got satisfied
685 if (m_outstandingInterestName == name) {
686 // remove outstanding interest
687 if (m_outstandingInterestId != 0) {
688 m_face.removePendingInterest(m_outstandingInterestId);
689 m_outstandingInterestId = 0;
690 }
691
692 // re-schedule sending Sync interest
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600693 time::milliseconds after(m_reexpressionJitter(m_rng));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700694 _LOG_DEBUG_ID("Satisfy our own interest");
695 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
696 EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
697 m_scheduler.cancelEvent(m_reexpressingInterestId);
698 m_reexpressingInterestId = eventId;
699 }
700 _LOG_DEBUG_ID("<< Logic::sendSyncData");
701}
702
703void
704Logic::cancelReset()
705{
706 _LOG_DEBUG_ID(">> Logic::cancelReset");
707 if (!m_isInReset)
708 return;
709
710 m_isInReset = false;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800711 for (const auto& node : m_nodeList) {
712 updateSeqNo(node.second.seqNo, node.first);
713 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700714 _LOG_DEBUG_ID("<< Logic::cancelReset");
715}
716
717void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500718Logic::printDigest(ConstBufferPtr digest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700719{
Davide Pesavento5473abe2017-10-09 01:35:33 -0400720 std::string hash = ndn::toHex(digest->data(), digest->size(), false);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700721 _LOG_DEBUG_ID("Hash: " << hash);
722}
723
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800724void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500725Logic::sendRecoveryInterest(ConstBufferPtr digest)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800726{
727 _LOG_DEBUG_ID(">> Logic::sendRecoveryInterest");
728
729 Name interestName;
730 interestName.append(m_syncPrefix)
731 .append(RECOVERY_COMPONENT)
732 .append(ndn::name::Component(*digest));
733
734 Interest interest(interestName);
735 interest.setMustBeFresh(true);
736 interest.setInterestLifetime(m_recoveryInterestLifetime);
737
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800738 m_face.expressInterest(interest,
739 bind(&Logic::onRecoveryData, this, _1, _2),
740 bind(&Logic::onRecoveryTimeout, this, _1), // Nack
741 bind(&Logic::onRecoveryTimeout, this, _1));
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800742
743 _LOG_DEBUG_ID("interest: " << interest.getName());
744 _LOG_DEBUG_ID("<< Logic::sendRecoveryInterest");
745}
746
747void
748Logic::processRecoveryInterest(const Interest& interest)
749{
750 _LOG_DEBUG_ID(">> Logic::processRecoveryInterest");
751
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500752 Name name = interest.getName();
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800753 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
754
755 ConstBufferPtr rootDigest = m_state.getRootDigest();
756
757 DiffStateContainer::iterator stateIter = m_log.find(digest);
758
759 if (stateIter != m_log.end() || *digest == *EMPTY_DIGEST || *rootDigest == *digest) {
760 _LOG_DEBUG_ID("I can help you recover");
761 sendSyncData(m_defaultUserPrefix, name, m_state);
762 return;
763 }
764 _LOG_DEBUG_ID("<< Logic::processRecoveryInterest");
765}
766
767void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800768Logic::onRecoveryData(const Interest& interest, const Data& data)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800769{
770 _LOG_DEBUG_ID(">> Logic::onRecoveryData");
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500771 onSyncDataValidated(data);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800772 _LOG_DEBUG_ID("<< Logic::onRecoveryData");
773}
774
775void
776Logic::onRecoveryTimeout(const Interest& interest)
777{
778 _LOG_DEBUG_ID(">> Logic::onRecoveryTimeout");
779 _LOG_DEBUG_ID("Interest: " << interest.getName());
780 _LOG_DEBUG_ID("<< Logic::onRecoveryTimeout");
781}
782
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800783void
784Logic::sendExcludeInterest(const Interest& interest, const Data& data)
785{
786 _LOG_DEBUG_ID(">> Logic::sendExcludeInterest");
787
788 Name interestName = interest.getName();
789 Interest excludeInterest(interestName);
790
791 Exclude exclude = interest.getExclude();
792 exclude.excludeOne(data.getFullName().get(-1));
793 excludeInterest.setExclude(exclude);
Ashlesh Gawande04e8d492018-02-04 13:08:15 -0600794 excludeInterest.setMustBeFresh(true);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800795
796 excludeInterest.setInterestLifetime(m_syncInterestLifetime);
797
Ashlesh Gawanded31d6b12017-03-31 11:43:22 -0500798 if (excludeInterest.wireEncode().size() > ndn::MAX_NDN_PACKET_SIZE) {
799 return;
800 }
801
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800802 m_face.expressInterest(excludeInterest,
803 bind(&Logic::onSyncData, this, _1, _2),
804 bind(&Logic::onSyncTimeout, this, _1), // Nack
805 bind(&Logic::onSyncTimeout, this, _1));
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800806
807 _LOG_DEBUG_ID("Send interest: " << excludeInterest.getName());
808 _LOG_DEBUG_ID("<< Logic::sendExcludeInterest");
809}
810
811void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500812Logic::formAndSendExcludeInterest(const Name& nodePrefix, const State& commit, ConstBufferPtr previousRoot)
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800813{
814 _LOG_DEBUG_ID(">> Logic::formAndSendExcludeInterest");
815 Name interestName;
816 interestName.append(m_syncPrefix)
817 .append(ndn::name::Component(*previousRoot));
818 Interest interest(interestName);
819
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500820 Data data(interestName);
821 data.setContent(commit.wireEncode());
822 data.setFreshnessPeriod(m_syncReplyFreshness);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800823 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
824 return;
825 if (m_nodeList[nodePrefix].signingId.empty())
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500826 m_keyChain.sign(data);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800827 else
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500828 m_keyChain.sign(data, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800829
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500830 sendExcludeInterest(interest, data);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800831
832 _LOG_DEBUG_ID("<< Logic::formAndSendExcludeInterest");
833}
834
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800835} // namespace chronosync