communication: add sync protocol adapter to support PSync
refs: #4082
Change-Id: Ibe4649e709dfbc3cdc1f2afbfc4ff03f75a3f136
diff --git a/src/communication/sync-logic-handler.cpp b/src/communication/sync-logic-handler.cpp
index 1e8f90b..13f2e8e 100644
--- a/src/communication/sync-logic-handler.cpp
+++ b/src/communication/sync-logic-handler.cpp
@@ -23,16 +23,16 @@
#include "common.hpp"
#include "conf-parameter.hpp"
#include "lsa.hpp"
-#include "utility/name-helper.hpp"
#include "logger.hpp"
+#include "utility/name-helper.hpp"
namespace nlsr {
-INIT_LOGGER(SyncLogicHandler);
-
const std::string NLSR_COMPONENT = "nlsr";
const std::string LSA_COMPONENT = "LSA";
+INIT_LOGGER(SyncLogicHandler);
+
template<class T>
class NullDeleter
{
@@ -60,73 +60,59 @@
return;
}
- m_syncPrefix = syncPrefix;
-
// Build LSA sync update prefix
buildUpdatePrefix();
- NLSR_LOG_DEBUG("Creating Sync Logic object. Sync Prefix: " << m_syncPrefix);
+ NLSR_LOG_DEBUG("Creating Sync Logic object. Sync Prefix: " << syncPrefix);
// The face's lifetime is managed in main.cpp; Logic should not manage the memory
// of the object
std::shared_ptr<ndn::Face> facePtr(&m_syncFace, NullDeleter<ndn::Face>());
- const auto fixedSession = ndn::name::Component::fromNumber(0);
- m_syncLogic = std::make_shared<chronosync::Logic>(*facePtr, m_syncPrefix, m_nameLsaUserPrefix,
- std::bind(&SyncLogicHandler::onChronoSyncUpdate, this, _1),
- chronosync::Logic::DEFAULT_NAME,
- chronosync::Logic::DEFAULT_VALIDATOR,
- chronosync::Logic::DEFAULT_RESET_TIMER,
- chronosync::Logic::DEFAULT_CANCEL_RESET_TIMER,
- chronosync::Logic::DEFAULT_RESET_INTEREST_LIFETIME,
- syncInterestLifetime,
- chronosync::Logic::DEFAULT_SYNC_REPLY_FRESHNESS,
- chronosync::Logic::DEFAULT_RECOVERY_INTEREST_LIFETIME,
- fixedSession);
+ m_syncLogic = std::make_shared<SyncProtocolAdapter>(*facePtr,
+ m_confParam.getSyncProtocol(),
+ syncPrefix,
+ m_nameLsaUserPrefix,
+ syncInterestLifetime,
+ std::bind(&SyncLogicHandler::processUpdate, this, _1, _2));
if (m_confParam.getHyperbolicState() == HYPERBOLIC_STATE_OFF) {
- m_syncLogic->addUserNode(m_adjLsaUserPrefix, chronosync::Logic::DEFAULT_NAME, fixedSession);
+ m_syncLogic->addUserNode(m_adjLsaUserPrefix);
}
else if (m_confParam.getHyperbolicState() == HYPERBOLIC_STATE_ON) {
- m_syncLogic->addUserNode(m_coorLsaUserPrefix, chronosync::Logic::DEFAULT_NAME, fixedSession);
+ m_syncLogic->addUserNode(m_coorLsaUserPrefix);
}
else {
- m_syncLogic->addUserNode(m_adjLsaUserPrefix, chronosync::Logic::DEFAULT_NAME, fixedSession);
- m_syncLogic->addUserNode(m_coorLsaUserPrefix, chronosync::Logic::DEFAULT_NAME, fixedSession);
+ m_syncLogic->addUserNode(m_adjLsaUserPrefix);
+ m_syncLogic->addUserNode(m_coorLsaUserPrefix);
}
}
void
-SyncLogicHandler::onChronoSyncUpdate(const std::vector<chronosync::MissingDataInfo>& v)
+SyncLogicHandler::processUpdate(const ndn::Name& updateName, uint64_t highSeq) // handles one sync update: derives the origin router name, then delegates
{
- NLSR_LOG_DEBUG("Received ChronoSync update event");
+ NLSR_LOG_DEBUG("Update Name: " << updateName << " Seq no: " << highSeq);
- for (size_t i = 0; i < v.size(); i++){
- ndn::Name updateName = v[i].session.getPrefix(-1);
+ int32_t nlsrPosition = util::getNameComponentPosition(updateName, nlsr::NLSR_COMPONENT); // position of the "nlsr" component
+ int32_t lsaPosition = util::getNameComponentPosition(updateName, nlsr::LSA_COMPONENT); // position of the "LSA" component
- NLSR_LOG_DEBUG("Update Name: " << updateName << " Seq no: " << v[i].high);
-
- int32_t nlsrPosition = util::getNameComponentPosition(updateName, nlsr::NLSR_COMPONENT);
- int32_t lsaPosition = util::getNameComponentPosition(updateName, nlsr::LSA_COMPONENT);
-
- if (nlsrPosition < 0 || lsaPosition < 0) {
- NLSR_LOG_WARN("Received malformed sync update");
- return;
- }
-
- ndn::Name networkName = updateName.getSubName(1, nlsrPosition-1);
- ndn::Name routerName = updateName.getSubName(lsaPosition + 1).getPrefix(-1);
-
- ndn::Name originRouter = networkName;
- originRouter.append(routerName);
-
- processUpdateFromSync(originRouter, updateName, v[i].high);
+ if (nlsrPosition < 0 || lsaPosition < 0) { // a negative position is treated as "marker component missing"
+ NLSR_LOG_WARN("Received malformed sync update");
+ return;
}
+ // Network name: nlsrPosition-1 components of updateName, starting at index 1.
+ ndn::Name networkName = updateName.getSubName(1, nlsrPosition-1);
+ ndn::Name routerName = updateName.getSubName(lsaPosition + 1).getPrefix(-1); // components after "LSA", minus the last one
+ // Origin router = network name followed by router name.
+ ndn::Name originRouter = networkName;
+ originRouter.append(routerName);
+
+ processUpdateFromSync(originRouter, updateName, highSeq);
}
void
SyncLogicHandler::processUpdateFromSync(const ndn::Name& originRouter,
- const ndn::Name& updateName, const uint64_t& seqNo)
+ const ndn::Name& updateName, uint64_t seqNo)
{
NLSR_LOG_DEBUG("Origin Router of update: " << originRouter);
@@ -169,13 +155,13 @@
switch (type) {
case Lsa::Type::ADJACENCY:
- m_syncLogic->updateSeqNo(seqNo, m_adjLsaUserPrefix);
+ m_syncLogic->publishUpdate(m_adjLsaUserPrefix, seqNo);
break;
case Lsa::Type::COORDINATE:
- m_syncLogic->updateSeqNo(seqNo, m_coorLsaUserPrefix);
+ m_syncLogic->publishUpdate(m_coorLsaUserPrefix, seqNo);
break;
case Lsa::Type::NAME:
- m_syncLogic->updateSeqNo(seqNo, m_nameLsaUserPrefix);
+ m_syncLogic->publishUpdate(m_nameLsaUserPrefix, seqNo);
break;
default:
break;
diff --git a/src/communication/sync-logic-handler.hpp b/src/communication/sync-logic-handler.hpp
index 8be327c..ae72fb8 100644
--- a/src/communication/sync-logic-handler.hpp
+++ b/src/communication/sync-logic-handler.hpp
@@ -26,10 +26,10 @@
#include "test-access-control.hpp"
#include "signals.hpp"
#include "lsa.hpp"
+#include "sync-protocol-adapter.hpp"
#include <ndn-cxx/face.hpp>
#include <ndn-cxx/util/signal.hpp>
-#include <ChronoSync/logic.hpp>
#include <boost/throw_exception.hpp>
class InterestManager;
@@ -64,17 +64,6 @@
SyncLogicHandler(ndn::Face& face, const IsLsaNew& isLsaNew, const ConfParameter& conf);
- /*! \brief Hook function to call whenever sync detects new data.
- *
- * This function packages the sync information into discrete updates
- * and passes those off to another function, processUpdateFromSync.
- * \sa processUpdateFromSync
- *
- * \param v A container with the new information sync has received
- */
- void
- onChronoSyncUpdate(const std::vector<chronosync::MissingDataInfo>& v);
-
/*! \brief Instruct ChronoSync to publish an update.
*
* This function instructs sync to push an update into the network,
@@ -99,13 +88,15 @@
const ndn::time::milliseconds& syncInterestLifetime =
ndn::time::milliseconds(SYNC_INTEREST_LIFETIME_DEFAULT));
+ void
+ processUpdate(const ndn::Name& updateName, uint64_t highSeq);
+
PUBLIC_WITH_TESTS_ELSE_PRIVATE:
/*! \brief Simple function to glue Name components together
*/
void
buildUpdatePrefix();
-private:
/*! \brief Determine which kind of LSA was updated and fetch it.
*
* Checks that the received update is not from us, which can happen,
@@ -116,15 +107,16 @@
*/
void
processUpdateFromSync(const ndn::Name& originRouter,
- const ndn::Name& updateName, const uint64_t& seqNo);
+ const ndn::Name& updateName, uint64_t seqNo);
public:
std::unique_ptr<OnNewLsa> onNewLsa;
private:
ndn::Face& m_syncFace;
- std::shared_ptr<chronosync::Logic> m_syncLogic;
- ndn::Name m_syncPrefix;
+PUBLIC_WITH_TESTS_ELSE_PRIVATE:
+ std::shared_ptr<SyncProtocolAdapter> m_syncLogic;
+private:
IsLsaNew m_isLsaNew;
const ConfParameter& m_confParam;
@@ -136,7 +128,6 @@
private:
static const std::string NLSR_COMPONENT;
static const std::string LSA_COMPONENT;
-
};
} // namespace nlsr
diff --git a/src/communication/sync-protocol-adapter.cpp b/src/communication/sync-protocol-adapter.cpp
new file mode 100644
index 0000000..af263c8
--- /dev/null
+++ b/src/communication/sync-protocol-adapter.cpp
@@ -0,0 +1,110 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014-2018, The University of Memphis,
+ * Regents of the University of California,
+ * Arizona Board of Regents.
+ *
+ * This file is part of NLSR (Named-data Link State Routing).
+ * See AUTHORS.md for complete list of NLSR authors and contributors.
+ *
+ * NLSR is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NLSR is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NLSR, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#include "sync-protocol-adapter.hpp"
+#include "logger.hpp"
+
+INIT_LOGGER(SyncProtocolAdapter);
+
+namespace nlsr {
+
+const auto FIXED_SESSION = ndn::name::Component::fromNumber(0); // session component passed to every ChronoSync registration in this file
+
+SyncProtocolAdapter::SyncProtocolAdapter(ndn::Face& face,                              // face handed to the sync backend
+                                         int32_t syncProtocol,                         // SYNC_PROTOCOL_CHRONOSYNC, or any other value for PSync
+                                         const ndn::Name& syncPrefix,                  // forwarded unchanged to the backend
+                                         const ndn::Name& userPrefix,                  // first user prefix registered with the backend
+                                         ndn::time::milliseconds syncInterestLifetime, // forwarded unchanged to the backend
+                                         const SyncUpdateCallback& syncUpdateCallback) // copied; called from the on*Update hooks
+  : m_syncProtocol(syncProtocol)
+  , m_syncUpdateCallback(syncUpdateCallback)
+{
+  if (m_syncProtocol == SYNC_PROTOCOL_CHRONOSYNC) { // only the selected backend is created; the other pointer stays null
+    NLSR_LOG_DEBUG("Using ChronoSync"); // project logging wrapper, consistent with the NLSR_LOG_TRACE calls in this file
+    m_chronoSyncLogic = std::make_shared<chronosync::Logic>(face,
+                          syncPrefix,
+                          userPrefix,
+                          std::bind(&SyncProtocolAdapter::onChronoSyncUpdate, this, _1),
+                          chronosync::Logic::DEFAULT_NAME,
+                          chronosync::Logic::DEFAULT_VALIDATOR,
+                          chronosync::Logic::DEFAULT_RESET_TIMER,
+                          chronosync::Logic::DEFAULT_CANCEL_RESET_TIMER,
+                          chronosync::Logic::DEFAULT_RESET_INTEREST_LIFETIME,
+                          syncInterestLifetime,
+                          chronosync::Logic::DEFAULT_SYNC_REPLY_FRESHNESS,
+                          chronosync::Logic::DEFAULT_RECOVERY_INTEREST_LIFETIME,
+                          FIXED_SESSION); // same session component that addUserNode() passes
+  }
+  else { // any value other than SYNC_PROTOCOL_CHRONOSYNC selects PSync
+    NLSR_LOG_DEBUG("Using PSync");
+    m_psyncLogic = std::make_shared<psync::FullProducer>(80, // NOTE(review): hard-coded; presumably PSync's expected number of IBF entries - confirm and name it
+                     face,
+                     syncPrefix,
+                     userPrefix,
+                     std::bind(&SyncProtocolAdapter::onPSyncUpdate, this, _1),
+                     syncInterestLifetime);
+  }
+}
+
+void
+SyncProtocolAdapter::addUserNode(const ndn::Name& userPrefix) // registers userPrefix with whichever backend the constructor created
+{
+ if (m_syncProtocol == SYNC_PROTOCOL_CHRONOSYNC) { // ChronoSync: register under the shared FIXED_SESSION component
+ m_chronoSyncLogic->addUserNode(userPrefix, chronosync::Logic::DEFAULT_NAME, FIXED_SESSION);
+ }
+ else { // PSync: only the prefix is passed
+ m_psyncLogic->addUserNode(userPrefix);
+ }
+}
+
+void
+SyncProtocolAdapter::publishUpdate(const ndn::Name& userPrefix, uint64_t seq) // announces sequence number seq for userPrefix via the active backend
+{
+ if (m_syncProtocol == SYNC_PROTOCOL_CHRONOSYNC) {
+ m_chronoSyncLogic->updateSeqNo(seq, userPrefix); // note the argument order: (seq, prefix)
+ }
+ else {
+ m_psyncLogic->publishName(userPrefix, seq); // opposite order from the ChronoSync call: (prefix, seq)
+ }
+}
+
+void
+SyncProtocolAdapter::onChronoSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates)
+{
+ NLSR_LOG_TRACE("Received ChronoSync update event");
+ // Forward every entry to the registered callback, one call per entry.
+ for (const auto& update : updates) {
+ // Drop the last name component (the FIXED_SESSION used for ChronoSync) before reporting
+ m_syncUpdateCallback(update.session.getPrefix(-1), update.high);
+ }
+}
+
+void
+SyncProtocolAdapter::onPSyncUpdate(const std::vector<psync::MissingDataInfo>& updates)
+{
+ NLSR_LOG_TRACE("Received PSync update event");
+ // Forward every entry to the registered callback, one call per entry.
+ for (const auto& update : updates) {
+ m_syncUpdateCallback(update.prefix, update.highSeq); // the prefix is forwarded unchanged
+ }
+}
+
+} // namespace nlsr
\ No newline at end of file
diff --git a/src/communication/sync-protocol-adapter.hpp b/src/communication/sync-protocol-adapter.hpp
new file mode 100644
index 0000000..363f02d
--- /dev/null
+++ b/src/communication/sync-protocol-adapter.hpp
@@ -0,0 +1,96 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/**
+ * Copyright (c) 2014-2018, The University of Memphis,
+ * Regents of the University of California,
+ * Arizona Board of Regents.
+ *
+ * This file is part of NLSR (Named-data Link State Routing).
+ * See AUTHORS.md for complete list of NLSR authors and contributors.
+ *
+ * NLSR is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NLSR is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NLSR, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ **/
+
+#ifndef NLSR_SYNC_PROTOCOL_ADAPTER_HPP
+#define NLSR_SYNC_PROTOCOL_ADAPTER_HPP
+
+#include "conf-parameter.hpp"
+
+#include <ndn-cxx/face.hpp>
+#include <ChronoSync/logic.hpp>
+#include <PSync/full-producer.hpp>
+
+namespace nlsr {
+
+typedef std::function<void(const ndn::Name& updateName,
+ uint64_t seqNo)> SyncUpdateCallback; // called once per reported update with the updated name and its sequence number
+
+class SyncProtocolAdapter // wraps ChronoSync and PSync behind one interface; the backend is chosen at construction
+{
+public:
+ SyncProtocolAdapter(ndn::Face& facePtr,
+ int32_t syncProtocol, // SYNC_PROTOCOL_CHRONOSYNC selects ChronoSync; any other value selects PSync
+ const ndn::Name& syncPrefix,
+ const ndn::Name& userPrefix,
+ ndn::time::milliseconds syncInterestLifetime,
+ const SyncUpdateCallback& syncUpdateCallback); // invoked for every update the backend reports
+
+ /*! \brief Add user node to ChronoSync or PSync
+ *
+ * \param userPrefix the Name under which the application will publish data
+ */
+ void
+ addUserNode(const ndn::Name& userPrefix);
+
+ /*! \brief Publish update to ChronoSync or PSync
+ *
+ * NLSR forces the sequence number on the sync protocol
+ * because it manages its own sequence numbers by storing them in a file.
+ *
+ * \param userPrefix the Name to be updated
+ * \param seq the sequence number of userPrefix
+ */
+ void
+ publishUpdate(const ndn::Name& userPrefix, uint64_t seq);
+
+PUBLIC_WITH_TESTS_ELSE_PRIVATE: // NOTE(review): sync-logic-handler.hpp includes test-access-control.hpp for this macro; this header does not - confirm it arrives transitively
+ /*! \brief Hook function to call whenever ChronoSync detects new data.
+ *
+ * Forwards each entry of \p updates to m_syncUpdateCallback as a name
+ * (with the last component removed) and its highest sequence number.
+ * \sa m_syncUpdateCallback
+ *
+ * \param updates A container with the new information sync has received
+ */
+ void
+ onChronoSyncUpdate(const std::vector<chronosync::MissingDataInfo>& updates);
+
+ /*! \brief Hook function to call whenever PSync detects new data.
+ *
+ * Forwards each entry of \p updates to m_syncUpdateCallback as its
+ * prefix and highest sequence number.
+ * \sa m_syncUpdateCallback
+ *
+ * \param updates A container with the new information sync has received
+ */
+ void
+ onPSyncUpdate(const std::vector<psync::MissingDataInfo>& updates);
+
+private:
+ int32_t m_syncProtocol; // compared against SYNC_PROTOCOL_CHRONOSYNC to pick the backend on every call
+ SyncUpdateCallback m_syncUpdateCallback; // copy of the callback given to the constructor
+ std::shared_ptr<chronosync::Logic> m_chronoSyncLogic; // created only when ChronoSync is selected
+ std::shared_ptr<psync::FullProducer> m_psyncLogic; // created only when ChronoSync is not selected
+};
+
+} // namespace nlsr
+
+#endif // NLSR_SYNC_PROTOCOL_ADAPTER_HPP