chunks: more cleanups to enhance code readability

Change-Id: I451d3c484dd476682ccb1be1f26535e18a34ab87
diff --git a/tests/chunks/pipeline-interests-aimd.t.cpp b/tests/chunks/pipeline-interests-aimd.t.cpp
index 3d0d5e2..ba729ab 100644
--- a/tests/chunks/pipeline-interests-aimd.t.cpp
+++ b/tests/chunks/pipeline-interests-aimd.t.cpp
@@ -1,8 +1,8 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /**
- * Copyright (c) 2016,  Regents of the University of California,
- *                      Colorado State University,
- *                      University Pierre & Marie Curie, Sorbonne University.
+ * Copyright (c) 2016-2017, Regents of the University of California,
+ *                          Colorado State University,
+ *                          University Pierre & Marie Curie, Sorbonne University.
  *
  * This file is part of ndn-tools (Named Data Networking Essential Tools).
  * See AUTHORS.md for complete list of ndn-tools authors and contributors.
@@ -292,12 +292,12 @@
   BOOST_REQUIRE_EQUAL(face.sentInterests.size(), 6);
 
   // receive nack with NackReason::NONE for segment 4
-  auto nack= makeNack(face.sentInterests[3], lp::NackReason::NONE);
+  auto nack = makeNack(face.sentInterests[3], lp::NackReason::NONE);
   face.receive(nack);
   advanceClocks(io, time::nanoseconds(1));
 
   // error not triggered
-  // pending interests for segment > 4 haven been removed
+  // pending interests for segment > 4 have been removed
   BOOST_CHECK_EQUAL(hasFailed, false);
   BOOST_CHECK_EQUAL(face.getNPendingInterests(), 2);
 
@@ -306,7 +306,7 @@
   advanceClocks(io, time::nanoseconds(1));
 
   // timeout segment 3
-  advanceClocks(io, time::milliseconds(250));
+  advanceClocks(io, time::seconds(1));
 
   // segment 3 is retransmitted
   BOOST_CHECK_EQUAL(aimdPipeline->m_retxCount[3], 1);
diff --git a/tools/chunks/catchunks/pipeline-interests-aimd.cpp b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
index 622eb43..a20988b 100644
--- a/tools/chunks/catchunks/pipeline-interests-aimd.cpp
+++ b/tools/chunks/catchunks/pipeline-interests-aimd.cpp
@@ -72,7 +72,7 @@
   // schedule the event to check retransmission timer
   m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
 
-  sendInterest(getNextSegmentNo(), false);
+  schedulePackets();
 }
 
 void
@@ -91,7 +91,7 @@
   if (isStopping())
     return;
 
-  int timeoutCount = 0;
+  bool hasTimeout = false;
 
   for (auto& entry : m_segmentInfo) {
     SegmentInfo& segInfo = entry.second;
@@ -99,16 +99,15 @@
         segInfo.state != SegmentState::RetxReceived) { // or already-received retransmitted segments
       Milliseconds timeElapsed = time::steady_clock::now() - segInfo.timeSent;
       if (timeElapsed.count() > segInfo.rto.count()) { // timer expired?
-        uint64_t timedoutSeg = entry.first;
-        m_retxQueue.push(timedoutSeg); // put on retx queue
-        segInfo.state = SegmentState::InRetxQueue; // update status
-        timeoutCount++;
+        hasTimeout = true;
+        enqueueForRetransmission(entry.first);
       }
     }
   }
 
-  if (timeoutCount > 0) {
-    handleTimeout(timeoutCount);
+  if (hasTimeout) {
+    recordTimeout();
+    schedulePackets();
   }
 
   // schedule the next check after predefined interval
@@ -128,14 +127,13 @@
     return;
 
   if (m_options.isVerbose) {
-    if (isRetransmission)
-      std::cerr << "Retransmitting segment #" << segNo << std::endl;
-    else
-      std::cerr << "Requesting segment #" << segNo << std::endl;
+    std::cerr << (isRetransmission ? "Retransmitting" : "Requesting")
+              << " segment #" << segNo << std::endl;
   }
 
   if (isRetransmission) {
-    auto ret = m_retxCount.insert(std::make_pair(segNo, 1));
+    // keep track of retx count for this segment
+    auto ret = m_retxCount.emplace(segNo, 1);
     if (ret.second == false) { // not the first retransmission
       m_retxCount[segNo] += 1;
       if (m_retxCount[segNo] > m_options.maxRetriesOnTimeoutOrNack) {
@@ -168,17 +166,17 @@
 
   if (isRetransmission) {
     SegmentInfo& segInfo = m_segmentInfo[segNo];
-    segInfo.state = SegmentState::Retransmitted;
-    segInfo.rto = m_rttEstimator.getEstimatedRto();
     segInfo.timeSent = time::steady_clock::now();
+    segInfo.rto = m_rttEstimator.getEstimatedRto();
+    segInfo.state = SegmentState::Retransmitted;
     m_nRetransmitted++;
   }
   else {
     m_highInterest = segNo;
-    Milliseconds rto = m_rttEstimator.getEstimatedRto();
-    SegmentInfo segInfo{interestId, SegmentState::FirstTimeSent, rto, time::steady_clock::now()};
-
-    m_segmentInfo.emplace(segNo, segInfo);
+    m_segmentInfo[segNo] = {interestId,
+                            time::steady_clock::now(),
+                            m_rttEstimator.getEstimatedRto(),
+                            SegmentState::FirstTimeSent};
   }
 }
 
@@ -229,10 +227,6 @@
   }
 
   uint64_t recvSegNo = getSegmentFromPacket(data);
-  if (m_highData < recvSegNo) {
-    m_highData = recvSegNo;
-  }
-
   SegmentInfo& segInfo = m_segmentInfo[recvSegNo];
   if (segInfo.state == SegmentState::RetxReceived) {
     m_segmentInfo.erase(recvSegNo);
@@ -240,21 +234,24 @@
   }
 
   Milliseconds rtt = time::steady_clock::now() - segInfo.timeSent;
-
   if (m_options.isVerbose) {
     std::cerr << "Received segment #" << recvSegNo
               << ", rtt=" << rtt.count() << "ms"
               << ", rto=" << segInfo.rto.count() << "ms" << std::endl;
   }
 
-  // for segments in retransmission queue, no need to decrement m_nInFlight since
-  // it's already been decremented when segments timed out
-  if (segInfo.state != SegmentState::InRetxQueue && m_nInFlight > 0) {
+  if (m_highData < recvSegNo) {
+    m_highData = recvSegNo;
+  }
+
+  // for segments in retx queue, we must not decrement m_nInFlight
+  // because it was already decremented when the segment timed out
+  if (segInfo.state != SegmentState::InRetxQueue) {
     m_nInFlight--;
   }
 
-  m_receivedSize += data.getContent().value_size();
   m_nReceived++;
+  m_receivedSize += data.getContent().value_size();
 
   increaseWindow();
   onData(interest, data);
@@ -267,6 +264,7 @@
     m_segmentInfo.erase(recvSegNo); // remove the entry associated with the received segment
   }
   else { // retransmission
+    BOOST_ASSERT(segInfo.state == SegmentState::Retransmitted);
     segInfo.state = SegmentState::RetxReceived;
   }
 
@@ -296,20 +294,19 @@
   uint64_t segNo = getSegmentFromPacket(interest);
 
   switch (nack.getReason()) {
-    case lp::NackReason::DUPLICATE: {
-      break; // ignore duplicates
-    }
-    case lp::NackReason::CONGESTION: { // treated the same as timeout for now
-      m_retxQueue.push(segNo); // put on retx queue
-      m_segmentInfo[segNo].state = SegmentState::InRetxQueue; // update state
-      handleTimeout(1);
+    case lp::NackReason::DUPLICATE:
+      // ignore duplicates
       break;
-    }
-    default: {
+    case lp::NackReason::CONGESTION:
+      // treated the same as timeout for now
+      enqueueForRetransmission(segNo);
+      recordTimeout();
+      schedulePackets();
+      break;
+    default:
       handleFail(segNo, "Could not retrieve data for " + interest.getName().toUri() +
                  ", reason: " + boost::lexical_cast<std::string>(nack.getReason()));
       break;
-    }
   }
 }
 
@@ -319,18 +316,14 @@
   if (isStopping())
     return;
 
-  uint64_t segNo = getSegmentFromPacket(interest);
-  m_retxQueue.push(segNo); // put on retx queue
-  m_segmentInfo[segNo].state = SegmentState::InRetxQueue; // update state
-  handleTimeout(1);
+  enqueueForRetransmission(getSegmentFromPacket(interest));
+  recordTimeout();
+  schedulePackets();
 }
 
 void
-PipelineInterestsAimd::handleTimeout(int timeoutCount)
+PipelineInterestsAimd::recordTimeout()
 {
-  if (timeoutCount <= 0)
-    return;
-
   if (m_options.disableCwa || m_highData > m_recPoint) {
     // react to only one timeout per RTT (conservative window adaptation)
     m_recPoint = m_highInterest;
@@ -344,9 +337,15 @@
                 << ", ssthresh = " << m_ssthresh << std::endl;
     }
   }
+}
 
-  m_nInFlight = std::max<int64_t>(0, m_nInFlight - timeoutCount);
-  schedulePackets();
+void
+PipelineInterestsAimd::enqueueForRetransmission(uint64_t segNo)
+{
+  BOOST_ASSERT(m_nInFlight > 0);
+  m_nInFlight--;
+  m_retxQueue.push(segNo);
+  m_segmentInfo.at(segNo).state = SegmentState::InRetxQueue;
 }
 
 void
@@ -361,8 +360,7 @@
 
   if (!m_hasFinalBlockId) {
     m_segmentInfo.erase(segNo);
-    if (m_nInFlight > 0)
-      m_nInFlight--;
+    m_nInFlight--;
 
     if (m_segmentInfo.empty()) {
       onFailure("Fetching terminated but no final segment number has been found");
@@ -384,6 +382,7 @@
   } else {
     m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
   }
+
   afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
 }
 
@@ -393,6 +392,7 @@
   // please refer to RFC 5681, Section 3.1 for the rationale behind it
   m_ssthresh = std::max(2.0, m_cwnd * m_options.mdCoef); // multiplicative decrease
   m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
+
   afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
 }
 
@@ -406,15 +406,14 @@
 }
 
 void
-PipelineInterestsAimd::cancelInFlightSegmentsGreaterThan(uint64_t segmentNo)
+PipelineInterestsAimd::cancelInFlightSegmentsGreaterThan(uint64_t segNo)
 {
   for (auto it = m_segmentInfo.begin(); it != m_segmentInfo.end();) {
     // cancel fetching all segments that follow
-    if (it->first > segmentNo) {
+    if (it->first > segNo) {
       m_face.removePendingInterest(it->second.interestId);
       it = m_segmentInfo.erase(it);
-      if (m_nInFlight > 0)
-        m_nInFlight--;
+      m_nInFlight--;
     }
     else {
       ++it;
@@ -479,26 +478,21 @@
     os << "RetxReceived";
     break;
   }
-
   return os;
 }
 
 std::ostream&
 operator<<(std::ostream& os, const PipelineInterestsAimdOptions& options)
 {
-  os << "PipelineInterestsAimd initial parameters:" << "\n"
+  os << "PipelineInterestsAimd initial parameters:\n"
      << "\tInitial congestion window size = " << options.initCwnd << "\n"
      << "\tInitial slow start threshold = " << options.initSsthresh << "\n"
-     << "\tMultiplicative decrease factor = " << options.mdCoef << "\n"
      << "\tAdditive increase step = " << options.aiStep << "\n"
+     << "\tMultiplicative decrease factor = " << options.mdCoef << "\n"
      << "\tRTO check interval = " << options.rtoCheckInterval << "\n"
-     << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n";
-
-  std::string cwaStatus = options.disableCwa ? "disabled" : "enabled";
-  os << "\tConservative Window Adaptation " << cwaStatus << "\n";
-
-  std::string cwndStatus = options.resetCwndToInit ? "initCwnd" : "ssthresh";
-  os << "\tResetting cwnd to " << cwndStatus << " when loss event occurs" << "\n";
+     << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n"
+     << "\tConservative Window Adaptation " << (options.disableCwa ? "disabled" : "enabled") << "\n"
+     << "\tResetting cwnd to " << (options.resetCwndToInit ? "initCwnd" : "ssthresh") << " upon loss event\n";
   return os;
 }
 
diff --git a/tools/chunks/catchunks/pipeline-interests-aimd.hpp b/tools/chunks/catchunks/pipeline-interests-aimd.hpp
index b51c59f..ec5bfce 100644
--- a/tools/chunks/catchunks/pipeline-interests-aimd.hpp
+++ b/tools/chunks/catchunks/pipeline-interests-aimd.hpp
@@ -74,12 +74,10 @@
  */
 struct SegmentInfo
 {
-  const PendingInterestId* interestId; ///< The pending interest ID returned by
-                                       ///< ndn::Face::expressInterest. It can be used with
-                                       ///< removePendingInterest before retransmitting this Interest.
-  SegmentState state;
-  Milliseconds rto;
+  const PendingInterestId* interestId; ///< pending interest ID returned by ndn::Face::expressInterest
   time::steady_clock::TimePoint timeSent;
+  Milliseconds rto;
+  SegmentState state;
 };
 
 /**
@@ -161,7 +159,10 @@
   handleLifetimeExpiration(const Interest& interest);
 
   void
-  handleTimeout(int timeoutCount);
+  recordTimeout();
+
+  void
+  enqueueForRetransmission(uint64_t segNo);
 
   void
   handleFail(uint64_t segNo, const std::string& reason);
@@ -185,7 +186,7 @@
   getNextSegmentNo();
 
   void
-  cancelInFlightSegmentsGreaterThan(uint64_t segmentNo);
+  cancelInFlightSegmentsGreaterThan(uint64_t segNo);
 
   void
   printSummary() const;
@@ -212,14 +213,13 @@
   double m_cwnd; ///< current congestion window size (in segments)
   double m_ssthresh; ///< current slow start threshold
 
-  std::queue<uint64_t> m_retxQueue;
-
-  std::unordered_map<uint64_t, SegmentInfo> m_segmentInfo; ///< the map keeps all the internal information
-                                                           ///< of the sent but not ackownledged segments
-
-  std::unordered_map<uint64_t, int> m_retxCount; ///< maps segment number to its retransmission count.
+  std::unordered_map<uint64_t, SegmentInfo> m_segmentInfo; ///< keeps all the internal information
+                                                           ///< on sent but not acked segments
+  std::unordered_map<uint64_t, int> m_retxCount; ///< maps segment number to its retransmission count;
                                                  ///< if the count reaches to the maximum number of
                                                  ///< timeout/nack retries, the pipeline will be aborted
+  std::queue<uint64_t> m_retxQueue;
+
   bool m_hasFailure;
   uint64_t m_failedSegNo;
   std::string m_failureReason;