logic: handle sending data packets larger than MAX_NDN_PACKET_SIZE
refs: #4140
Change-Id: I722604a55765ffc1f27639f4d21e69f118cec77d
diff --git a/src/logic.cpp b/src/logic.cpp
index a81b260..c83132c 100644
--- a/src/logic.cpp
+++ b/src/logic.cpp
@@ -86,9 +86,9 @@
, m_needPeriodReset(resetTimer > time::steady_clock::Duration::zero())
, m_onUpdate(onUpdate)
, m_scheduler(m_face.getIoService())
- , m_randomGenerator(static_cast<unsigned int>(std::time(0)))
- , m_rangeUniformRandom(m_randomGenerator, boost::uniform_int<>(100,500))
- , m_reexpressionJitter(m_randomGenerator, boost::uniform_int<>(100,500))
+ , m_rng(std::random_device{}())
+ , m_rangeUniformRandom(100, 500)
+ , m_reexpressionJitter(100, 500)
, m_resetTimer(resetTimer)
, m_cancelResetTimer(cancelResetTimer)
, m_resetInterestLifetime(resetInterestLifetime)
@@ -413,7 +413,7 @@
if (static_cast<bool>(m_delayedInterestProcessingId))
m_scheduler.cancelEvent(m_delayedInterestProcessingId);
- time::milliseconds after(m_rangeUniformRandom());
+ time::milliseconds after(m_rangeUniformRandom(m_rng));
_LOG_DEBUG_ID("After: " << after);
m_delayedInterestProcessingId =
m_scheduler.scheduleEvent(after,
@@ -454,7 +454,7 @@
m_scheduler.cancelEvent(m_delayedInterestProcessingId);
m_delayedInterestProcessingId =
- m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom()),
+ m_scheduler.scheduleEvent(time::milliseconds(m_rangeUniformRandom(m_rng)),
bind(&Logic::processSyncInterest, this, interest, true));
}
else {
@@ -530,7 +530,7 @@
if (static_cast<bool>(commit) && !commit->getLeaves().empty() && firstData) {
// state changed and it is safe to express a new interest
- time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter());
+ time::steady_clock::Duration after = time::milliseconds(m_reexpressionJitter(m_rng));
_LOG_DEBUG_ID("Reschedule sync interest after: " << after);
EventId eventId = m_scheduler.scheduleEvent(after,
bind(&Logic::sendSyncInterest, this));
@@ -587,7 +587,7 @@
_LOG_DEBUG_ID("ResetTimer: " << m_resetTimer);
EventId eventId =
- m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter()),
+ m_scheduler.scheduleEvent(m_resetTimer + ndn::time::milliseconds(m_reexpressionJitter(m_rng)),
bind(&Logic::sendResetInterest, this));
m_scheduler.cancelEvent(m_resetInterestId);
m_resetInterestId = eventId;
@@ -621,7 +621,7 @@
EventId eventId =
m_scheduler.scheduleEvent(m_syncInterestLifetime / 2 +
- ndn::time::milliseconds(m_reexpressionJitter()),
+ ndn::time::milliseconds(m_reexpressionJitter(m_rng)),
bind(&Logic::sendSyncInterest, this));
m_scheduler.cancelEvent(m_reexpressingInterestId);
m_reexpressingInterestId = eventId;
@@ -640,19 +640,55 @@
}
void
+Logic::trimState(State& partialState, const State& state, size_t maxSize)
+{
+  // Reset the output first; leaves are added back below while they still fit.
+  partialState.reset();
+  // Copy the leaves of `state` so that they can be visited in a random order.
+  const auto& allLeaves = state.getLeaves();
+  std::vector<ConstLeafPtr> shuffled(allLeaves.begin(), allLeaves.end());
+  std::shuffle(shuffled.begin(), shuffled.end(), m_rng);
+  // `probe` receives every candidate first; a leaf is copied into partialState
+  // only if the encoding of `probe` is still strictly smaller than maxSize.
+  State probe;
+  for (const auto& leaf : shuffled) {
+    probe.update(leaf->getSessionName(), leaf->getSeq());
+    if (probe.wireEncode().size() < maxSize)
+      partialState.update(leaf->getSessionName(), leaf->getSeq());
+    else
+      break;
+  }
+}
+
+void
Logic::sendSyncData(const Name& nodePrefix, const Name& name, const State& state)
{
_LOG_DEBUG_ID(">> Logic::sendSyncData");
+ if (m_nodeList.find(nodePrefix) == m_nodeList.end())
+ return;
+
Data syncReply(name);
syncReply.setContent(state.wireEncode());
syncReply.setFreshnessPeriod(m_syncReplyFreshness);
- if (m_nodeList.find(nodePrefix) == m_nodeList.end())
- return;
+
if (m_nodeList[nodePrefix].signingId.empty())
m_keyChain.sign(syncReply);
else
m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
+ if (syncReply.wireEncode().size() > ndn::MAX_NDN_PACKET_SIZE) {
+ _LOG_DEBUG("Sync reply size exceeded MAX_NDN_PACKET_SIZE");
+ auto maxContentSize = ndn::MAX_NDN_PACKET_SIZE - (syncReply.wireEncode().size() - state.wireEncode().size());
+ State partialState;
+ trimState(partialState, state, maxContentSize);
+ syncReply.setContent(partialState.wireEncode());
+
+ if (m_nodeList[nodePrefix].signingId.empty())
+ m_keyChain.sign(syncReply);
+ else
+ m_keyChain.sign(syncReply, security::signingByIdentity(m_nodeList[nodePrefix].signingId));
+ }
+
m_face.put(syncReply);
// checking if our own interest got satisfied
@@ -664,7 +700,7 @@
}
// re-schedule sending Sync interest
- time::milliseconds after(m_reexpressionJitter());
+ time::milliseconds after(m_reexpressionJitter(m_rng));
_LOG_DEBUG_ID("Satisfy our own interest");
_LOG_DEBUG_ID("Reschedule sync interest after " << after);
EventId eventId = m_scheduler.scheduleEvent(after, bind(&Logic::sendSyncInterest, this));
diff --git a/src/logic.hpp b/src/logic.hpp
index 453a843..b3e0fe6 100644
--- a/src/logic.hpp
+++ b/src/logic.hpp
@@ -1,6 +1,6 @@
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
- * Copyright (c) 2012-2017 University of California, Los Angeles
+ * Copyright (c) 2012-2018 University of California, Los Angeles
*
* This file is part of ChronoSync, synchronization library for distributed realtime
* applications for NDN.
@@ -33,10 +33,10 @@
#include <boost/archive/iterators/transform_width.hpp>
#include <boost/assert.hpp>
#include <boost/iterator/transform_iterator.hpp>
-#include <boost/random.hpp>
#include <boost/throw_exception.hpp>
#include <memory>
+#include <random>
#include <unordered_map>
namespace chronosync {
@@ -230,6 +230,9 @@
return m_state;
}
+ /// @brief Fill @p partialState with a randomly chosen subset of @p state whose encoding is smaller than @p maxSize
+ void
+ trimState(State& partialState, const State& state, size_t maxSize);
private:
/**
@@ -498,9 +501,9 @@
ndn::EventId m_resetInterestId;
// Timer
- boost::mt19937 m_randomGenerator;
- boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_rangeUniformRandom;
- boost::variate_generator<boost::mt19937&, boost::uniform_int<> > m_reexpressionJitter;
+ std::mt19937 m_rng;
+ std::uniform_int_distribution<> m_rangeUniformRandom;
+ std::uniform_int_distribution<> m_reexpressionJitter;
/// @brief Timer to send next reset 0 for no reset
time::steady_clock::Duration m_resetTimer;
/// @brief Timer to cancel reset state
@@ -518,7 +521,6 @@
ndn::KeyChain m_keyChain;
std::shared_ptr<Validator> m_validator;
-
#ifdef _DEBUG
int m_instanceId;
static int s_instanceCounter;