blob: db6f691a6214a5634b34b26e0fd7a988b760067b [file] [log] [blame]
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2012-2019 University of California, Los Angeles
 *
 * This file is part of ChronoSync, synchronization library for distributed realtime
 * applications for NDN.
 *
 * ChronoSync is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation, either
 * version 3 of the License, or (at your option) any later version.
 *
 * ChronoSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * ChronoSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 *
 * @author Zhenkai Zhu <http://irl.cs.ucla.edu/~zhenkai/>
 * @author Chaoyi Bian <bcy@pku.edu.cn>
 * @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
 * @author Yingdi Yu <yingdi@cs.ucla.edu>
 * @author Sonu Mishra <https://www.linkedin.com/in/mishrasonu>
 */
25
26#include "logic.hpp"
27#include "logger.hpp"
Alexander Afanasyev6ee98ff2018-02-13 19:12:28 -050028#include "bzip2-helper.hpp"
Yingdi Yuf7ede412014-08-30 20:37:52 -070029
Alexander Afanasyev89036292018-02-13 17:19:50 -050030#include <ndn-cxx/util/backports.hpp>
Ashlesh Gawande687cf922017-05-30 15:04:16 -050031#include <ndn-cxx/util/string-helper.hpp>
32
Alexander Afanasyev36eb3ed2017-01-11 12:35:58 -080033INIT_LOGGER(Logic);
Yingdi Yuf7ede412014-08-30 20:37:52 -070034
Yingdi Yuf7ede412014-08-30 20:37:52 -070035#define _LOG_DEBUG_ID(v) _LOG_DEBUG("Instance" << m_instanceId << ": " << v)
Yingdi Yuf7ede412014-08-30 20:37:52 -070036
37namespace chronosync {
38
Yingdi Yuf7ede412014-08-30 20:37:52 -070039using ndn::EventId;
40
41const uint8_t EMPTY_DIGEST_VALUE[] = {
42 0xe3, 0xb0, 0xc4, 0x42, 0x98, 0xfc, 0x1c, 0x14,
43 0x9a, 0xfb, 0xf4, 0xc8, 0x99, 0x6f, 0xb9, 0x24,
44 0x27, 0xae, 0x41, 0xe4, 0x64, 0x9b, 0x93, 0x4c,
45 0xa4, 0x95, 0x99, 0x1b, 0x78, 0x52, 0xb8, 0x55
46};
47
Ashlesh Gawande08784d42017-09-06 23:40:21 -050048int Logic::s_instanceCounter = 0;
Yingdi Yuf7ede412014-08-30 20:37:52 -070049
Yingdi Yucd339022014-11-05 17:51:19 -080050const ndn::Name Logic::DEFAULT_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -080051const ndn::Name Logic::EMPTY_NAME;
Ashlesh Gawande08784d42017-09-06 23:40:21 -050052const std::shared_ptr<Validator> Logic::DEFAULT_VALIDATOR;
Yingdi Yuf7ede412014-08-30 20:37:52 -070053const time::steady_clock::Duration Logic::DEFAULT_RESET_TIMER = time::seconds(0);
54const time::steady_clock::Duration Logic::DEFAULT_CANCEL_RESET_TIMER = time::milliseconds(500);
55const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
56const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
57const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080058const time::milliseconds Logic::DEFAULT_RECOVERY_INTEREST_LIFETIME(1000);
Yingdi Yuf7ede412014-08-30 20:37:52 -070059
Ashlesh Gawande08784d42017-09-06 23:40:21 -050060const ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
Yingdi Yuf7ede412014-08-30 20:37:52 -070061const ndn::name::Component Logic::RESET_COMPONENT("reset");
Sonu Mishra4d3a2e02017-01-18 20:27:51 -080062const ndn::name::Component Logic::RECOVERY_COMPONENT("recovery");
Yingdi Yuf7ede412014-08-30 20:37:52 -070063
Alexander Afanasyev89036292018-02-13 17:19:50 -050064const size_t NDNLP_EXPECTED_OVERHEAD = 20;
65
66/**
67 * Get maximum packet limit
68 *
69 * By default, it returns `ndn::MAX_NDN_PACKET_SIZE`.
70 * The returned value can be customized using the environment variable `CHRONOSYNC_MAX_PACKET_SIZE`,
71 * but the returned value will be at least 500 and no more than `ndn::MAX_NDN_PACKET_SIZE`.
72 */
73#ifndef CHRONOSYNC_HAVE_TESTS
74static
75#endif // CHRONOSYNC_HAVE_TESTS
76size_t
77getMaxPacketLimit()
78{
79 static size_t limit = 0;
80#ifndef CHRONOSYNC_HAVE_TESTS
81 if (limit != 0) {
82 return limit;
83 }
84#endif // CHRONOSYNC_HAVE_TESTS
85
86 if (getenv("CHRONOSYNC_MAX_PACKET_SIZE") != nullptr) {
87 try {
88 limit = ndn::clamp<size_t>(boost::lexical_cast<size_t>(getenv("CHRONOSYNC_MAX_PACKET_SIZE")),
89 500, ndn::MAX_NDN_PACKET_SIZE);
90 }
91 catch (const boost::bad_lexical_cast&) {
92 limit = ndn::MAX_NDN_PACKET_SIZE;
93 }
94 }
95 else {
96 limit = ndn::MAX_NDN_PACKET_SIZE;
97 }
98
99 return limit;
100}
101
Yingdi Yuf7ede412014-08-30 20:37:52 -0700102Logic::Logic(ndn::Face& face,
103 const Name& syncPrefix,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800104 const Name& defaultUserPrefix,
Yingdi Yuf7ede412014-08-30 20:37:52 -0700105 const UpdateCallback& onUpdate,
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800106 const Name& defaultSigningId,
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500107 std::shared_ptr<Validator> validator,
Yingdi Yuf7ede412014-08-30 20:37:52 -0700108 const time::steady_clock::Duration& resetTimer,
109 const time::steady_clock::Duration& cancelResetTimer,
110 const time::milliseconds& resetInterestLifetime,
111 const time::milliseconds& syncInterestLifetime,
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800112 const time::milliseconds& syncReplyFreshness,
Alexander Afanasyevbf5bc6c2018-02-19 11:26:09 -0500113 const time::milliseconds& recoveryInterestLifetime,
114 const name::Component& session)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700115 : m_face(face)
116 , m_syncPrefix(syncPrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800117 , m_defaultUserPrefix(defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700118 , m_interestTable(m_face.getIoService())
Yingdi Yuf7ede412014-08-30 20:37:52 -0700119 , m_isInReset(false)
120 , m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
121 , m_onUpdate(onUpdate)
122 , m_scheduler(m_face.getIoService())
Ashlesh Gawande9a306fe2019-01-04 11:38:18 -0600123 , m_rng(ndn::random::getRandomNumberEngine())
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600124 , m_rangeUniformRandom(100, 500)
125 , m_reexpressionJitter(100, 500)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700126 , m_resetTimer(resetTimer)
127 , m_cancelResetTimer(cancelResetTimer)
128 , m_resetInterestLifetime(resetInterestLifetime)
129 , m_syncInterestLifetime(syncInterestLifetime)
130 , m_syncReplyFreshness(syncReplyFreshness)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800131 , m_recoveryInterestLifetime(recoveryInterestLifetime)
Yingdi Yucd339022014-11-05 17:51:19 -0800132 , m_validator(validator)
Alexander Afanasyev90587b82018-02-11 20:36:53 -0500133 , m_instanceId(s_instanceCounter++)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700134{
Yingdi Yuf7ede412014-08-30 20:37:52 -0700135 _LOG_DEBUG_ID(">> Logic::Logic");
136
Alexander Afanasyevbf5bc6c2018-02-19 11:26:09 -0500137 addUserNode(m_defaultUserPrefix, defaultSigningId, session);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800138
Yingdi Yuf7ede412014-08-30 20:37:52 -0700139 m_syncReset = m_syncPrefix;
140 m_syncReset.append("reset");
141
142 _LOG_DEBUG_ID("Listen to: " << m_syncPrefix);
Junxiao Shi8e4e76d2019-02-08 15:25:08 -0700143 m_syncRegisteredPrefix = m_face.setInterestFilter(
144 ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
145 bind(&Logic::onSyncInterest, this, _1, _2),
146 bind(&Logic::onSyncRegisterFailed, this, _1, _2));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700147
Qiuhan Dinge246b622014-12-03 21:57:48 -0800148 sendSyncInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700149 _LOG_DEBUG_ID("<< Logic::Logic");
150}
151
152Logic::~Logic()
153{
Nick Gordon0b3beab2018-03-02 13:03:28 -0600154 _LOG_DEBUG_ID(">> Logic::~Logic");
Yingdi Yu9d5679a2015-02-01 00:17:58 -0800155 m_interestTable.clear();
Nick Gordon0b3beab2018-03-02 13:03:28 -0600156 m_scheduler.cancelAllEvents();
157 _LOG_DEBUG_ID("<< Logic::~Logic");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700158}
159
160void
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800161Logic::reset(bool isOnInterest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700162{
163 m_isInReset = true;
164
165 m_state.reset();
166 m_log.clear();
167
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800168 if (!isOnInterest)
169 sendResetInterest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700170
Yingdi Yuf7ede412014-08-30 20:37:52 -0700171 sendSyncInterest();
172
Junxiao Shic4902122019-02-08 15:13:50 -0700173 m_delayedInterestProcessingId = m_scheduler.scheduleEvent(m_cancelResetTimer,
174 bind(&Logic::cancelReset, this));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700175}
176
177void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800178Logic::setDefaultUserPrefix(const Name& defaultUserPrefix)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700179{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800180 if (defaultUserPrefix != EMPTY_NAME) {
181 if (m_nodeList.find(defaultUserPrefix) != m_nodeList.end()) {
182 m_defaultUserPrefix = defaultUserPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800183 }
184 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700185}
186
187void
Alexander Afanasyevbf5bc6c2018-02-19 11:26:09 -0500188Logic::addUserNode(const Name& userPrefix, const Name& signingId, const name::Component& session)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700189{
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800190 if (userPrefix == EMPTY_NAME)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700191 return;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800192 if (m_defaultUserPrefix == EMPTY_NAME) {
193 m_defaultUserPrefix = userPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800194 }
195 if (m_nodeList.find(userPrefix) == m_nodeList.end()) {
196 m_nodeList[userPrefix].userPrefix = userPrefix;
197 m_nodeList[userPrefix].signingId = signingId;
198 Name sessionName = userPrefix;
Alexander Afanasyevbf5bc6c2018-02-19 11:26:09 -0500199 if (!session.empty()) {
200 sessionName.append(session);
201 }
202 else {
203 sessionName.appendNumber(ndn::time::toUnixTimestamp(ndn::time::system_clock::now()).count());
204 }
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800205 m_nodeList[userPrefix].sessionName = sessionName;
206 m_nodeList[userPrefix].seqNo = 0;
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800207 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800208 }
209}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700210
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800211void
212Logic::removeUserNode(const Name& userPrefix)
213{
214 auto userNode = m_nodeList.find(userPrefix);
215 if (userNode != m_nodeList.end()) {
216 m_nodeList.erase(userNode);
217 if (m_defaultUserPrefix == userPrefix) {
218 if (!m_nodeList.empty()) {
219 m_defaultUserPrefix = m_nodeList.begin()->second.userPrefix;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800220 }
221 else {
222 m_defaultUserPrefix = EMPTY_NAME;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800223 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700224 }
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800225 reset(false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800226 }
227}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700228
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800229const Name&
230Logic::getSessionName(Name prefix)
231{
232 if (prefix == EMPTY_NAME)
233 prefix = m_defaultUserPrefix;
234 auto node = m_nodeList.find(prefix);
235 if (node != m_nodeList.end())
236 return node->second.sessionName;
237 else
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800238 BOOST_THROW_EXCEPTION(Error("Refer to non-existent node:" + prefix.toUri()));
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800239}
Yingdi Yuf7ede412014-08-30 20:37:52 -0700240
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800241const SeqNo&
242Logic::getSeqNo(Name prefix)
243{
244 if (prefix == EMPTY_NAME)
245 prefix = m_defaultUserPrefix;
246 auto node = m_nodeList.find(prefix);
247 if (node != m_nodeList.end())
248 return node->second.seqNo;
249 else
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800250 BOOST_THROW_EXCEPTION(Logic::Error("Refer to non-existent node:" + prefix.toUri()));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700251
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800252}
253
254void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800255Logic::updateSeqNo(const SeqNo& seqNo, const Name& updatePrefix)
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800256{
257 Name prefix;
258 if (updatePrefix == EMPTY_NAME) {
259 if (m_defaultUserPrefix == EMPTY_NAME)
260 return;
261 prefix = m_defaultUserPrefix;
262 }
263 else
264 prefix = updatePrefix;
265
266 auto it = m_nodeList.find(prefix);
267 if (it != m_nodeList.end()) {
268 NodeInfo& node = it->second;
269 _LOG_DEBUG_ID(">> Logic::updateSeqNo");
270 _LOG_DEBUG_ID("seqNo: " << seqNo << " m_seqNo: " << node.seqNo);
271 if (seqNo < node.seqNo || seqNo == 0)
272 return;
273
274 node.seqNo = seqNo;
275 _LOG_DEBUG_ID("updateSeqNo: m_seqNo " << node.seqNo);
276
277 if (!m_isInReset) {
278 _LOG_DEBUG_ID("updateSeqNo: not in Reset ");
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500279 ConstBufferPtr previousRoot = m_state.getRootDigest();
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800280 {
Davide Pesavento5473abe2017-10-09 01:35:33 -0400281 std::string hash = ndn::toHex(previousRoot->data(), previousRoot->size(), false);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800282 _LOG_DEBUG_ID("Hash: " << hash);
283 }
284
285 bool isInserted = false;
286 bool isUpdated = false;
287 SeqNo oldSeq;
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500288 std::tie(isInserted, isUpdated, oldSeq) = m_state.update(node.sessionName, node.seqNo);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800289
290 _LOG_DEBUG_ID("Insert: " << std::boolalpha << isInserted);
291 _LOG_DEBUG_ID("Updated: " << std::boolalpha << isUpdated);
292 if (isInserted || isUpdated) {
293 DiffStatePtr commit = make_shared<DiffState>();
294 commit->update(node.sessionName, node.seqNo);
295 commit->setRootDigest(m_state.getRootDigest());
296 insertToDiffLog(commit, previousRoot);
297
298 satisfyPendingSyncInterests(prefix, commit);
Alexander Afanasyevfcbf81d2018-02-19 10:25:46 -0500299 // formAndSendExcludeInterest(prefix, *commit, previousRoot);
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800300 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700301 }
302 }
303}
304
305ConstBufferPtr
306Logic::getRootDigest() const
307{
308 return m_state.getRootDigest();
309}
310
311void
312Logic::printState(std::ostream& os) const
313{
Nick Gordon0b3beab2018-03-02 13:03:28 -0600314 for (const auto& leaf : m_state.getLeaves()) {
315 os << *leaf << "\n";
316 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700317}
318
319std::set<Name>
320Logic::getSessionNames() const
321{
322 std::set<Name> sessionNames;
Nick Gordon0b3beab2018-03-02 13:03:28 -0600323 for (const auto& leaf : m_state.getLeaves()) {
324 sessionNames.insert(leaf->getSessionName());
325 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700326 return sessionNames;
327}
328
329void
330Logic::onSyncInterest(const Name& prefix, const Interest& interest)
331{
332 _LOG_DEBUG_ID(">> Logic::onSyncInterest");
333 Name name = interest.getName();
334
335 _LOG_DEBUG_ID("InterestName: " << name);
336
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800337 if (name.size() >= 1 && RESET_COMPONENT == name.get(-1)) {
338 processResetInterest(interest);
339 }
340 else if (name.size() >= 2 && RECOVERY_COMPONENT == name.get(-2)) {
341 processRecoveryInterest(interest);
342 }
Ashlesh Gawande1d1092d2018-08-03 14:36:49 -0500343 else {
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500344 processSyncInterest(interest);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700345 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700346
347 _LOG_DEBUG_ID("<< Logic::onSyncInterest");
348}
349
350void
351Logic::onSyncRegisterFailed(const Name& prefix, const std::string& msg)
352{
353 //Sync prefix registration failed
354 _LOG_DEBUG_ID(">> Logic::onSyncRegisterFailed");
355}
356
357void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800358Logic::onSyncData(const Interest& interest, const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700359{
360 _LOG_DEBUG_ID(">> Logic::onSyncData");
Ashlesh Gawande1d1092d2018-08-03 14:36:49 -0500361 if (m_validator != nullptr)
362 m_validator->validate(data,
363 bind(&Logic::onSyncDataValidated, this, _1),
364 bind(&Logic::onSyncDataValidationFailed, this, _1));
365 else
366 onSyncDataValidated(data);
Sonu Mishraf42aa2c2017-01-22 18:47:33 -0800367
Yingdi Yuf7ede412014-08-30 20:37:52 -0700368 _LOG_DEBUG_ID("<< Logic::onSyncData");
369}
370
371void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800372Logic::onResetData(const Interest& interest, const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700373{
374 // This should not happened, drop the received data.
375}
376
377void
378Logic::onSyncTimeout(const Interest& interest)
379{
380 // It is OK. Others will handle the time out situation.
381 _LOG_DEBUG_ID(">> Logic::onSyncTimeout");
382 _LOG_DEBUG_ID("Interest: " << interest.getName());
383 _LOG_DEBUG_ID("<< Logic::onSyncTimeout");
384}
385
386void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500387Logic::onSyncDataValidationFailed(const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700388{
389 // SyncReply cannot be validated.
390}
391
392void
Ashlesh Gawande1d1092d2018-08-03 14:36:49 -0500393Logic::onSyncDataValidated(const Data& data)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700394{
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500395 Name name = data.getName();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700396 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
397
Alexander Afanasyevfcbf81d2018-02-19 10:25:46 -0500398 try {
399 auto contentBuffer = bzip2::decompress(reinterpret_cast<const char*>(data.getContent().value()),
400 data.getContent().value_size());
Ashlesh Gawande1d1092d2018-08-03 14:36:49 -0500401 processSyncData(name, digest, Block(std::move(contentBuffer)));
Alexander Afanasyevfcbf81d2018-02-19 10:25:46 -0500402 }
403 catch (const std::ios_base::failure& error) {
404 _LOG_WARN("Error decompressing content of " << data.getName() << " (" << error.what() << ")");
405 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700406}
407
408void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500409Logic::processSyncInterest(const Interest& interest, bool isTimedProcessing/*=false*/)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700410{
411 _LOG_DEBUG_ID(">> Logic::processSyncInterest");
412
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500413 Name name = interest.getName();
414 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700415
416 ConstBufferPtr rootDigest = m_state.getRootDigest();
417
418 // If the digest of the incoming interest is the same as root digest
419 // Put the interest into InterestTable
420 if (*rootDigest == *digest) {
421 _LOG_DEBUG_ID("Oh, we are in the same state");
422 m_interestTable.insert(interest, digest, false);
423
424 if (!m_isInReset)
425 return;
426
427 if (!isTimedProcessing) {
428 _LOG_DEBUG_ID("Non timed processing in reset");
429 // Still in reset, our own seq has not been put into state yet
430 // Do not hurry, some others may be also resetting and may send their reply
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600431 time::milliseconds after(m_rangeUniformRandom(m_rng));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700432 _LOG_DEBUG_ID("After: " << after);
Junxiao Shic4902122019-02-08 15:13:50 -0700433 m_delayedInterestProcessingId = m_scheduler.scheduleEvent(after,
434 bind(&Logic::processSyncInterest, this, interest, true));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700435 }
436 else {
437 _LOG_DEBUG_ID("Timed processing in reset");
438 // Now we can get out of reset state by putting our own stuff into m_state.
439 cancelReset();
440 }
441
442 return;
443 }
444
445 // If the digest of incoming interest is an "empty" digest
Sonu Mishrae10acbc2017-01-18 14:14:05 -0800446 if (*digest == *EMPTY_DIGEST) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700447 _LOG_DEBUG_ID("Poor guy, he knows nothing");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800448 sendSyncData(m_defaultUserPrefix, name, m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700449 return;
450 }
451
452 DiffStateContainer::iterator stateIter = m_log.find(digest);
453 // If the digest of incoming interest can be found from the log
454 if (stateIter != m_log.end()) {
455 _LOG_DEBUG_ID("It is ok, you are so close");
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800456 sendSyncData(m_defaultUserPrefix, name, *(*stateIter)->diff());
Yingdi Yuf7ede412014-08-30 20:37:52 -0700457 return;
458 }
459
460 if (!isTimedProcessing) {
461 _LOG_DEBUG_ID("Let's wait, just wait for a while");
462 // Do not hurry, some incoming SyncReplies may help us to recognize the digest
Yingdi Yu53f5f042015-01-31 16:33:25 -0800463 m_interestTable.insert(interest, digest, true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700464
465 m_delayedInterestProcessingId =
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600466 m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom(m_rng)),
Yingdi Yuf7ede412014-08-30 20:37:52 -0700467 bind(&Logic::processSyncInterest, this, interest, true));
468 }
469 else {
470 // OK, nobody is helping us, just tell the truth.
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800471 _LOG_DEBUG_ID("OK, nobody is helping us, let us try to recover");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700472 m_interestTable.erase(digest);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800473 sendRecoveryInterest(digest);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700474 }
475
476 _LOG_DEBUG_ID("<< Logic::processSyncInterest");
477}
478
479void
480Logic::processResetInterest(const Interest& interest)
481{
482 _LOG_DEBUG_ID(">> Logic::processResetInterest");
Qiuhan Dingfb8c9e02015-01-30 14:04:55 -0800483 reset(true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700484}
485
486void
487Logic::processSyncData(const Name& name,
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500488 ConstBufferPtr digest,
Ashlesh Gawande1d1092d2018-08-03 14:36:49 -0500489 const Block& syncReplyBlock)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700490{
491 _LOG_DEBUG_ID(">> Logic::processSyncData");
Yingdi Yuf7ede412014-08-30 20:37:52 -0700492 DiffStatePtr commit = make_shared<DiffState>();
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500493 ConstBufferPtr previousRoot = m_state.getRootDigest();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700494
495 try {
496 m_interestTable.erase(digest); // Remove satisfied interest from PIT
497
498 State reply;
499 reply.wireDecode(syncReplyBlock);
500
501 std::vector<MissingDataInfo> v;
502 BOOST_FOREACH(ConstLeafPtr leaf, reply.getLeaves().get<ordered>())
503 {
504 BOOST_ASSERT(leaf != 0);
505
506 const Name& info = leaf->getSessionName();
507 SeqNo seq = leaf->getSeq();
508
509 bool isInserted = false;
510 bool isUpdated = false;
511 SeqNo oldSeq;
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500512 std::tie(isInserted, isUpdated, oldSeq) = m_state.update(info, seq);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700513 if (isInserted || isUpdated) {
514 commit->update(info, seq);
515
516 oldSeq++;
517 MissingDataInfo mdi = {info, oldSeq, seq};
518 v.push_back(mdi);
519 }
520 }
521
522 if (!v.empty()) {
523 m_onUpdate(v);
524
525 commit->setRootDigest(m_state.getRootDigest());
526 insertToDiffLog(commit, previousRoot);
527 }
528 else {
529 _LOG_DEBUG_ID("What? nothing new");
530 }
531 }
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800532 catch (const State::Error&) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700533 _LOG_DEBUG_ID("Something really fishy happened during state decoding");
534 // Something really fishy happened during state decoding;
535 commit.reset();
536 return;
537 }
538
Ashlesh Gawande1d1092d2018-08-03 14:36:49 -0500539 if (static_cast<bool>(commit) && !commit->getLeaves().empty()) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700540 // state changed and it is safe to express a new interest
Junxiao Shic4902122019-02-08 15:13:50 -0700541 auto after = time::milliseconds(m_reexpressionJitter(m_rng));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700542 _LOG_DEBUG_ID("Reschedule sync interest after: " << after);
Junxiao Shic4902122019-02-08 15:13:50 -0700543 m_reexpressingInterestId = m_scheduler.scheduleEvent(after,
544 bind(&Logic::sendSyncInterest, this));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700545 }
546}
547
548void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800549Logic::satisfyPendingSyncInterests(const Name& updatedPrefix, ConstDiffStatePtr commit)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700550{
551 _LOG_DEBUG_ID(">> Logic::satisfyPendingSyncInterests");
552 try {
553 _LOG_DEBUG_ID("InterestTable size: " << m_interestTable.size());
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500554 auto it = m_interestTable.begin();
555 while (it != m_interestTable.end()) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700556 ConstUnsatisfiedInterestPtr request = *it;
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500557 ++it;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700558 if (request->isUnknown)
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500559 sendSyncData(updatedPrefix, request->interest.getName(), m_state);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700560 else
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500561 sendSyncData(updatedPrefix, request->interest.getName(), *commit);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700562 }
563 m_interestTable.clear();
564 }
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800565 catch (const InterestTable::Error&) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700566 // ok. not really an error
567 }
568 _LOG_DEBUG_ID("<< Logic::satisfyPendingSyncInterests");
569}
570
571void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500572Logic::insertToDiffLog(DiffStatePtr commit, ConstBufferPtr previousRoot)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700573{
574 _LOG_DEBUG_ID(">> Logic::insertToDiffLog");
575 // Connect to the history
576 if (!m_log.empty())
577 (*m_log.find(previousRoot))->setNext(commit);
578
579 // Insert the commit
580 m_log.erase(commit->getRootDigest());
581 m_log.insert(commit);
582 _LOG_DEBUG_ID("<< Logic::insertToDiffLog");
583}
584
585void
586Logic::sendResetInterest()
587{
588 _LOG_DEBUG_ID(">> Logic::sendResetInterest");
589
590 if (m_needPeriodReset) {
591 _LOG_DEBUG_ID("Need Period Reset");
592 _LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
593
Junxiao Shic4902122019-02-08 15:13:50 -0700594 m_resetInterestId = m_scheduler.scheduleEvent(
595 m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter(m_rng)),
596 bind(&Logic::sendResetInterest, this));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700597 }
598
599 Interest interest(m_syncReset);
600 interest.setMustBeFresh(true);
Ashlesh Gawande1d1092d2018-08-03 14:36:49 -0500601 interest.setCanBePrefix(false); // no data is expected
Yingdi Yuf7ede412014-08-30 20:37:52 -0700602 interest.setInterestLifetime(m_resetInterestLifetime);
Junxiao Shi8e4e76d2019-02-08 15:25:08 -0700603
604 // Assigning to m_pendingResetInterest cancels the previous reset Interest.
605 // This is harmless since no Data is expected.
606 m_pendingResetInterest = m_face.expressInterest(interest,
Nick Gordon0b3beab2018-03-02 13:03:28 -0600607 bind(&Logic::onResetData, this, _1, _2),
608 bind(&Logic::onSyncTimeout, this, _1), // Nack
609 bind(&Logic::onSyncTimeout, this, _1));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700610 _LOG_DEBUG_ID("<< Logic::sendResetInterest");
611}
612
613void
614Logic::sendSyncInterest()
615{
616 _LOG_DEBUG_ID(">> Logic::sendSyncInterest");
617
618 Name interestName;
619 interestName.append(m_syncPrefix)
620 .append(ndn::name::Component(*m_state.getRootDigest()));
621
Junxiao Shi8e4e76d2019-02-08 15:25:08 -0700622 m_pendingSyncInterestName = interestName;
Yingdi Yuf7ede412014-08-30 20:37:52 -0700623
624#ifdef _DEBUG
625 printDigest(m_state.getRootDigest());
626#endif
627
Junxiao Shic4902122019-02-08 15:13:50 -0700628 m_reexpressingInterestId = m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 +
629 ndn::time::milliseconds(m_reexpressionJitter(m_rng)),
630 bind(&Logic::sendSyncInterest, this));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700631
632 Interest interest(interestName);
633 interest.setMustBeFresh(true);
Ashlesh Gawande1d1092d2018-08-03 14:36:49 -0500634 interest.setCanBePrefix(true);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700635 interest.setInterestLifetime(m_syncInterestLifetime);
636
Junxiao Shi8e4e76d2019-02-08 15:25:08 -0700637 m_pendingSyncInterest = m_face.expressInterest(interest,
638 bind(&Logic::onSyncData, this, _1, _2),
639 bind(&Logic::onSyncTimeout, this, _1), // Nack
640 bind(&Logic::onSyncTimeout, this, _1));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700641
642 _LOG_DEBUG_ID("Send interest: " << interest.getName());
643 _LOG_DEBUG_ID("<< Logic::sendSyncInterest");
644}
645
646void
Alexander Afanasyev6ee98ff2018-02-13 19:12:28 -0500647Logic::trimState(State& partialState, const State& state, size_t nExcludedStates)
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600648{
649 partialState.reset();
Alexander Afanasyev6ee98ff2018-02-13 19:12:28 -0500650
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600651 std::vector<ConstLeafPtr> leaves;
652 for (const ConstLeafPtr& leaf : state.getLeaves()) {
653 leaves.push_back(leaf);
654 }
655
656 std::shuffle(leaves.begin(), leaves.end(), m_rng);
657
Alexander Afanasyev6ee98ff2018-02-13 19:12:28 -0500658 size_t statesToEncode = leaves.size() - std::min(leaves.size() - 1, nExcludedStates);
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600659 for (const auto& constLeafPtr : leaves) {
Alexander Afanasyev6ee98ff2018-02-13 19:12:28 -0500660 if (statesToEncode == 0) {
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600661 break;
662 }
663 partialState.update(constLeafPtr->getSessionName(), constLeafPtr->getSeq());
Alexander Afanasyev6ee98ff2018-02-13 19:12:28 -0500664 --statesToEncode;
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600665 }
666}
667
Alexander Afanasyev6ee98ff2018-02-13 19:12:28 -0500668Data
669Logic::encodeSyncReply(const Name& nodePrefix, const Name& name, const State& state)
670{
671 Data syncReply(name);
672 syncReply.setFreshnessPeriod(m_syncReplyFreshness);
673
674 auto finalizeReply = [this, &nodePrefix, &syncReply] (const State& state) {
675 auto contentBuffer = bzip2::compress(reinterpret_cast<const char*>(state.wireEncode().wire()),
676 state.wireEncode().size());
677 syncReply.setContent(contentBuffer);
678
679 if (m_nodeList[nodePrefix].signingId.empty())
680 m_keyChain.sign(syncReply);
681 else
682 m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
683 };
684
685 finalizeReply(state);
686
687 size_t nExcludedStates = 1;
688 while (syncReply.wireEncode().size() > getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD) {
689 if (nExcludedStates == 1) {
690 // To show this debug message only once
691 _LOG_DEBUG("Sync reply size exceeded maximum packet limit (" << (getMaxPacketLimit() - NDNLP_EXPECTED_OVERHEAD) << ")");
692 }
693 State partialState;
694 trimState(partialState, state, nExcludedStates);
695 finalizeReply(partialState);
696
697 BOOST_ASSERT(state.getLeaves().size() != 0);
698 nExcludedStates *= 2;
699 }
700
701 return syncReply;
702}
703
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600704void
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800705Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700706{
707 _LOG_DEBUG_ID(">> Logic::sendSyncData");
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600708 if (m_nodeList.find(nodePrefix) == m_nodeList.end())
709 return;
710
Alexander Afanasyev6ee98ff2018-02-13 19:12:28 -0500711 m_face.put(encodeSyncReply(nodePrefix, name, state));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700712
713 // checking if our own interest got satisfied
Junxiao Shi8e4e76d2019-02-08 15:25:08 -0700714 if (m_pendingSyncInterestName == name) {
Yingdi Yuf7ede412014-08-30 20:37:52 -0700715 // remove outstanding interest
Junxiao Shi8e4e76d2019-02-08 15:25:08 -0700716 m_pendingSyncInterest.cancel();
Yingdi Yuf7ede412014-08-30 20:37:52 -0700717
718 // re-schedule sending Sync interest
Ashlesh Gawande4a9ecd52018-02-06 14:36:19 -0600719 time::milliseconds after(m_reexpressionJitter(m_rng));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700720 _LOG_DEBUG_ID("Satisfy our own interest");
721 _LOG_DEBUG_ID("Reschedule sync interest after " << after);
Junxiao Shic4902122019-02-08 15:13:50 -0700722 m_reexpressingInterestId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
Yingdi Yuf7ede412014-08-30 20:37:52 -0700723 }
724 _LOG_DEBUG_ID("<< Logic::sendSyncData");
725}
726
727void
728Logic::cancelReset()
729{
730 _LOG_DEBUG_ID(">> Logic::cancelReset");
731 if (!m_isInReset)
732 return;
733
734 m_isInReset = false;
Qiuhan Ding8c095fd2014-11-19 17:38:32 -0800735 for (const auto& node : m_nodeList) {
736 updateSeqNo(node.second.seqNo, node.first);
737 }
Yingdi Yuf7ede412014-08-30 20:37:52 -0700738 _LOG_DEBUG_ID("<< Logic::cancelReset");
739}
740
741void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500742Logic::printDigest(ConstBufferPtr digest)
Yingdi Yuf7ede412014-08-30 20:37:52 -0700743{
Davide Pesavento5473abe2017-10-09 01:35:33 -0400744 std::string hash = ndn::toHex(digest->data(), digest->size(), false);
Yingdi Yuf7ede412014-08-30 20:37:52 -0700745 _LOG_DEBUG_ID("Hash: " << hash);
746}
747
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800748void
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500749Logic::sendRecoveryInterest(ConstBufferPtr digest)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800750{
751 _LOG_DEBUG_ID(">> Logic::sendRecoveryInterest");
752
753 Name interestName;
754 interestName.append(m_syncPrefix)
755 .append(RECOVERY_COMPONENT)
756 .append(ndn::name::Component(*digest));
757
758 Interest interest(interestName);
759 interest.setMustBeFresh(true);
Ashlesh Gawande1d1092d2018-08-03 14:36:49 -0500760 interest.setCanBePrefix(true);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800761 interest.setInterestLifetime(m_recoveryInterestLifetime);
762
Junxiao Shi8e4e76d2019-02-08 15:25:08 -0700763 m_pendingRecoveryInterests[interestName[-1].toUri()] = m_face.expressInterest(interest,
Nick Gordon0b3beab2018-03-02 13:03:28 -0600764 bind(&Logic::onRecoveryData, this, _1, _2),
765 bind(&Logic::onRecoveryTimeout, this, _1), // Nack
766 bind(&Logic::onRecoveryTimeout, this, _1));
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800767 _LOG_DEBUG_ID("interest: " << interest.getName());
768 _LOG_DEBUG_ID("<< Logic::sendRecoveryInterest");
769}
770
771void
772Logic::processRecoveryInterest(const Interest& interest)
773{
774 _LOG_DEBUG_ID(">> Logic::processRecoveryInterest");
775
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500776 Name name = interest.getName();
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800777 ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
778
779 ConstBufferPtr rootDigest = m_state.getRootDigest();
780
781 DiffStateContainer::iterator stateIter = m_log.find(digest);
782
783 if (stateIter != m_log.end() || *digest == *EMPTY_DIGEST || *rootDigest == *digest) {
784 _LOG_DEBUG_ID("I can help you recover");
785 sendSyncData(m_defaultUserPrefix, name, m_state);
786 return;
787 }
788 _LOG_DEBUG_ID("<< Logic::processRecoveryInterest");
789}
790
791void
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800792Logic::onRecoveryData(const Interest& interest, const Data& data)
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800793{
794 _LOG_DEBUG_ID(">> Logic::onRecoveryData");
Junxiao Shi8e4e76d2019-02-08 15:25:08 -0700795 m_pendingRecoveryInterests.erase(interest.getName()[-1].toUri());
Ashlesh Gawande08784d42017-09-06 23:40:21 -0500796 onSyncDataValidated(data);
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800797 _LOG_DEBUG_ID("<< Logic::onRecoveryData");
798}
799
800void
801Logic::onRecoveryTimeout(const Interest& interest)
802{
803 _LOG_DEBUG_ID(">> Logic::onRecoveryTimeout");
Junxiao Shi8e4e76d2019-02-08 15:25:08 -0700804 m_pendingRecoveryInterests.erase(interest.getName()[-1].toUri());
Sonu Mishra4d3a2e02017-01-18 20:27:51 -0800805 _LOG_DEBUG_ID("Interest: " << interest.getName());
806 _LOG_DEBUG_ID("<< Logic::onRecoveryTimeout");
807}
808
Alexander Afanasyeve9eda8a2017-03-09 14:40:03 -0800809} // namespace chronosync