chunks: AIMD congestion control
refs #3636
Change-Id: Ia5e601201219048eb5c745bba9627e4916dac31a
diff --git a/tools/chunks/catchunks/pipeline-interests-aimd.cpp b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
new file mode 100644
index 0000000..cbff235
--- /dev/null
+++ b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
@@ -0,0 +1,509 @@
+/**
+ * Copyright (c) 2016, Regents of the University of California,
+ * Colorado State University,
+ * University Pierre & Marie Curie, Sorbonne University.
+ *
+ * This file is part of ndn-tools (Named Data Networking Essential Tools).
+ * See AUTHORS.md for complete list of ndn-tools authors and contributors.
+ *
+ * ndn-tools is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE. See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * ndn-tools, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * See AUTHORS.md for complete list of ndn-tools authors and contributors.
+ *
+ * @author Shuo Yang
+ * @author Weiwei Liu
+ */
+
+#include "pipeline-interests-aimd.hpp"
+
+#include <cmath>
+
+namespace ndn {
+namespace chunks {
+namespace aimd {
+
+/**
+ * @brief Create an AIMD pipeline that sends Interests through @p face.
+ *
+ * The congestion window and the slow start threshold start from the initial
+ * values found in @p options, and every counter starts at zero. In verbose
+ * mode the options are printed to stderr.
+ *
+ * @param face face handed to the PipelineInterests base class
+ * @param rttEstimator estimator used to initialise m_rttEstimator
+ * @param options pipeline parameters used to initialise m_options
+ */
+PipelineInterestsAimd::PipelineInterestsAimd(Face& face, RttEstimator& rttEstimator,
+                                             const Options& options)
+  : PipelineInterests(face)
+  , m_options(options)
+  , m_rttEstimator(rttEstimator)
+  , m_scheduler(m_face.getIoService())
+  , m_nextSegmentNo(0)
+  , m_receivedSize(0)
+  , m_highData(0)
+  , m_highInterest(0)
+  , m_recPoint(0)
+  , m_nInFlight(0)
+  , m_nReceived(0)
+  , m_nLossEvents(0)
+  , m_nRetransmitted(0)
+  , m_cwnd(m_options.initCwnd)
+  , m_ssthresh(m_options.initSsthresh)
+  , m_hasFailure(false)
+  , m_failedSegNo(0)
+{
+  if (!m_options.isVerbose)
+    return;
+
+  // dump the effective parameters once, before any Interest is sent
+  std::cerr << m_options;
+}
+
+/**
+ * @brief Cancel the pipeline on destruction.
+ */
+PipelineInterestsAimd::~PipelineInterestsAimd()
+{
+  // delegate all cleanup to cancel()
+  cancel();
+}
+
+/**
+ * @brief Start fetching: record the start time, arm the periodic RTO check and
+ *        request the first segment.
+ */
+void
+PipelineInterestsAimd::doRun()
+{
+  // reference point for the elapsed time reported later
+  m_startTime = time::steady_clock::now();
+
+  // the excluded segment is accounted for as already received
+  ++m_nReceived;
+
+  // arm the periodic retransmission-timer check
+  auto rtoCheck = [this] { checkRto(); };
+  m_scheduler.scheduleEvent(m_options.rtoCheckInterval, rtoCheck);
+
+  const uint64_t firstSegNo = getNextSegmentNo();
+  sendInterest(firstSegNo, false);
+}
+
+/**
+ * @brief Stop all activity: remove the pending Interest of every tracked
+ *        segment, forget the segments and cancel all scheduled events.
+ */
+void
+PipelineInterestsAimd::doCancel()
+{
+  for (auto it = m_segmentInfo.cbegin(); it != m_segmentInfo.cend(); ++it) {
+    m_face.removePendingInterest(it->second.interestId);
+  }
+
+  m_segmentInfo.clear();
+  m_scheduler.cancelAllEvents();
+}
+
+/**
+ * @brief Periodic check of the retransmission timers of all tracked segments.
+ *
+ * Every segment whose timer expired is pushed onto the retransmission queue and
+ * marked InRetxQueue; the number of such segments is then passed to
+ * handleTimeout(). Unless the pipeline is stopping, the check re-schedules
+ * itself after m_options.rtoCheckInterval.
+ */
+void
+PipelineInterestsAimd::checkRto()
+{
+  if (isStopping())
+    return;
+
+  int nExpired = 0;
+
+  for (auto& entry : m_segmentInfo) {
+    SegmentInfo& info = entry.second;
+
+    // skip segments currently in the retx queue and retransmitted segments
+    // that were already received
+    if (info.state == SegmentState::InRetxQueue ||
+        info.state == SegmentState::RetxReceived) {
+      continue;
+    }
+
+    Milliseconds sinceSent = time::steady_clock::now() - info.timeSent;
+    if (sinceSent.count() > info.rto.count()) { // timer expired
+      m_retxQueue.push(entry.first);
+      info.state = SegmentState::InRetxQueue;
+      ++nExpired;
+    }
+  }
+
+  if (nExpired > 0) {
+    handleTimeout(nExpired);
+  }
+
+  // run again after the predefined interval
+  m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
+}
+
+/**
+ * @brief Express an Interest for segment @p segNo and account for it as in flight.
+ *
+ * Nothing is sent while the pipeline is stopping. A first-time request is also
+ * skipped when @p segNo lies beyond the known last segment or when a failure
+ * has been recorded. A retransmission increments the per-segment retry counter
+ * (from the second retransmission on) and reports a failure through
+ * handleFail() once the counter exceeds m_options.maxRetriesOnTimeoutOrNack;
+ * otherwise the previously pending Interest is removed before the new one is
+ * expressed.
+ *
+ * @param segNo segment number to request
+ * @param isRetransmission true when the segment is being requested again
+ */
+void
+PipelineInterestsAimd::sendInterest(uint64_t segNo, bool isRetransmission)
+{
+  if (isStopping())
+    return;
+
+  if (m_hasFinalBlockId && segNo > m_lastSegmentNo && !isRetransmission)
+    return;
+
+  if (!isRetransmission && m_hasFailure)
+    return;
+
+  if (m_options.isVerbose) {
+    if (isRetransmission)
+      std::cerr << "Retransmitting segment #" << segNo << std::endl;
+    else
+      std::cerr << "Requesting segment #" << segNo << std::endl;
+  }
+
+  if (isRetransmission) {
+    auto ret = m_retxCount.insert(std::make_pair(segNo, 1));
+    if (!ret.second) { // not the first retransmission
+      // reuse the iterator from insert() instead of looking the key up again
+      auto& nRetries = ret.first->second;
+      nRetries += 1;
+      if (nRetries > m_options.maxRetriesOnTimeoutOrNack) {
+        return handleFail(segNo, "Reached the maximum number of retries (" +
+                          to_string(m_options.maxRetriesOnTimeoutOrNack) +
+                          ") while retrieving segment #" + to_string(segNo));
+      }
+
+      if (m_options.isVerbose) {
+        std::cerr << "# of retries for segment #" << segNo
+                  << " is " << nRetries << std::endl;
+      }
+    }
+
+    // drop the Interest sent previously for this segment
+    m_face.removePendingInterest(m_segmentInfo[segNo].interestId);
+  }
+
+  Interest interest(Name(m_prefix).appendSegment(segNo));
+  interest.setInterestLifetime(m_options.interestLifetime);
+  interest.setMustBeFresh(m_options.mustBeFresh);
+  interest.setMaxSuffixComponents(1);
+
+  auto interestId = m_face.expressInterest(interest,
+                                           bind(&PipelineInterestsAimd::handleData, this, _1, _2),
+                                           bind(&PipelineInterestsAimd::handleNack, this, _1, _2),
+                                           bind(&PipelineInterestsAimd::handleLifetimeExpiration,
+                                                this, _1));
+
+  m_nInFlight++;
+
+  if (isRetransmission) {
+    SegmentInfo& segInfo = m_segmentInfo[segNo];
+    // remember the id of the Interest that is pending now; without this, later
+    // removePendingInterest() calls would target the Interest removed above and
+    // leave the retransmitted one pending
+    segInfo.interestId = interestId;
+    segInfo.state = SegmentState::Retransmitted;
+    segInfo.rto = m_rttEstimator.getEstimatedRto();
+    segInfo.timeSent = time::steady_clock::now();
+    m_nRetransmitted++;
+  }
+  else {
+    m_highInterest = segNo;
+    Milliseconds rto = m_rttEstimator.getEstimatedRto();
+    SegmentInfo segInfo{interestId, SegmentState::FirstTimeSent, rto, time::steady_clock::now()};
+
+    m_segmentInfo.emplace(segNo, segInfo);
+  }
+}
+
+/**
+ * @brief Send Interests until the congestion window is full.
+ *
+ * Segments waiting in the retransmission queue take precedence over new
+ * segments. A queued segment that is no longer tracked in m_segmentInfo is
+ * dropped from the queue without using up window space.
+ */
+void
+PipelineInterestsAimd::schedulePackets()
+{
+  int nFreeSlots = static_cast<int>(m_cwnd) - m_nInFlight;
+
+  while (nFreeSlots > 0) {
+    if (m_retxQueue.empty()) {
+      // nothing to retransmit: request the next new segment
+      sendInterest(getNextSegmentNo(), false);
+      --nFreeSlots;
+      continue;
+    }
+
+    const uint64_t queuedSegNo = m_retxQueue.front();
+    m_retxQueue.pop();
+
+    // only segments still present in the map need to be retransmitted
+    if (m_segmentInfo.find(queuedSegNo) != m_segmentInfo.end()) {
+      sendInterest(queuedSegNo, true);
+      --nFreeSlots;
+    }
+  }
+}
+
+/**
+ * @brief Process a Data packet received for a segment Interest.
+ *
+ * Learns the last segment number from the FinalBlockId when it is seen for the
+ * first time, cancelling segments beyond it and reporting a previously
+ * recorded failure if the failed segment turns out to be part of the content.
+ * Data for a segment that is not tracked in m_segmentInfo, or whose
+ * retransmission was already received, is ignored. Otherwise the counters are
+ * updated, the window is increased, the Data is passed to onData(), and an RTT
+ * sample is taken unless the segment had been retransmitted. Finally the
+ * pipeline is cancelled when all segments have arrived, or more Interests are
+ * scheduled.
+ *
+ * @param interest the Interest that was satisfied
+ * @param data the received Data packet
+ */
+void
+PipelineInterestsAimd::handleData(const Interest& interest, const Data& data)
+{
+  if (isStopping())
+    return;
+
+  // Data name will not have extra components because MaxSuffixComponents is set to 1
+  BOOST_ASSERT(data.getName().equals(interest.getName()));
+
+  if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
+    m_lastSegmentNo = data.getFinalBlockId().toSegment();
+    m_hasFinalBlockId = true;
+    cancelInFlightSegmentsGreaterThan(m_lastSegmentNo);
+    if (m_hasFailure && m_lastSegmentNo >= m_failedSegNo) {
+      // previously failed segment is part of the content
+      return onFailure(m_failureReason);
+    }
+    else {
+      m_hasFailure = false;
+    }
+  }
+
+  const uint64_t recvSegNo = data.getName()[-1].toSegment();
+  if (m_highData < recvSegNo) {
+    m_highData = recvSegNo;
+  }
+
+  // Look the segment up without inserting: operator[] would create an empty
+  // entry for an untracked segment, which would then be counted and delivered
+  // as if it were outstanding and would feed a meaningless RTT sample.
+  auto segIt = m_segmentInfo.find(recvSegNo);
+  if (segIt == m_segmentInfo.end()) {
+    return; // segment is not outstanding, ignore
+  }
+
+  SegmentInfo& segInfo = segIt->second;
+  if (segInfo.state == SegmentState::RetxReceived) {
+    m_segmentInfo.erase(segIt);
+    return; // ignore already-received segment
+  }
+
+  Milliseconds rtt = time::steady_clock::now() - segInfo.timeSent;
+
+  if (m_options.isVerbose) {
+    std::cerr << "Received segment #" << recvSegNo
+              << ", rtt=" << rtt.count() << "ms"
+              << ", rto=" << segInfo.rto.count() << "ms" << std::endl;
+  }
+
+  // for segments in retransmission queue, no need to decrement m_nInFlight since
+  // it's already been decremented when segments timed out
+  if (segInfo.state != SegmentState::InRetxQueue && m_nInFlight > 0) {
+    m_nInFlight--;
+  }
+
+  m_receivedSize += data.getContent().value_size();
+  m_nReceived++;
+
+  increaseWindow();
+  onData(interest, data);
+
+  if (segInfo.state == SegmentState::FirstTimeSent ||
+      segInfo.state == SegmentState::InRetxQueue) { // do not sample RTT for retransmitted segments
+    size_t nExpectedSamples = std::max(static_cast<int>(std::ceil(m_nInFlight / 2.0)), 1);
+    m_rttEstimator.addMeasurement(recvSegNo, rtt, nExpectedSamples);
+    m_segmentInfo.erase(recvSegNo); // remove the entry associated with the received segment
+  }
+  else { // retransmission
+    segInfo.state = SegmentState::RetxReceived;
+  }
+
+  BOOST_ASSERT(m_nReceived > 0);
+  if (m_hasFinalBlockId && m_nReceived - 1 >= m_lastSegmentNo) { // all segments have been received
+    cancel();
+    if (m_options.isVerbose) {
+      printSummary();
+    }
+  }
+  else {
+    schedulePackets();
+  }
+}
+
+/**
+ * @brief React to a Nack received for a segment Interest.
+ *
+ * Duplicate Nacks are ignored, congestion Nacks are handled like a timeout of
+ * one segment, and any other reason is reported through handleFail().
+ *
+ * @param interest the Interest that was nacked
+ * @param nack the received Nack
+ */
+void
+PipelineInterestsAimd::handleNack(const Interest& interest, const lp::Nack& nack)
+{
+  if (isStopping())
+    return;
+
+  if (m_options.isVerbose)
+    std::cerr << "Received Nack with reason " << nack.getReason()
+              << " for Interest " << interest << std::endl;
+
+  const uint64_t nackedSegNo = interest.getName()[-1].toSegment();
+  const auto reason = nack.getReason();
+
+  if (reason == lp::NackReason::DUPLICATE) {
+    return; // ignore duplicates
+  }
+
+  if (reason == lp::NackReason::CONGESTION) {
+    // treated the same as timeout for now: queue for retransmission
+    m_retxQueue.push(nackedSegNo);
+    m_segmentInfo[nackedSegNo].state = SegmentState::InRetxQueue;
+    handleTimeout(1);
+    return;
+  }
+
+  handleFail(nackedSegNo, "Could not retrieve data for " + interest.getName().toUri() +
+             ", reason: " + boost::lexical_cast<std::string>(reason));
+}
+
+/**
+ * @brief React to the expiration of an Interest's lifetime by queueing its
+ *        segment for retransmission and signalling one timeout.
+ *
+ * @param interest the Interest whose lifetime expired
+ */
+void
+PipelineInterestsAimd::handleLifetimeExpiration(const Interest& interest)
+{
+  if (isStopping())
+    return;
+
+  const uint64_t expiredSegNo = interest.getName()[-1].toSegment();
+
+  m_segmentInfo[expiredSegNo].state = SegmentState::InRetxQueue; // update state
+  m_retxQueue.push(expiredSegNo);                                // put on retx queue
+
+  handleTimeout(1);
+}
+
+/**
+ * @brief Handle the loss of @p timeoutCount segments.
+ *
+ * The window is decreased and the RTO backed off when conservative window
+ * adaptation is disabled, or when m_highData has passed the recovery point set
+ * by the previous loss event. The timed-out segments are then removed from the
+ * in-flight count (which never drops below zero) and more Interests are
+ * scheduled.
+ *
+ * @param timeoutCount number of segments that timed out; values <= 0 are ignored
+ */
+void
+PipelineInterestsAimd::handleTimeout(int timeoutCount)
+{
+  if (timeoutCount <= 0)
+    return;
+
+  const bool isNewLossEvent = m_options.disableCwa || m_highData > m_recPoint;
+  if (isNewLossEvent) {
+    // react to only one timeout per RTT (conservative window adaptation)
+    m_recPoint = m_highInterest;
+
+    decreaseWindow();
+    m_rttEstimator.backoffRto();
+    ++m_nLossEvents;
+
+    if (m_options.isVerbose) {
+      std::cerr << "Packet loss event, cwnd = " << m_cwnd
+                << ", ssthresh = " << m_ssthresh << std::endl;
+    }
+  }
+
+  // clamp at zero instead of wrapping around
+  m_nInFlight = (m_nInFlight > static_cast<uint64_t>(timeoutCount)) ? m_nInFlight - timeoutCount
+                                                                    : 0;
+
+  schedulePackets();
+}
+
+/**
+ * @brief Handle a segment that could not be retrieved.
+ *
+ * With a known last segment number, the failure is reported only if @p segNo
+ * is part of the content. Without it, the segment is dropped from the tracked
+ * set; if nothing else is tracked the pipeline fails, otherwise all segments
+ * after @p segNo are cancelled and the failure is remembered.
+ *
+ * @param segNo number of the failed segment
+ * @param reason human-readable description of the failure
+ */
+void
+PipelineInterestsAimd::handleFail(uint64_t segNo, const std::string& reason)
+{
+  if (isStopping())
+    return;
+
+  if (m_hasFinalBlockId) {
+    // a failed segment that is definitely part of the content is fatal;
+    // one beyond the last segment is of no consequence
+    if (segNo <= m_lastSegmentNo)
+      onFailure(reason);
+    return;
+  }
+
+  m_segmentInfo.erase(segNo);
+  if (m_nInFlight > 0)
+    --m_nInFlight;
+
+  if (m_segmentInfo.empty()) {
+    onFailure("Fetching terminated but no final segment number has been found");
+    return;
+  }
+
+  cancelInFlightSegmentsGreaterThan(segNo);
+  m_hasFailure = true;
+  m_failedSegNo = segNo;
+  m_failureReason = reason;
+}
+
+/**
+ * @brief Grow the congestion window after a segment was received and notify
+ *        observers through afterCwndChange.
+ *
+ * Below the slow start threshold the window grows by m_options.aiStep; at or
+ * above it, by m_options.aiStep divided by the integral part of the window.
+ */
+void
+PipelineInterestsAimd::increaseWindow()
+{
+  const double increment = (m_cwnd < m_ssthresh)
+                           ? m_options.aiStep                      // below ssthresh
+                           : m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
+  m_cwnd += increment;
+
+  afterCwndChange(time::steady_clock::now() - m_startTime, m_cwnd);
+}
+
+/**
+ * @brief Shrink the congestion window after a loss event and notify observers
+ *        through afterCwndChange.
+ *
+ * The slow start threshold becomes the window scaled by m_options.mdCoef, but
+ * not less than 2 (see RFC 5681, Section 3.1). The window is then reset either
+ * to the initial window or to the new threshold, depending on
+ * m_options.resetCwndToInit.
+ */
+void
+PipelineInterestsAimd::decreaseWindow()
+{
+  // multiplicative decrease
+  m_ssthresh = std::max(2.0, m_cwnd * m_options.mdCoef);
+
+  if (m_options.resetCwndToInit) {
+    m_cwnd = m_options.initCwnd;
+  }
+  else {
+    m_cwnd = m_ssthresh;
+  }
+
+  afterCwndChange(time::steady_clock::now() - m_startTime, m_cwnd);
+}
+
+/**
+ * @brief Return the number of the next segment to request and advance the
+ *        internal counter, stepping over the excluded segment.
+ */
+uint64_t
+PipelineInterestsAimd::getNextSegmentNo()
+{
+  if (m_nextSegmentNo == m_excludedSegmentNo) {
+    // the excluded segment is never requested by this pipeline
+    ++m_nextSegmentNo;
+  }
+
+  const uint64_t segNo = m_nextSegmentNo;
+  ++m_nextSegmentNo;
+  return segNo;
+}
+
+/**
+ * @brief Stop fetching every tracked segment whose number is greater than
+ *        @p segmentNo.
+ *
+ * For each such segment the pending Interest is removed, the entry is erased
+ * and the in-flight count is decremented (never below zero).
+ *
+ * @param segmentNo segments with a number above this one are cancelled
+ */
+void
+PipelineInterestsAimd::cancelInFlightSegmentsGreaterThan(uint64_t segmentNo)
+{
+  auto it = m_segmentInfo.begin();
+  while (it != m_segmentInfo.end()) {
+    if (it->first <= segmentNo) {
+      ++it; // keep this segment
+      continue;
+    }
+
+    m_face.removePendingInterest(it->second.interestId);
+    it = m_segmentInfo.erase(it); // erase() yields the next valid iterator
+    if (m_nInFlight > 0)
+      --m_nInFlight;
+  }
+}
+
+/**
+ * @brief Print transfer statistics to stderr: segments received, elapsed time,
+ *        loss events, loss rate, retransmissions and goodput.
+ *
+ * The goodput is scaled by powers of 1000 and printed with the matching unit,
+ * from bit/s up to Tbit/s.
+ */
+void
+PipelineInterestsAimd::printSummary() const
+{
+  static const char* const UNITS[] = {"bit/s", "kbit/s", "Mbit/s", "Gbit/s", "Tbit/s"};
+  const int MAX_UNIT_INDEX = 4;
+
+  Milliseconds timePassed = time::steady_clock::now() - m_startTime;
+  double throughput = (8 * m_receivedSize * 1000) / timePassed.count();
+
+  // pick the largest unit that keeps the printed value below 1000, if any
+  int unitIndex = 0;
+  for (; throughput >= 1000.0 && unitIndex < MAX_UNIT_INDEX; ++unitIndex) {
+    throughput /= 1000.0;
+  }
+
+  const double lossRate = static_cast<double>(m_nLossEvents) / static_cast<double>(m_nReceived);
+
+  std::cerr << "\nAll segments have been received.\n";
+  std::cerr << "Total # of segments received: " << m_nReceived << "\n";
+  std::cerr << "Time used: " << timePassed.count() << " ms" << "\n";
+  std::cerr << "Total # of packet loss burst: " << m_nLossEvents << "\n";
+  std::cerr << "Packet loss rate: " << lossRate << "\n";
+  std::cerr << "Total # of retransmitted segments: " << m_nRetransmitted << "\n";
+  std::cerr << "Goodput: " << throughput << " " << UNITS[unitIndex] << "\n";
+}
+
+/**
+ * @brief Write the name of @p state to @p os.
+ *
+ * Nothing is written for a value that matches none of the listed enumerators.
+ *
+ * @return @p os
+ */
+std::ostream&
+operator<<(std::ostream& os, SegmentState state)
+{
+  switch (state) {
+  case SegmentState::FirstTimeSent:
+    return os << "FirstTimeSent";
+  case SegmentState::InRetxQueue:
+    return os << "InRetxQueue";
+  case SegmentState::Retransmitted:
+    return os << "Retransmitted";
+  case SegmentState::RetxReceived:
+    return os << "RetxReceived";
+  }
+
+  return os;
+}
+
+/**
+ * @brief Write a human-readable, multi-line listing of @p options to @p os.
+ *
+ * @return @p os
+ */
+std::ostream&
+operator<<(std::ostream& os, const PipelineInterestsAimdOptions& options)
+{
+  os << "PipelineInterestsAimd initial parameters:" << "\n";
+  os << "\tInitial congestion window size = " << options.initCwnd << "\n";
+  os << "\tInitial slow start threshold = " << options.initSsthresh << "\n";
+  os << "\tMultiplicative decrease factor = " << options.mdCoef << "\n";
+  os << "\tAdditive increase step = " << options.aiStep << "\n";
+  os << "\tRTO check interval = " << options.rtoCheckInterval << "\n";
+  os << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n";
+
+  const char* cwaStatus = options.disableCwa ? "disabled" : "enabled";
+  os << "\tConservative Window Adaptation " << cwaStatus << "\n";
+
+  const char* cwndTarget = options.resetCwndToInit ? "initCwnd" : "ssthresh";
+  os << "\tResetting cwnd to " << cwndTarget << " when loss event occurs" << "\n";
+
+  return os;
+}
+
+} // namespace aimd
+} // namespace chunks
+} // namespace ndn