blob: c83132ce3173bc11a56612265f4ecddc8ca158fe [file] [log] [blame]
Yingdi Yuf7ede412014-08-30 20:37:52 -07001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
Ashlesh Gawande04e8d492018-02-04 13:08:15 -06003 * Copyright (c) 2012-2018 University of California, Los Angeles
Yingdi Yuf7ede412014-08-30 20:37:52 -07004 *
5 * This file is part of ChronoSync, synchronization library for distributed realtime
6 * applications for NDN.
7 *
8 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
9 * of the GNU General Public License as published by the Free Software Foundation, either
10 * version 3 of the License, or (at your option) any later version.
11 *
12 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14 * PURPOSE. See the GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along with
17 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18 *
19 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
20 * @author Chaoyi Bian <bcy@pku.edu.cn>
21 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
22 * @author Yingdi Yu <yingdi@cs.ucla.edu>
Sonu Mishra0dadc572016-12-12 23:59:41 -080023 * @author Sonu Mishra <https://www.linkedin.com/in/mishrasonu>
Yingdi Yuf7ede412014-08-30 20:37:52 -070024 */
25
26#include "logic.hpp"
27#include "logger.hpp"
28
Ashlesh Gawande687cf922017-05-30 15:04:16 -050029#include <ndn-cxx/util/string-helper.hpp>
30
Alexander Afanasyev36eb3ed2017-01-11 12:35:58 -080031INIT_LOGGER(Logic);
Yingdi Yuf7ede412014-08-30 20:37:52 -070032
33#ifdef _DEBUG
34#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
35#else
36#define _LOG_DEBUG_ID(v) _LOG_DEBUG(v)
37#endif
38
39namespace chronosync {
40
Yingdi Yuf7ede412014-08-30 20:37:52 -070041using ndn::EventId;
42
43const uint8_t EMPTY_DIGEST_VALUE[] = {
44 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
45 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
46 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
47 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
48};
49
50#ifdef _DEBUG
Ashlesh Gawande08784d42017-09-06 23:40:21 -050051int Logic::s_instanceCounter = 0;
Yingdi Yuf7ede412014-08-30 20:37:52 -070052#endif
53
Yingdi Yucd339022014-11-05 17:51:19 -080054const ndn::Name Logic::DEFAULT_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080055const ndn::Name Logic::EMPTY_NAME;
Ashlesh Gawande08784d42017-09-06 23:40:21 -050056const std::shared_ptr<Validator> Logic::DEFAULT_VALIDATOR;
Yingdi Yuf7ede412014-08-30 20:37:52 -070057const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
58const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
59const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
60const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
61const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080062const time::milliseconds Logic::DEFAULT_RECOVERY_INTEREST_LIFETIME(1000);
Yingdi Yuf7ede412014-08-30 20:37:52 -070063
Ashlesh Gawande08784d42017-09-06 23:40:21 -050064const ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
Yingdi Yuf7ede412014-08-30 20:37:52 -070065const ndn::name::Component Logic::RESET_COMPONENT("reset");
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080066const ndn::name::Component Logic::RECOVERY_COMPONENT("recovery");
Yingdi Yuf7ede412014-08-30 20:37:52 -070067
68Logic::Logic(ndn::Face& face,
69 const Name& syncPrefix,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080070 const Name& defaultUserPrefix,
Yingdi Yuf7ede412014-08-30 20:37:52 -070071 const UpdateCallback& onUpdate,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080072 const Name& defaultSigningId,
Ashlesh Gawande08784d42017-09-06 23:40:21 -050073 std::shared_ptr<Validator> validator,
Yingdi Yuf7ede412014-08-30 20:37:52 -070074 const time::steady_clock::Duration& resetTimer,
75 const time::steady_clock::Duration& cancelResetTimer,
76 const time::milliseconds& resetInterestLifetime,
77 const time::milliseconds& syncInterestLifetime,
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080078 const time::milliseconds& syncReplyFreshness,
79 const time::milliseconds& recoveryInterestLifetime)
Yingdi Yuf7ede412014-08-30 20:37:52 -070080 : m_face(face)
81 , m_syncPrefix(syncPrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080082 , m_defaultUserPrefix(defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -070083 , m_interestTable(m_face.getIoService())
84 , m_outstandingInterestId(0)
85 , m_isInReset(false)
86 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
87 , m_onUpdate(onUpdate)
88 , m_scheduler(m_face.getIoService())
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -060089 , m_rng(std::random_device{}())
90 , m_rangeUniformRandom(100, 500)
91 , m_reexpressionJitter(100, 500)
Yingdi Yuf7ede412014-08-30 20:37:52 -070092 , m_resetTimer(resetTimer)
93 , m_cancelResetTimer(cancelResetTimer)
94 , m_resetInterestLifetime(resetInterestLifetime)
95 , m_syncInterestLifetime(syncInterestLifetime)
96 , m_syncReplyFreshness(syncReplyFreshness)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080097 , m_recoveryInterestLifetime(recoveryInterestLifetime)
Yingdi Yucd339022014-11-05 17:51:19 -080098 , m_validator(validator)
Yingdi Yuf7ede412014-08-30 20:37:52 -070099{
100#ifdef _DEBUG
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500101 m_instanceId = s_instanceCounter++;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700102#endif
103
104 _LOG_DEBUG_ID(">> Logic::Logic");
105
Ashlesh Gawande687cf922017-05-30 15:04:16 -0500106 addUserNode(m_defaultUserPrefix, defaultSigningId);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800107
108
Yingdi Yuf7ede412014-08-30 20:37:52 -0700109 m_syncReset = m_syncPrefix;
110 m_syncReset.append("reset");
111
112 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
113 m_syncRegisteredPrefixId =
Junxiao Shi4e010bc2017-11-16 14:11:00 +0000114 m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
Yingdi Yuf7ede412014-08-30 20:37:52 -0700115 bind(&Logic::onSyncInterest, this, _1, _2),
116 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
117
Qiuhan Dinge246b622014-12-03 21:57:48 -0800118 sendSyncInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700119 _LOG_DEBUG_ID("<< Logic::Logic");
120}
121
122Logic::~Logic()
123{
Yingdi Yuf7ede412014-08-30 20:37:52 -0700124 m_scheduler.cancelAllEvents();
Yingdi Yu9d5679a2015-02-01 00:17:58 -0800125 m_interestTable.clear();
126 m_face.shutdown();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700127}
128
129void
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800130Logic::reset(bool isOnInterest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700131{
132 m_isInReset = true;
133
134 m_state.reset();
135 m_log.clear();
136
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800137 if (!isOnInterest)
138 sendResetInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700139
140 // reset outstanding interest name, so that data for previous interest will be dropped.
141 if (m_outstandingInterestId != 0) {
142 m_face.removePendingInterest(m_outstandingInterestId);
143 m_outstandingInterestId = 0;
144 }
145
146 sendSyncInterest();
147
148 if (static_cast<bool>(m_delayedInterestProcessingId))
149 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
150
151 m_delayedInterestProcessingId =
152 m_scheduler.scheduleEvent(m_cancelResetTimer,
153 bind(&Logic::cancelReset, this));
154}
155
156void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800157Logic::setDefaultUserPrefix(const Name& defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700158{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800159 if (defaultUserPrefix != EMPTY_NAME) {
160 if (m_nodeList.find(defaultUserPrefix) != m_nodeList.end()) {
161 m_defaultUserPrefix = defaultUserPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800162 }
163 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700164}
165
166void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800167Logic::addUserNode(const Name& userPrefix, const Name& signingId)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700168{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800169 if (userPrefix == EMPTY_NAME)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700170 return;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800171 if (m_defaultUserPrefix == EMPTY_NAME) {
172 m_defaultUserPrefix = userPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800173 }
174 if (m_nodeList.find(userPrefix) == m_nodeList.end()) {
175 m_nodeList[userPrefix].userPrefix = userPrefix;
176 m_nodeList[userPrefix].signingId = signingId;
177 Name sessionName = userPrefix;
178 sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
179 m_nodeList[userPrefix].sessionName = sessionName;
180 m_nodeList[userPrefix].seqNo = 0;
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800181 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800182 }
183}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700184
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800185void
186Logic::removeUserNode(const Name& userPrefix)
187{
188 auto userNode = m_nodeList.find(userPrefix);
189 if (userNode != m_nodeList.end()) {
190 m_nodeList.erase(userNode);
191 if (m_defaultUserPrefix == userPrefix) {
192 if (!m_nodeList.empty()) {
193 m_defaultUserPrefix = m_nodeList.begin()->second.userPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800194 }
195 else {
196 m_defaultUserPrefix = EMPTY_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800197 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700198 }
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800199 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800200 }
201}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700202
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800203const Name&
204Logic::getSessionName(Name prefix)
205{
206 if (prefix == EMPTY_NAME)
207 prefix = m_defaultUserPrefix;
208 auto node = m_nodeList.find(prefix);
209 if (node != m_nodeList.end())
210 return node->second.sessionName;
211 else
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800212 BOOST_THROW_EXCEPTION(Error("Refer to non-existent node:" + prefix.toUri()));
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800213}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700214
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800215const SeqNo&
216Logic::getSeqNo(Name prefix)
217{
218 if (prefix == EMPTY_NAME)
219 prefix = m_defaultUserPrefix;
220 auto node = m_nodeList.find(prefix);
221 if (node != m_nodeList.end())
222 return node->second.seqNo;
223 else
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800224 BOOST_THROW_EXCEPTION(Logic::Error("Refer to non-existent node:" + prefix.toUri()));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700225
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800226}
227
228void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800229Logic::updateSeqNo(const SeqNo& seqNo, const Name& updatePrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800230{
231 Name prefix;
232 if (updatePrefix == EMPTY_NAME) {
233 if (m_defaultUserPrefix == EMPTY_NAME)
234 return;
235 prefix = m_defaultUserPrefix;
236 }
237 else
238 prefix = updatePrefix;
239
240 auto it = m_nodeList.find(prefix);
241 if (it != m_nodeList.end()) {
242 NodeInfo& node = it->second;
243 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
244 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << node.seqNo);
245 if (seqNo < node.seqNo || seqNo == 0)
246 return;
247
248 node.seqNo = seqNo;
249 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << node.seqNo);
250
251 if (!m_isInReset) {
252 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500253 ConstBufferPtr previousRoot = m_state.getRootDigest();
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800254 {
Davide Pesavento5473abe2017-10-09 01:35:33 -0400255 std::string hash = ndn::toHex(previousRoot->data(), previousRoot->size(), false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800256 _LOG_DEBUG_ID("Hash: " << hash);
257 }
258
259 bool isInserted = false;
260 bool isUpdated = false;
261 SeqNo oldSeq;
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500262 std::tie(isInserted, isUpdated, oldSeq) = m_state.update(node.sessionName, node.seqNo);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800263
264 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
265 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
266 if (isInserted || isUpdated) {
267 DiffStatePtr commit = make_shared<DiffState>();
268 commit->update(node.sessionName, node.seqNo);
269 commit->setRootDigest(m_state.getRootDigest());
270 insertToDiffLog(commit, previousRoot);
271
272 satisfyPendingSyncInterests(prefix, commit);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800273 formAndSendExcludeInterest(prefix, *commit, previousRoot);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800274 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700275 }
276 }
277}
278
279ConstBufferPtr
280Logic::getRootDigest() const
281{
282 return m_state.getRootDigest();
283}
284
285void
286Logic::printState(std::ostream& os) const
287{
288 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
289 {
290 os << *leaf << "\n";
291 }
292}
293
294std::set<Name>
295Logic::getSessionNames() const
296{
297 std::set<Name> sessionNames;
298
299 BOOST_FOREACH(ConstLeafPtr leaf, m_state.getLeaves())
300 {
301 sessionNames.insert(leaf->getSessionName());
302 }
303
304 return sessionNames;
305}
306
307void
308Logic::onSyncInterest(const Name& prefix, const Interest& interest)
309{
310 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
311 Name name = interest.getName();
312
313 _LOG_DEBUG_ID("InterestName: " << name);
314
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800315 if (name.size() >= 1 && RESET_COMPONENT == name.get(-1)) {
316 processResetInterest(interest);
317 }
318 else if (name.size() >= 2 && RECOVERY_COMPONENT == name.get(-2)) {
319 processRecoveryInterest(interest);
320 }
Ashlesh Gawande8ba7d5a2017-07-24 14:43:12 -0500321 // Do not process exclude interests, they should be answered by CS
322 else if (interest.getExclude().empty()) {
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500323 processSyncInterest(interest);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700324 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700325
326 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
327}
328
329void
330Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
331{
332 //Sync prefix registration failed
333 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
334}
335
336void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800337Logic::onSyncData(const Interest& interest, const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700338{
339 _LOG_DEBUG_ID(">> Logic::onSyncData");
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800340 // if (static_cast<bool>(m_validator))
341 // m_validator->validate(data,
342 // bind(&Logic::onSyncDataValidated, this, _1),
343 // bind(&Logic::onSyncDataValidationFailed, this, _1));
344 // else
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500345 // onSyncDataValidated(data);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800346
347 if (interest.getExclude().empty()) {
348 _LOG_DEBUG_ID("First data");
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500349 onSyncDataValidated(data);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800350 }
351 else {
352 _LOG_DEBUG_ID("Data obtained using exclude filter");
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500353 onSyncDataValidated(data, false);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800354 }
355 sendExcludeInterest(interest, data);
356
Yingdi Yuf7ede412014-08-30 20:37:52 -0700357 _LOG_DEBUG_ID("<< Logic::onSyncData");
358}
359
360void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800361Logic::onResetData(const Interest& interest, const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700362{
363 // This should not happened, drop the received data.
364}
365
366void
367Logic::onSyncTimeout(const Interest& interest)
368{
369 // It is OK. Others will handle the time out situation.
370 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
371 _LOG_DEBUG_ID("Interest: " << interest.getName());
372 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
373}
374
375void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500376Logic::onSyncDataValidationFailed(const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700377{
378 // SyncReply cannot be validated.
379}
380
381void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500382Logic::onSyncDataValidated(const Data& data, bool firstData)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700383{
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500384 Name name = data.getName();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700385 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
386
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500387 processSyncData(name, digest, data.getContent().blockFromValue(), firstData);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700388}
389
390void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500391Logic::processSyncInterest(const Interest& interest, bool isTimedProcessing/*=false*/)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700392{
393 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
394
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500395 Name name = interest.getName();
396 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700397
398 ConstBufferPtr rootDigest = m_state.getRootDigest();
399
400 // If the digest of the incoming interest is the same as root digest
401 // Put the interest into InterestTable
402 if (*rootDigest == *digest) {
403 _LOG_DEBUG_ID("Oh, we are in the same state");
404 m_interestTable.insert(interest, digest, false);
405
406 if (!m_isInReset)
407 return;
408
409 if (!isTimedProcessing) {
410 _LOG_DEBUG_ID("Non timed processing in reset");
411 // Still in reset, our own seq has not been put into state yet
412 // Do not hurry, some others may be also resetting and may send their reply
413 if (static_cast<bool>(m_delayedInterestProcessingId))
414 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
415
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600416 time::milliseconds after(m_rangeUniformRandom(m_rng));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700417 _LOG_DEBUG_ID("After: " << after);
418 m_delayedInterestProcessingId =
419 m_scheduler.scheduleEvent(after,
420 bind(&Logic::processSyncInterest, this, interest, true));
421 }
422 else {
423 _LOG_DEBUG_ID("Timed processing in reset");
424 // Now we can get out of reset state by putting our own stuff into m_state.
425 cancelReset();
426 }
427
428 return;
429 }
430
431 // If the digest of incoming interest is an "empty" digest
Sonu Mishrae10acbc2017-01-18 14:14:05 -0800432 if (*digest == *EMPTY_DIGEST) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700433 _LOG_DEBUG_ID("Poor guy, he knows nothing");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800434 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700435 return;
436 }
437
438 DiffStateContainer::iterator stateIter = m_log.find(digest);
439 // If the digest of incoming interest can be found from the log
440 if (stateIter != m_log.end()) {
441 _LOG_DEBUG_ID("It is ok, you are so close");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800442 sendSyncData(m_defaultUserPrefix, name, *(*stateIter)->diff());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700443 return;
444 }
445
446 if (!isTimedProcessing) {
447 _LOG_DEBUG_ID("Let's wait, just wait for a while");
448 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
Yingdi Yu53f5f042015-01-31 16:33:25 -0800449 bool doesExist = m_interestTable.has(digest);
450 m_interestTable.insert(interest, digest, true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700451 if (doesExist)
452 // Original comment (not sure): somebody else replied, so restart random-game timer
453 // YY: Get the same SyncInterest again, refresh the timer
454 m_scheduler.cancelEvent(m_delayedInterestProcessingId);
455
456 m_delayedInterestProcessingId =
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600457 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom(m_rng)),
Yingdi Yuf7ede412014-08-30 20:37:52 -0700458 bind(&Logic::processSyncInterest, this, interest, true));
459 }
460 else {
461 // OK, nobody is helping us, just tell the truth.
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800462 _LOG_DEBUG_ID("OK, nobody is helping us, let us try to recover");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700463 m_interestTable.erase(digest);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800464 sendRecoveryInterest(digest);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700465 }
466
467 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
468}
469
470void
471Logic::processResetInterest(const Interest& interest)
472{
473 _LOG_DEBUG_ID(">> Logic::processResetInterest");
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800474 reset(true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700475}
476
477void
478Logic::processSyncData(const Name& name,
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500479 ConstBufferPtr digest,
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800480 const Block& syncReplyBlock,
481 bool firstData)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700482{
483 _LOG_DEBUG_ID(">> Logic::processSyncData");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700484 DiffStatePtr commit = make_shared<DiffState>();
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500485 ConstBufferPtr previousRoot = m_state.getRootDigest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700486
487 try {
488 m_interestTable.erase(digest); // Remove satisfied interest from PIT
489
490 State reply;
491 reply.wireDecode(syncReplyBlock);
492
493 std::vector<MissingDataInfo> v;
494 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
495 {
496 BOOST_ASSERT(leaf != 0);
497
498 const Name& info = leaf->getSessionName();
499 SeqNo seq = leaf->getSeq();
500
501 bool isInserted = false;
502 bool isUpdated = false;
503 SeqNo oldSeq;
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500504 std::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700505 if (isInserted || isUpdated) {
506 commit->update(info, seq);
507
508 oldSeq++;
509 MissingDataInfo mdi = {info, oldSeq, seq};
510 v.push_back(mdi);
511 }
512 }
513
514 if (!v.empty()) {
515 m_onUpdate(v);
516
517 commit->setRootDigest(m_state.getRootDigest());
518 insertToDiffLog(commit, previousRoot);
519 }
520 else {
521 _LOG_DEBUG_ID("What? nothing new");
522 }
523 }
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800524 catch (const State::Error&) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700525 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
526 // Something really fishy happened during state decoding;
527 commit.reset();
528 return;
529 }
530
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800531 if (static_cast<bool>(commit) && !commit->getLeaves().empty() && firstData) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700532 // state changed and it is safe to express a new interest
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600533 time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter(m_rng));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700534 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
535 EventId eventId = m_scheduler.scheduleEvent(after,
536 bind(&Logic::sendSyncInterest, this));
537
538 m_scheduler.cancelEvent(m_reexpressingInterestId);
539 m_reexpressingInterestId = eventId;
540 }
541}
542
543void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800544Logic::satisfyPendingSyncInterests(const Name& updatedPrefix, ConstDiffStatePtr commit)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700545{
546 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
547 try {
548 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500549 auto it = m_interestTable.begin();
550 while (it != m_interestTable.end()) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700551 ConstUnsatisfiedInterestPtr request = *it;
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500552 ++it;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700553 if (request->isUnknown)
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500554 sendSyncData(updatedPrefix, request->interest.getName(), m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700555 else
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500556 sendSyncData(updatedPrefix, request->interest.getName(), *commit);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700557 }
558 m_interestTable.clear();
559 }
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800560 catch (const InterestTable::Error&) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700561 // ok. not really an error
562 }
563 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
564}
565
566void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500567Logic::insertToDiffLog(DiffStatePtr commit, ConstBufferPtr previousRoot)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700568{
569 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
570 // Connect to the history
571 if (!m_log.empty())
572 (*m_log.find(previousRoot))->setNext(commit);
573
574 // Insert the commit
575 m_log.erase(commit->getRootDigest());
576 m_log.insert(commit);
577 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
578}
579
580void
581Logic::sendResetInterest()
582{
583 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
584
585 if (m_needPeriodReset) {
586 _LOG_DEBUG_ID("Need Period Reset");
587 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
588
589 EventId eventId =
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600590 m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter(m_rng)),
Yingdi Yuf7ede412014-08-30 20:37:52 -0700591 bind(&Logic::sendResetInterest, this));
592 m_scheduler.cancelEvent(m_resetInterestId);
593 m_resetInterestId = eventId;
594 }
595
596 Interest interest(m_syncReset);
597 interest.setMustBeFresh(true);
598 interest.setInterestLifetime(m_resetInterestLifetime);
599 m_face.expressInterest(interest,
600 bind(&Logic::onResetData, this, _1, _2),
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800601 bind(&Logic::onSyncTimeout, this, _1), // Nack
Yingdi Yuf7ede412014-08-30 20:37:52 -0700602 bind(&Logic::onSyncTimeout, this, _1));
603
604 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
605}
606
607void
608Logic::sendSyncInterest()
609{
610 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
611
612 Name interestName;
613 interestName.append(m_syncPrefix)
614 .append(ndn::name::Component(*m_state.getRootDigest()));
615
616 m_outstandingInterestName = interestName;
617
618#ifdef _DEBUG
619 printDigest(m_state.getRootDigest());
620#endif
621
622 EventId eventId =
Sonu Mishra0dadc572016-12-12 23:59:41 -0800623 m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 +
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600624 ndn::time::milliseconds(m_reexpressionJitter(m_rng)),
Yingdi Yuf7ede412014-08-30 20:37:52 -0700625 bind(&Logic::sendSyncInterest, this));
626 m_scheduler.cancelEvent(m_reexpressingInterestId);
627 m_reexpressingInterestId = eventId;
628
629 Interest interest(interestName);
630 interest.setMustBeFresh(true);
631 interest.setInterestLifetime(m_syncInterestLifetime);
632
633 m_outstandingInterestId = m_face.expressInterest(interest,
634 bind(&Logic::onSyncData, this, _1, _2),
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800635 bind(&Logic::onSyncTimeout, this, _1), // Nack
Yingdi Yuf7ede412014-08-30 20:37:52 -0700636 bind(&Logic::onSyncTimeout, this, _1));
637
638 _LOG_DEBUG_ID("Send interest: " << interest.getName());
639 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
640}
641
642void
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600643Logic::trimState(State& partialState, const State& state, size_t maxSize)
644{
645 partialState.reset();
646 State tmp;
647 std::vector<ConstLeafPtr> leaves;
648 for (const ConstLeafPtr& leaf : state.getLeaves()) {
649 leaves.push_back(leaf);
650 }
651
652 std::shuffle(leaves.begin(), leaves.end(), m_rng);
653
654 for (const auto& constLeafPtr : leaves) {
655 tmp.update(constLeafPtr->getSessionName(), constLeafPtr->getSeq());
656 if (tmp.wireEncode().size() >= maxSize) {
657 break;
658 }
659 partialState.update(constLeafPtr->getSessionName(), constLeafPtr->getSeq());
660 }
661}
662
663void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800664Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700665{
666 _LOG_DEBUG_ID(">> Logic::sendSyncData");
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600667 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
668 return;
669
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500670 Data syncReply(name);
671 syncReply.setContent(state.wireEncode());
672 syncReply.setFreshnessPeriod(m_syncReplyFreshness);
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600673
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800674 if (m_nodeList[nodePrefix].signingId.empty())
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500675 m_keyChain.sign(syncReply);
Yingdi Yucd339022014-11-05 17:51:19 -0800676 else
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500677 m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700678
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600679 if (syncReply.wireEncode().size() > ndn::MAX_NDN_PACKET_SIZE) {
680 _LOG_DEBUG("Sync reply size exceeded MAX_NDN_PACKET_SIZE");
681 auto maxContentSize = ndn::MAX_NDN_PACKET_SIZE - (syncReply.wireEncode().size() - state.wireEncode().size());
682 State partialState;
683 trimState(partialState, state, maxContentSize);
684 syncReply.setContent(partialState.wireEncode());
685
686 if (m_nodeList[nodePrefix].signingId.empty())
687 m_keyChain.sign(syncReply);
688 else
689 m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
690 }
691
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500692 m_face.put(syncReply);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700693
694 // checking if our own interest got satisfied
695 if (m_outstandingInterestName == name) {
696 // remove outstanding interest
697 if (m_outstandingInterestId != 0) {
698 m_face.removePendingInterest(m_outstandingInterestId);
699 m_outstandingInterestId = 0;
700 }
701
702 // re-schedule sending Sync interest
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600703 time::milliseconds after(m_reexpressionJitter(m_rng));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700704 _LOG_DEBUG_ID("Satisfy our own interest");
705 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
706 EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
707 m_scheduler.cancelEvent(m_reexpressingInterestId);
708 m_reexpressingInterestId = eventId;
709 }
710 _LOG_DEBUG_ID("<< Logic::sendSyncData");
711}
712
713void
714Logic::cancelReset()
715{
716 _LOG_DEBUG_ID(">> Logic::cancelReset");
717 if (!m_isInReset)
718 return;
719
720 m_isInReset = false;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800721 for (const auto& node : m_nodeList) {
722 updateSeqNo(node.second.seqNo, node.first);
723 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700724 _LOG_DEBUG_ID("<< Logic::cancelReset");
725}
726
727void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500728Logic::printDigest(ConstBufferPtr digest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700729{
Davide Pesavento5473abe2017-10-09 01:35:33 -0400730 std::string hash = ndn::toHex(digest->data(), digest->size(), false);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700731 _LOG_DEBUG_ID("Hash: " << hash);
732}
733
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800734void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500735Logic::sendRecoveryInterest(ConstBufferPtr digest)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800736{
737 _LOG_DEBUG_ID(">> Logic::sendRecoveryInterest");
738
739 Name interestName;
740 interestName.append(m_syncPrefix)
741 .append(RECOVERY_COMPONENT)
742 .append(ndn::name::Component(*digest));
743
744 Interest interest(interestName);
745 interest.setMustBeFresh(true);
746 interest.setInterestLifetime(m_recoveryInterestLifetime);
747
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800748 m_face.expressInterest(interest,
749 bind(&Logic::onRecoveryData, this, _1, _2),
750 bind(&Logic::onRecoveryTimeout, this, _1), // Nack
751 bind(&Logic::onRecoveryTimeout, this, _1));
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800752
753 _LOG_DEBUG_ID("interest: " << interest.getName());
754 _LOG_DEBUG_ID("<< Logic::sendRecoveryInterest");
755}
756
757void
758Logic::processRecoveryInterest(const Interest& interest)
759{
760 _LOG_DEBUG_ID(">> Logic::processRecoveryInterest");
761
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500762 Name name = interest.getName();
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800763 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
764
765 ConstBufferPtr rootDigest = m_state.getRootDigest();
766
767 DiffStateContainer::iterator stateIter = m_log.find(digest);
768
769 if (stateIter != m_log.end() || *digest == *EMPTY_DIGEST || *rootDigest == *digest) {
770 _LOG_DEBUG_ID("I can help you recover");
771 sendSyncData(m_defaultUserPrefix, name, m_state);
772 return;
773 }
774 _LOG_DEBUG_ID("<< Logic::processRecoveryInterest");
775}
776
777void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800778Logic::onRecoveryData(const Interest& interest, const Data& data)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800779{
780 _LOG_DEBUG_ID(">> Logic::onRecoveryData");
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500781 onSyncDataValidated(data);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800782 _LOG_DEBUG_ID("<< Logic::onRecoveryData");
783}
784
785void
786Logic::onRecoveryTimeout(const Interest& interest)
787{
788 _LOG_DEBUG_ID(">> Logic::onRecoveryTimeout");
789 _LOG_DEBUG_ID("Interest: " << interest.getName());
790 _LOG_DEBUG_ID("<< Logic::onRecoveryTimeout");
791}
792
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800793void
794Logic::sendExcludeInterest(const Interest& interest, const Data& data)
795{
796 _LOG_DEBUG_ID(">> Logic::sendExcludeInterest");
797
798 Name interestName = interest.getName();
799 Interest excludeInterest(interestName);
800
801 Exclude exclude = interest.getExclude();
802 exclude.excludeOne(data.getFullName().get(-1));
803 excludeInterest.setExclude(exclude);
Ashlesh Gawande04e8d492018-02-04 13:08:15 -0600804 excludeInterest.setMustBeFresh(true);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800805
806 excludeInterest.setInterestLifetime(m_syncInterestLifetime);
807
Ashlesh Gawanded31d6b12017-03-31 11:43:22 -0500808 if (excludeInterest.wireEncode().size() > ndn::MAX_NDN_PACKET_SIZE) {
809 return;
810 }
811
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800812 m_face.expressInterest(excludeInterest,
813 bind(&Logic::onSyncData, this, _1, _2),
814 bind(&Logic::onSyncTimeout, this, _1), // Nack
815 bind(&Logic::onSyncTimeout, this, _1));
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800816
817 _LOG_DEBUG_ID("Send interest: " << excludeInterest.getName());
818 _LOG_DEBUG_ID("<< Logic::sendExcludeInterest");
819}
820
821void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500822Logic::formAndSendExcludeInterest(const Name& nodePrefix, const State& commit, ConstBufferPtr previousRoot)
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800823{
824 _LOG_DEBUG_ID(">> Logic::formAndSendExcludeInterest");
825 Name interestName;
826 interestName.append(m_syncPrefix)
827 .append(ndn::name::Component(*previousRoot));
828 Interest interest(interestName);
829
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500830 Data data(interestName);
831 data.setContent(commit.wireEncode());
832 data.setFreshnessPeriod(m_syncReplyFreshness);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800833 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
834 return;
835 if (m_nodeList[nodePrefix].signingId.empty())
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500836 m_keyChain.sign(data);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800837 else
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500838 m_keyChain.sign(data, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800839
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500840 sendExcludeInterest(interest, data);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800841
842 _LOG_DEBUG_ID("<< Logic::formAndSendExcludeInterest");
843}
844
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800845} // namespace chronosync