logic: Implemented exclude filter
This commit implements the exclude filter mechanism that is required to
handle simultaneous data generations. When a node receives data, it
resends the corresponding sync interest with augmented exclude filter.
Change-Id: I6500abf29877e51bb2a91eca83f9c0eba0e59655
Refs: #3928
diff --git a/src/logic.cpp b/src/logic.cpp
index aa08acc..278115e 100644
--- a/src/logic.cpp
+++ b/src/logic.cpp
@@ -279,6 +279,7 @@
insertToDiffLog(commit, previousRoot);
satisfyPendingSyncInterests(prefix, commit);
+ formAndSendExcludeInterest(prefix, *commit, previousRoot);
}
}
}
@@ -344,12 +345,23 @@
Logic::onSyncData(const Interest& interest, Data& data)
{
_LOG_DEBUG_ID(">> Logic::onSyncData");
- if (static_cast<bool>(m_validator))
- m_validator->validate(data,
- bind(&Logic::onSyncDataValidated, this, _1),
- bind(&Logic::onSyncDataValidationFailed, this, _1));
- else
+ // if (static_cast<bool>(m_validator))
+ // m_validator->validate(data,
+ // bind(&Logic::onSyncDataValidated, this, _1),
+ // bind(&Logic::onSyncDataValidationFailed, this, _1));
+ // else
+ // onSyncDataValidated(data.shared_from_this());
+
+ if (interest.getExclude().empty()) {
+ _LOG_DEBUG_ID("First data");
onSyncDataValidated(data.shared_from_this());
+ }
+ else {
+ _LOG_DEBUG_ID("Data obtained using exclude filter");
+ onSyncDataValidated(data.shared_from_this(), false);
+ }
+ sendExcludeInterest(interest, data);
+
_LOG_DEBUG_ID("<< Logic::onSyncData");
}
@@ -375,12 +387,12 @@
}
void
-Logic::onSyncDataValidated(const shared_ptr<const Data>& data)
+Logic::onSyncDataValidated(const shared_ptr<const Data>& data, bool firstData)
{
Name name = data->getName();
ConstBufferPtr digest = make_shared<ndn::Buffer>(name.get(-1).value(), name.get(-1).value_size());
- processSyncData(name, digest, data->getContent().blockFromValue());
+ processSyncData(name, digest, data->getContent().blockFromValue(), firstData);
}
void
@@ -475,7 +487,8 @@
void
Logic::processSyncData(const Name& name,
ndn::ConstBufferPtr digest,
- const Block& syncReplyBlock)
+ const Block& syncReplyBlock,
+ bool firstData)
{
_LOG_DEBUG_ID(">> Logic::processSyncData");
DiffStatePtr commit = make_shared<DiffState>();
@@ -525,7 +538,7 @@
return;
}
- if (static_cast<bool>(commit) && !commit->getLeaves().empty()) {
+ if (static_cast<bool>(commit) && !commit->getLeaves().empty() && firstData) {
// state changed and it is safe to express a new interest
time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
_LOG_DEBUG_ID("Reschedule sync interest after: " << after);
@@ -750,4 +763,49 @@
_LOG_DEBUG_ID("<< Logic::onRecoveryTimeout");
}
+void
+Logic::sendExcludeInterest(const Interest& interest, const Data& data)
+{
+ _LOG_DEBUG_ID(">> Logic::sendExcludeInterest");
+
+ Name interestName = interest.getName();
+ Interest excludeInterest(interestName);
+
+ Exclude exclude = interest.getExclude();
+ exclude.excludeOne(data.getFullName().get(-1));
+ excludeInterest.setExclude(exclude);
+
+ excludeInterest.setInterestLifetime(m_syncInterestLifetime);
+
+ m_face.expressInterest(excludeInterest, bind(&Logic::onSyncData, this, _1, _2),
+ bind(&Logic::onSyncTimeout, this, _1));
+
+ _LOG_DEBUG_ID("Send interest: " << excludeInterest.getName());
+ _LOG_DEBUG_ID("<< Logic::sendExcludeInterest");
+}
+
+void
+Logic::formAndSendExcludeInterest(const Name& nodePrefix, const State& commit, ndn::ConstBufferPtr previousRoot)
+{
+ _LOG_DEBUG_ID(">> Logic::formAndSendExcludeInterest");
+ Name interestName;
+ interestName.append(m_syncPrefix)
+ .append(ndn::name::Component(*previousRoot));
+ Interest interest(interestName);
+
+ shared_ptr<Data> data = make_shared<Data>(interestName);
+ data->setContent(commit.wireEncode());
+ data->setFreshnessPeriod(m_syncReplyFreshness);
+ if (m_nodeList.find(nodePrefix) == m_nodeList.end())
+ return;
+ if (m_nodeList[nodePrefix].signingId.empty())
+ m_keyChain.sign(*data);
+ else
+ m_keyChain.signByIdentity(*data, m_nodeList[nodePrefix].signingId);
+
+ sendExcludeInterest(interest, *data);
+
+ _LOG_DEBUG_ID("<< Logic::formAndSendExcludeInterest");
+}
+
} // namespace chronosync
\ No newline at end of file
diff --git a/src/logic.hpp b/src/logic.hpp
index c684fa2..c452d20 100644
--- a/src/logic.hpp
+++ b/src/logic.hpp
@@ -303,9 +303,10 @@
* This method simply passes the valid reply to processSyncData.
*
* @param data The valid Sync Reply.
+ * @param firstData Whether the data is new or that obtained using exclude filter
*/
void
- onSyncDataValidated(const shared_ptr<const Data>& data);
+ onSyncDataValidated(const shared_ptr<const Data>& data, bool firstData = true);
/**
* @brief Process normal Sync Interest
@@ -343,11 +344,13 @@
* @param name The data name of the Sync Reply.
* @param digest The digest in the data name.
* @param syncReplyBlock The content of the Sync Reply.
+ * @param firstData Whether the data is new or obtained using exclude filter
*/
void
processSyncData(const Name& name,
ndn::ConstBufferPtr digest,
- const Block& syncReplyBlock);
+ const Block& syncReplyBlock,
+ bool firstData);
/**
* @brief Insert state diff into log
@@ -432,6 +435,29 @@
void
onRecoveryTimeout(const Interest& interest);
+ /**
+ * @brief Helper method to send Exclude Interest
+ *
+ * @param interest The interest whose exclude filter will be augmented
+ * @param data The data whose implicit digest will be inserted into exclude filter
+ */
+ void
+ sendExcludeInterest(const Interest& interest, const Data& data);
+
+ /**
+ * @brief Helper method to form the exclude Interest and calls sendExcludeInterest
+ *
+ * @param interest The interest whose exclude filter will be augmented
+ * @param nodePrefix The prefix of the sender node
+ * @param commit The commit whose contents will be used to obtain the implicit
+ digest to be excluded
+ * @param previousRoot The digest to be included in the interest
+ */
+ void
+ formAndSendExcludeInterest(const Name& nodePrefix,
+ const State& commit,
+ ndn::ConstBufferPtr previousRoot);
+
public:
static const ndn::Name DEFAULT_NAME;
static const ndn::Name EMPTY_NAME;