logic: Implement Recovery mechanism
This commit implements the recovery mechanism that is critical for
handling network partitions. When a node receives a sync interest with
unrecognizable digest, it goes into recovery.
Change-Id: I205687b9791b286cf6eca4c0159b49f744b38bed
Refs: #3929
diff --git a/AUTHORS.md b/AUTHORS.md
index 58fac9b..88f6905 100644
--- a/AUTHORS.md
+++ b/AUTHORS.md
@@ -5,3 +5,4 @@
- Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
- Chaoyi Bian <bcy@pku.edu.cn>
- Yingdi Yu <http://irl.cs.ucla.edu/~yingdi/web/index.html>
+- Sonu Mishra <https://www.linkedin.com/in/mishrasonu>
diff --git a/src/logic.cpp b/src/logic.cpp
index a110328..aa08acc 100644
--- a/src/logic.cpp
+++ b/src/logic.cpp
@@ -58,9 +58,11 @@
const time::milliseconds Logic::DEFAULT_RESET_INTEREST_LIFETIME(1000);
const time::milliseconds Logic::DEFAULT_SYNC_INTEREST_LIFETIME(1000);
const time::milliseconds Logic::DEFAULT_SYNC_REPLY_FRESHNESS(1000);
+const time::milliseconds Logic::DEFAULT_RECOVERY_INTEREST_LIFETIME(1000);
const ndn::ConstBufferPtr Logic::EMPTY_DIGEST(new ndn::Buffer(EMPTY_DIGEST_VALUE, 32));
const ndn::name::Component Logic::RESET_COMPONENT("reset");
+const ndn::name::Component Logic::RECOVERY_COMPONENT("recovery");
Logic::Logic(ndn::Face& face,
const Name& syncPrefix,
@@ -72,7 +74,8 @@
const time::steady_clock::Duration& cancelResetTimer,
const time::milliseconds& resetInterestLifetime,
const time::milliseconds& syncInterestLifetime,
- const time::milliseconds& syncReplyFreshness)
+ const time::milliseconds& syncReplyFreshness,
+ const time::milliseconds& recoveryInterestLifetime)
: m_face(face)
, m_syncPrefix(syncPrefix)
, m_defaultUserPrefix(defaultUserPrefix)
@@ -90,6 +93,7 @@
, m_resetInterestLifetime(resetInterestLifetime)
, m_syncInterestLifetime(syncInterestLifetime)
, m_syncReplyFreshness(syncReplyFreshness)
+ , m_recoveryInterestLifetime(recoveryInterestLifetime)
, m_defaultSigningId(defaultSigningId)
, m_validator(validator)
{
@@ -316,13 +320,15 @@
_LOG_DEBUG_ID("InterestName: " << name);
- if (RESET_COMPONENT != name.get(-1)) {
- // normal sync interest
+ if (name.size() >= 1 && RESET_COMPONENT == name.get(-1)) {
+ processResetInterest(interest);
+ }
+ else if (name.size() >= 2 && RECOVERY_COMPONENT == name.get(-2)) {
+ processRecoveryInterest(interest);
+ }
+ else {
processSyncInterest(interest.shared_from_this());
}
- else
- // reset interest
- processResetInterest(interest);
_LOG_DEBUG_ID("<< Logic::onSyncInterest");
}
@@ -451,9 +457,9 @@
}
else {
// OK, nobody is helping us, just tell the truth.
- _LOG_DEBUG_ID("OK, nobody is helping us, just tell the truth");
+ _LOG_DEBUG_ID("OK, nobody is helping us, let us try to recover");
m_interestTable.erase(digest);
- sendSyncData(m_defaultUserPrefix, name, m_state);
+ sendRecoveryInterest(digest);
}
_LOG_DEBUG_ID("<< Logic::processSyncInterest");
@@ -687,4 +693,61 @@
_LOG_DEBUG_ID("Hash: " << hash);
}
-} // namespace chronosync
+void
+Logic::sendRecoveryInterest(ndn::ConstBufferPtr digest)
+{
+ _LOG_DEBUG_ID(">> Logic::sendRecoveryInterest");
+
+ Name interestName;
+ interestName.append(m_syncPrefix)
+ .append(RECOVERY_COMPONENT)
+ .append(ndn::name::Component(*digest));
+
+ Interest interest(interestName);
+ interest.setMustBeFresh(true);
+ interest.setInterestLifetime(m_recoveryInterestLifetime);
+
+ m_face.expressInterest(interest, bind(&Logic::onRecoveryData, this, _1, _2),
+ bind(&Logic::onRecoveryTimeout, this, _1));
+
+ _LOG_DEBUG_ID("interest: " << interest.getName());
+ _LOG_DEBUG_ID("<< Logic::sendRecoveryInterest");
+}
+
+void
+Logic::processRecoveryInterest(const Interest& interest)
+{
+ _LOG_DEBUG_ID(">> Logic::processRecoveryInterest");
+
+ const Name& name = interest.getName();
+ ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
+
+ ConstBufferPtr rootDigest = m_state.getRootDigest();
+
+ DiffStateContainer::iterator stateIter = m_log.find(digest);
+
+ if (stateIter != m_log.end() || *digest == *EMPTY_DIGEST || *rootDigest == *digest) {
+ _LOG_DEBUG_ID("I can help you recover");
+ sendSyncData(m_defaultUserPrefix, name, m_state);
+ return;
+ }
+ _LOG_DEBUG_ID("<< Logic::processRecoveryInterest");
+}
+
+void
+Logic::onRecoveryData(const Interest& interest, Data& data)
+{
+ _LOG_DEBUG_ID(">> Logic::onRecoveryData");
+ onSyncDataValidated(data.shared_from_this());
+ _LOG_DEBUG_ID("<< Logic::onRecoveryData");
+}
+
+void
+Logic::onRecoveryTimeout(const Interest& interest)
+{
+ _LOG_DEBUG_ID(">> Logic::onRecoveryTimeout");
+ _LOG_DEBUG_ID("Interest: " << interest.getName());
+ _LOG_DEBUG_ID("<< Logic::onRecoveryTimeout");
+}
+
+} // namespace chronosync
\ No newline at end of file
diff --git a/src/logic.hpp b/src/logic.hpp
index d4a811f..c684fa2 100644
--- a/src/logic.hpp
+++ b/src/logic.hpp
@@ -1,6 +1,6 @@
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
- * Copyright (c) 2012-2014 University of California, Los Angeles
+ * Copyright (c) 2012-2017 University of California, Los Angeles
*
* This file is part of ChronoSync, synchronization library for distributed realtime
* applications for NDN.
@@ -20,6 +20,7 @@
* @author Chaoyi Bian <bcy@pku.edu.cn>
* @author Alexander Afanasyev <http://lasr.cs.ucla.edu/afanasyev/index.html>
* @author Yingdi Yu <yingdi@cs.ucla.edu>
+ * @author Sonu Mishra <https://www.linkedin.com/in/mishrasonu>
*/
#ifndef CHRONOSYNC_LOGIC_HPP
@@ -48,7 +49,8 @@
* Instances of this class is usually used as elements of some containers
* such as std::vector, thus it is copyable.
*/
-class NodeInfo {
+class NodeInfo
+{
public:
Name userPrefix;
Name signingId;
@@ -97,6 +99,7 @@
static const time::milliseconds DEFAULT_RESET_INTEREST_LIFETIME;
static const time::milliseconds DEFAULT_SYNC_INTEREST_LIFETIME;
static const time::milliseconds DEFAULT_SYNC_REPLY_FRESHNESS;
+ static const time::milliseconds DEFAULT_RECOVERY_INTEREST_LIFETIME;
/**
* @brief Constructor
@@ -123,7 +126,8 @@
const time::steady_clock::Duration& cancelResetTimer = DEFAULT_CANCEL_RESET_TIMER,
const time::milliseconds& resetInterestLifetime = DEFAULT_RESET_INTEREST_LIFETIME,
const time::milliseconds& syncInterestLifetime = DEFAULT_SYNC_INTEREST_LIFETIME,
- const time::milliseconds& syncReplyFreshness = DEFAULT_SYNC_REPLY_FRESHNESS);
+ const time::milliseconds& syncReplyFreshness = DEFAULT_SYNC_REPLY_FRESHNESS,
+ const time::milliseconds& recoveryInterestLifetime = DEFAULT_RECOVERY_INTEREST_LIFETIME);
~Logic();
@@ -387,6 +391,47 @@
void
printDigest(ndn::ConstBufferPtr digest);
+ /**
+ * @brief Helper method to send Recovery Interest
+ *
+ * @param digest The digest to be included in the recovery interest
+ */
+ void
+ sendRecoveryInterest(ndn::ConstBufferPtr digest);
+
+ /**
+ * @brief Process Recovery Interest
+ *
+ * This method extracts the digest from the incoming Recovery Interest.
+ * If it recognizes this incoming digest, then it sends its full state
+ * as reply.
+ *
+ * @param interest The incoming interest
+ */
+ void
+ processRecoveryInterest(const Interest& interest);
+
+ /**
+ * @brief Callback to handle Recovery Reply
+ *
+ * This method calls Logic::onSyncDataValidated directly.
+ *
+ * @param interest The Recovery Interest
+ * @param data The reply to the Recovery Interest
+ */
+ void
+ onRecoveryData(const Interest& interest, Data& data);
+
+ /**
+ * @brief Callback to handle Recovery Interest timeout.
+ *
+ * This method does nothing.
+ *
+ * @param interest The Recovery Interest
+ */
+ void
+ onRecoveryTimeout(const Interest& interest);
+
public:
static const ndn::Name DEFAULT_NAME;
static const ndn::Name EMPTY_NAME;
@@ -397,6 +442,7 @@
static const ndn::ConstBufferPtr EMPTY_DIGEST;
static const ndn::name::Component RESET_COMPONENT;
+ static const ndn::name::Component RECOVERY_COMPONENT;
// Communication
ndn::Face& m_face;
@@ -438,6 +484,8 @@
time::milliseconds m_syncInterestLifetime;
/// @brief FreshnessPeriod of SyncReply
time::milliseconds m_syncReplyFreshness;
+ /// @brief Lifetime of recovery interest
+ time::milliseconds m_recoveryInterestLifetime;
// Security
ndn::Name m_defaultSigningId;
@@ -454,4 +502,4 @@
} // namespace chronosync
-#endif // CHRONOSYNC_LOGIC_HPP
+#endif // CHRONOSYNC_LOGIC_HPP
\ No newline at end of file
diff --git a/tests/unit-tests/test-logic.cpp b/tests/unit-tests/test-logic.cpp
index ed860af..342af4c 100644
--- a/tests/unit-tests/test-logic.cpp
+++ b/tests/unit-tests/test-logic.cpp
@@ -1,6 +1,6 @@
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
- * Copyright (c) 2012-2016 University of California, Los Angeles
+ * Copyright (c) 2012-2017 University of California, Los Angeles
*
* This file is part of ChronoSync, synchronization library for distributed realtime
* applications for NDN.
@@ -76,12 +76,14 @@
userPrefix[0] = Name("/user0");
userPrefix[1] = Name("/user1");
userPrefix[2] = Name("/user2");
+ userPrefix[3] = Name("/user3");
faces[0].reset(new DummyClientFace(io, {true, true}));
faces[1].reset(new DummyClientFace(io, {true, true}));
faces[2].reset(new DummyClientFace(io, {true, true}));
+ faces[3].reset(new DummyClientFace(io, {true, true}));
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < 4; i++) {
readInterestOffset[i] = 0;
readDataOffset[i] = 0;
}
@@ -90,7 +92,7 @@
void
passPacket()
{
- for (int i = 0; i < 3; i++)
+ for (int i = 0; i < 4; i++)
checkFace(i);
}
@@ -98,14 +100,14 @@
checkFace(int sender)
{
while (faces[sender]->sentInterests.size() > readInterestOffset[sender]) {
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < 4; i++) {
if (sender != i)
faces[i]->receive(faces[sender]->sentInterests[readInterestOffset[sender]]);
}
readInterestOffset[sender]++;
}
while (faces[sender]->sentData.size() > readDataOffset[sender]) {
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < 4; i++) {
if (sender != i)
faces[i]->receive(faces[sender]->sentData[readDataOffset[sender]]);
}
@@ -115,13 +117,13 @@
public:
Name syncPrefix;
- Name userPrefix[3];
+ Name userPrefix[4];
- std::unique_ptr<DummyClientFace> faces[3];
- shared_ptr<Handler> handler[3];
+ std::unique_ptr<DummyClientFace> faces[4];
+ shared_ptr<Handler> handler[4];
- size_t readInterestOffset[3];
- size_t readDataOffset[3];
+ size_t readInterestOffset[4];
+ size_t readDataOffset[4];
};
BOOST_FIXTURE_TEST_SUITE(LogicTests, LogicFixture)
@@ -245,8 +247,8 @@
advanceClocks(ndn::time::milliseconds(2), 10);
passPacket();
}
- BOOST_CHECK_EQUAL(handler[1]->map[handler[0]->logic.getSessionName()], 1);
- BOOST_CHECK_EQUAL(handler[0]->map[handler[1]->logic.getSessionName()], 2);
+ BOOST_CHECK_EQUAL(handler[2]->map[handler[0]->logic.getSessionName()], 1);
+ BOOST_CHECK_EQUAL(handler[2]->map[handler[1]->logic.getSessionName()], 2);
handler[2]->updateSeqNo(4);
@@ -291,6 +293,85 @@
BOOST_CHECK_EQUAL(handler[2]->map[handler[1]->logic.getSessionName()], 2);
}
+BOOST_AUTO_TEST_CASE(PartitionRecover)
+{
+ handler[0] = make_shared<Handler>(ref(*faces[0]), syncPrefix, userPrefix[0]);
+ advanceClocks(ndn::time::milliseconds(10), 10);
+
+ handler[1] = make_shared<Handler>(ref(*faces[1]), syncPrefix, userPrefix[1]);
+ advanceClocks(ndn::time::milliseconds(10), 10);
+
+ handler[2] = make_shared<Handler>(ref(*faces[2]), syncPrefix, userPrefix[2]);
+ advanceClocks(ndn::time::milliseconds(10), 10);
+
+ handler[3] = make_shared<Handler>(ref(*faces[3]), syncPrefix, userPrefix[3]);
+ advanceClocks(ndn::time::milliseconds(10), 30);
+
+ handler[0]->updateSeqNo(1);
+
+ for (int i = 0; i < 50; i++) {
+ advanceClocks(ndn::time::milliseconds(2), 10);
+ passPacket();
+ }
+ BOOST_CHECK_EQUAL(handler[1]->map[handler[0]->logic.getSessionName()], 1);
+ BOOST_CHECK_EQUAL(handler[2]->map[handler[0]->logic.getSessionName()], 1);
+ BOOST_CHECK_EQUAL(handler[3]->map[handler[0]->logic.getSessionName()], 1);
+
+ handler[2]->updateSeqNo(2);
+
+ for (int i = 0; i < 50; i++) {
+ advanceClocks(ndn::time::milliseconds(2), 10);
+ passPacket();
+ }
+ BOOST_CHECK_EQUAL(handler[0]->map[handler[2]->logic.getSessionName()], 2);
+ BOOST_CHECK_EQUAL(handler[1]->map[handler[2]->logic.getSessionName()], 2);
+ BOOST_CHECK_EQUAL(handler[3]->map[handler[2]->logic.getSessionName()], 2);
+
+ // Network Partition start
+
+ handler[1]->updateSeqNo(3);
+
+ for (int i = 0; i < 50; i++) {
+ advanceClocks(ndn::time::milliseconds(2), 10);
+ passPacket();
+ }
+ BOOST_CHECK_EQUAL(handler[0]->map[handler[1]->logic.getSessionName()], 3);
+ handler[2]->map[handler[1]->logic.getSessionName()] = 0;
+ handler[3]->map[handler[1]->logic.getSessionName()] = 0;
+
+ handler[3]->updateSeqNo(4);
+
+ for (int i = 0; i < 50; i++) {
+ advanceClocks(ndn::time::milliseconds(2), 10);
+ passPacket();
+ }
+ BOOST_CHECK_EQUAL(handler[2]->map[handler[3]->logic.getSessionName()], 4);
+ handler[0]->map[handler[3]->logic.getSessionName()] = 0;
+ handler[1]->map[handler[3]->logic.getSessionName()] = 0;
+
+ // Network partition over
+
+ handler[0]->updateSeqNo(5);
+
+ for (int i = 0; i < 50; i++) {
+ advanceClocks(ndn::time::milliseconds(2), 10);
+ passPacket();
+ }
+ BOOST_CHECK_EQUAL(handler[1]->map[handler[0]->logic.getSessionName()], 5);
+ BOOST_CHECK_EQUAL(handler[2]->map[handler[0]->logic.getSessionName()], 5);
+ BOOST_CHECK_EQUAL(handler[3]->map[handler[0]->logic.getSessionName()], 5);
+
+ handler[2]->updateSeqNo(6);
+
+ for (int i = 0; i < 50; i++) {
+ advanceClocks(ndn::time::milliseconds(2), 10);
+ passPacket();
+ }
+ BOOST_CHECK_EQUAL(handler[0]->map[handler[2]->logic.getSessionName()], 6);
+ BOOST_CHECK_EQUAL(handler[1]->map[handler[2]->logic.getSessionName()], 6);
+ BOOST_CHECK_EQUAL(handler[3]->map[handler[2]->logic.getSessionName()], 6);
+}
+
BOOST_AUTO_TEST_CASE(MultipleUserUnderOneLogic)
{
handler[0] = make_shared<Handler>(ref(*faces[0]), syncPrefix, userPrefix[0]);