chunks: more cleanups to enhance code readability
Change-Id: I451d3c484dd476682ccb1be1f26535e18a34ab87
diff --git a/tests/chunks/pipeline-interests-aimd.t.cpp b/tests/chunks/pipeline-interests-aimd.t.cpp
index 3d0d5e2..ba729ab 100644
--- a/tests/chunks/pipeline-interests-aimd.t.cpp
+++ b/tests/chunks/pipeline-interests-aimd.t.cpp
@@ -1,8 +1,8 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2016, Regents of the University of California,
- * Colorado State University,
- * University Pierre & Marie Curie, Sorbonne University.
+ * Copyright (c) 2016-2017, Regents of the University of California,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University.
*
* This file is part of ndn-tools (Named Data Networking Essential Tools).
* See AUTHORS.md for complete list of ndn-tools authors and contributors.
@@ -292,12 +292,12 @@
BOOST_REQUIRE_EQUAL(face.sentInterests.size(), 6);
// receive nack with NackReason::NONE for segment 4
- auto nack= makeNack(face.sentInterests[3], lp::NackReason::NONE);
+ auto nack = makeNack(face.sentInterests[3], lp::NackReason::NONE);
face.receive(nack);
advanceClocks(io, time::nanoseconds(1));
// error not triggered
- // pending interests for segment > 4 haven been removed
+ // pending interests for segment > 4 have been removed
BOOST_CHECK_EQUAL(hasFailed, false);
BOOST_CHECK_EQUAL(face.getNPendingInterests(), 2);
@@ -306,7 +306,7 @@
advanceClocks(io, time::nanoseconds(1));
// timeout segment 3
- advanceClocks(io, time::milliseconds(250));
+ advanceClocks(io, time::seconds(1));
// segment 3 is retransmitted
BOOST_CHECK_EQUAL(aimdPipeline->m_retxCount[3], 1);
diff --git a/tools/chunks/catchunks/pipeline-interests-aimd.cpp b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
index 622eb43..a20988b 100644
--- a/tools/chunks/catchunks/pipeline-interests-aimd.cpp
+++ b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
@@ -72,7 +72,7 @@
// schedule the event to check retransmission timer
m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
- sendInterest(getNextSegmentNo(), false);
+ schedulePackets();
}
void
@@ -91,7 +91,7 @@
if (isStopping())
return;
- int timeoutCount = 0;
+ bool hasTimeout = false;
for (auto& entry : m_segmentInfo) {
SegmentInfo& segInfo = entry.second;
@@ -99,16 +99,15 @@
segInfo.state != SegmentState::RetxReceived) { // or already-received retransmitted segments
Milliseconds timeElapsed = time::steady_clock::now() - segInfo.timeSent;
if (timeElapsed.count() > segInfo.rto.count()) { // timer expired?
- uint64_t timedoutSeg = entry.first;
- m_retxQueue.push(timedoutSeg); // put on retx queue
- segInfo.state = SegmentState::InRetxQueue; // update status
- timeoutCount++;
+ hasTimeout = true;
+ enqueueForRetransmission(entry.first);
}
}
}
- if (timeoutCount > 0) {
- handleTimeout(timeoutCount);
+ if (hasTimeout) {
+ recordTimeout();
+ schedulePackets();
}
// schedule the next check after predefined interval
@@ -128,14 +127,13 @@
return;
if (m_options.isVerbose) {
- if (isRetransmission)
- std::cerr << "Retransmitting segment #" << segNo << std::endl;
- else
- std::cerr << "Requesting segment #" << segNo << std::endl;
+ std::cerr << (isRetransmission ? "Retransmitting" : "Requesting")
+ << " segment #" << segNo << std::endl;
}
if (isRetransmission) {
- auto ret = m_retxCount.insert(std::make_pair(segNo, 1));
+ // keep track of retx count for this segment
+ auto ret = m_retxCount.emplace(segNo, 1);
if (ret.second == false) { // not the first retransmission
m_retxCount[segNo] += 1;
if (m_retxCount[segNo] > m_options.maxRetriesOnTimeoutOrNack) {
@@ -168,17 +166,17 @@
if (isRetransmission) {
SegmentInfo& segInfo = m_segmentInfo[segNo];
- segInfo.state = SegmentState::Retransmitted;
- segInfo.rto = m_rttEstimator.getEstimatedRto();
segInfo.timeSent = time::steady_clock::now();
+ segInfo.rto = m_rttEstimator.getEstimatedRto();
+ segInfo.state = SegmentState::Retransmitted;
m_nRetransmitted++;
}
else {
m_highInterest = segNo;
- Milliseconds rto = m_rttEstimator.getEstimatedRto();
- SegmentInfo segInfo{interestId, SegmentState::FirstTimeSent, rto, time::steady_clock::now()};
-
- m_segmentInfo.emplace(segNo, segInfo);
+ m_segmentInfo[segNo] = {interestId,
+ time::steady_clock::now(),
+ m_rttEstimator.getEstimatedRto(),
+ SegmentState::FirstTimeSent};
}
}
@@ -229,10 +227,6 @@
}
uint64_t recvSegNo = getSegmentFromPacket(data);
- if (m_highData < recvSegNo) {
- m_highData = recvSegNo;
- }
-
SegmentInfo& segInfo = m_segmentInfo[recvSegNo];
if (segInfo.state == SegmentState::RetxReceived) {
m_segmentInfo.erase(recvSegNo);
@@ -240,21 +234,24 @@
}
Milliseconds rtt = time::steady_clock::now() - segInfo.timeSent;
-
if (m_options.isVerbose) {
std::cerr << "Received segment #" << recvSegNo
<< ", rtt=" << rtt.count() << "ms"
<< ", rto=" << segInfo.rto.count() << "ms" << std::endl;
}
- // for segments in retransmission queue, no need to decrement m_nInFlight since
- // it's already been decremented when segments timed out
- if (segInfo.state != SegmentState::InRetxQueue && m_nInFlight > 0) {
+ if (m_highData < recvSegNo) {
+ m_highData = recvSegNo;
+ }
+
+ // for segments in retx queue, we must not decrement m_nInFlight
+ // because it was already decremented when the segment timed out
+ if (segInfo.state != SegmentState::InRetxQueue) {
m_nInFlight--;
}
- m_receivedSize += data.getContent().value_size();
m_nReceived++;
+ m_receivedSize += data.getContent().value_size();
increaseWindow();
onData(interest, data);
@@ -267,6 +264,7 @@
m_segmentInfo.erase(recvSegNo); // remove the entry associated with the received segment
}
else { // retransmission
+ BOOST_ASSERT(segInfo.state == SegmentState::Retransmitted);
segInfo.state = SegmentState::RetxReceived;
}
@@ -296,20 +294,19 @@
uint64_t segNo = getSegmentFromPacket(interest);
switch (nack.getReason()) {
- case lp::NackReason::DUPLICATE: {
- break; // ignore duplicates
- }
- case lp::NackReason::CONGESTION: { // treated the same as timeout for now
- m_retxQueue.push(segNo); // put on retx queue
- m_segmentInfo[segNo].state = SegmentState::InRetxQueue; // update state
- handleTimeout(1);
+ case lp::NackReason::DUPLICATE:
+ // ignore duplicates
break;
- }
- default: {
+ case lp::NackReason::CONGESTION:
+ // treated the same as timeout for now
+ enqueueForRetransmission(segNo);
+ recordTimeout();
+ schedulePackets();
+ break;
+ default:
handleFail(segNo, "Could not retrieve data for " + interest.getName().toUri() +
", reason: " + boost::lexical_cast<std::string>(nack.getReason()));
break;
- }
}
}
@@ -319,18 +316,14 @@
if (isStopping())
return;
- uint64_t segNo = getSegmentFromPacket(interest);
- m_retxQueue.push(segNo); // put on retx queue
- m_segmentInfo[segNo].state = SegmentState::InRetxQueue; // update state
- handleTimeout(1);
+ enqueueForRetransmission(getSegmentFromPacket(interest));
+ recordTimeout();
+ schedulePackets();
}
void
-PipelineInterestsAimd::handleTimeout(int timeoutCount)
+PipelineInterestsAimd::recordTimeout()
{
- if (timeoutCount <= 0)
- return;
-
if (m_options.disableCwa || m_highData > m_recPoint) {
// react to only one timeout per RTT (conservative window adaptation)
m_recPoint = m_highInterest;
@@ -344,9 +337,15 @@
<< ", ssthresh = " << m_ssthresh << std::endl;
}
}
+}
- m_nInFlight = std::max<int64_t>(0, m_nInFlight - timeoutCount);
- schedulePackets();
+void
+PipelineInterestsAimd::enqueueForRetransmission(uint64_t segNo)
+{
+ BOOST_ASSERT(m_nInFlight > 0);
+ m_nInFlight--;
+ m_retxQueue.push(segNo);
+ m_segmentInfo.at(segNo).state = SegmentState::InRetxQueue;
}
void
@@ -361,8 +360,7 @@
if (!m_hasFinalBlockId) {
m_segmentInfo.erase(segNo);
- if (m_nInFlight > 0)
- m_nInFlight--;
+ m_nInFlight--;
if (m_segmentInfo.empty()) {
onFailure("Fetching terminated but no final segment number has been found");
@@ -384,6 +382,7 @@
} else {
m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
}
+
afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
}
@@ -393,6 +392,7 @@
// please refer to RFC 5681, Section 3.1 for the rationale behind it
m_ssthresh = std::max(2.0, m_cwnd * m_options.mdCoef); // multiplicative decrease
m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
+
afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
}
@@ -406,15 +406,14 @@
}
void
-PipelineInterestsAimd::cancelInFlightSegmentsGreaterThan(uint64_t segmentNo)
+PipelineInterestsAimd::cancelInFlightSegmentsGreaterThan(uint64_t segNo)
{
for (auto it = m_segmentInfo.begin(); it != m_segmentInfo.end();) {
// cancel fetching all segments that follow
- if (it->first > segmentNo) {
+ if (it->first > segNo) {
m_face.removePendingInterest(it->second.interestId);
it = m_segmentInfo.erase(it);
- if (m_nInFlight > 0)
- m_nInFlight--;
+ m_nInFlight--;
}
else {
++it;
@@ -479,26 +478,21 @@
os << "RetxReceived";
break;
}
-
return os;
}
std::ostream&
operator<<(std::ostream& os, const PipelineInterestsAimdOptions& options)
{
- os << "PipelineInterestsAimd initial parameters:" << "\n"
+ os << "PipelineInterestsAimd initial parameters:\n"
<< "\tInitial congestion window size = " << options.initCwnd << "\n"
<< "\tInitial slow start threshold = " << options.initSsthresh << "\n"
- << "\tMultiplicative decrease factor = " << options.mdCoef << "\n"
<< "\tAdditive increase step = " << options.aiStep << "\n"
+ << "\tMultiplicative decrease factor = " << options.mdCoef << "\n"
<< "\tRTO check interval = " << options.rtoCheckInterval << "\n"
- << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n";
-
- std::string cwaStatus = options.disableCwa ? "disabled" : "enabled";
- os << "\tConservative Window Adaptation " << cwaStatus << "\n";
-
- std::string cwndStatus = options.resetCwndToInit ? "initCwnd" : "ssthresh";
- os << "\tResetting cwnd to " << cwndStatus << " when loss event occurs" << "\n";
+ << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n"
+ << "\tConservative Window Adaptation " << (options.disableCwa ? "disabled" : "enabled") << "\n"
+ << "\tResetting cwnd to " << (options.resetCwndToInit ? "initCwnd" : "ssthresh") << " upon loss event\n";
return os;
}
diff --git a/tools/chunks/catchunks/pipeline-interests-aimd.hpp b/tools/chunks/catchunks/pipeline-interests-aimd.hpp
index b51c59f..ec5bfce 100644
--- a/tools/chunks/catchunks/pipeline-interests-aimd.hpp
+++ b/tools/chunks/catchunks/pipeline-interests-aimd.hpp
@@ -74,12 +74,10 @@
*/
struct SegmentInfo
{
- const PendingInterestId* interestId; ///< The pending interest ID returned by
- ///< ndn::Face::expressInterest. It can be used with
- ///< removePendingInterest before retransmitting this Interest.
- SegmentState state;
- Milliseconds rto;
+ const PendingInterestId* interestId; ///< pending interest ID returned by ndn::Face::expressInterest
time::steady_clock::TimePoint timeSent;
+ Milliseconds rto;
+ SegmentState state;
};
/**
@@ -161,7 +159,10 @@
handleLifetimeExpiration(const Interest& interest);
void
- handleTimeout(int timeoutCount);
+ recordTimeout();
+
+ void
+ enqueueForRetransmission(uint64_t segNo);
void
handleFail(uint64_t segNo, const std::string& reason);
@@ -185,7 +186,7 @@
getNextSegmentNo();
void
- cancelInFlightSegmentsGreaterThan(uint64_t segmentNo);
+ cancelInFlightSegmentsGreaterThan(uint64_t segNo);
void
printSummary() const;
@@ -212,14 +213,13 @@
double m_cwnd; ///< current congestion window size (in segments)
double m_ssthresh; ///< current slow start threshold
- std::queue<uint64_t> m_retxQueue;
-
- std::unordered_map<uint64_t, SegmentInfo> m_segmentInfo; ///< the map keeps all the internal information
- ///< of the sent but not ackownledged segments
-
- std::unordered_map<uint64_t, int> m_retxCount; ///< maps segment number to its retransmission count.
+ std::unordered_map<uint64_t, SegmentInfo> m_segmentInfo; ///< keeps all the internal information
+ ///< on sent but not acked segments
+ std::unordered_map<uint64_t, int> m_retxCount; ///< maps segment number to its retransmission count;
///< if the count reaches to the maximum number of
///< timeout/nack retries, the pipeline will be aborted
+ std::queue<uint64_t> m_retxQueue;
+
bool m_hasFailure;
uint64_t m_failedSegNo;
std::string m_failureReason;