blob: b979a51a0b1e88dbb1b81fefd2114b1543068cc1 [file] [log] [blame]
Davide Pesaventobf1c0692017-01-15 19:15:09 -05001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +00002/*
Davide Pesaventocd65c2c2017-01-15 16:10:38 -05003 * Copyright (c) 2016-2017, Regents of the University of California,
4 * Colorado State University,
5 * University Pierre & Marie Curie, Sorbonne University.
Weiwei Liu245d7912016-07-28 00:04:25 -07006 *
7 * This file is part of ndn-tools (Named Data Networking Essential Tools).
8 * See AUTHORS.md for complete list of ndn-tools authors and contributors.
9 *
10 * ndn-tools is free software: you can redistribute it and/or modify it under the terms
11 * of the GNU General Public License as published by the Free Software Foundation,
12 * either version 3 of the License, or (at your option) any later version.
13 *
14 * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
15 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
16 * PURPOSE. See the GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License along with
19 * ndn-tools, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
20 *
21 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22 *
23 * @author Shuo Yang
24 * @author Weiwei Liu
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +000025 * @author Chavoosh Ghasemi
Weiwei Liu245d7912016-07-28 00:04:25 -070026 */
27
28#include "pipeline-interests-aimd.hpp"
29
30#include <cmath>
31
32namespace ndn {
33namespace chunks {
34namespace aimd {
35
36PipelineInterestsAimd::PipelineInterestsAimd(Face& face, RttEstimator& rttEstimator,
37 const Options& options)
38 : PipelineInterests(face)
39 , m_options(options)
40 , m_rttEstimator(rttEstimator)
41 , m_scheduler(m_face.getIoService())
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050042 , m_checkRtoEvent(m_scheduler)
Weiwei Liu245d7912016-07-28 00:04:25 -070043 , m_highData(0)
44 , m_highInterest(0)
45 , m_recPoint(0)
46 , m_nInFlight(0)
Weiwei Liu245d7912016-07-28 00:04:25 -070047 , m_nLossEvents(0)
48 , m_nRetransmitted(0)
49 , m_cwnd(m_options.initCwnd)
50 , m_ssthresh(m_options.initSsthresh)
51 , m_hasFailure(false)
52 , m_failedSegNo(0)
53{
54 if (m_options.isVerbose) {
55 std::cerr << m_options;
56 }
57}
58
59PipelineInterestsAimd::~PipelineInterestsAimd()
60{
61 cancel();
62}
63
64void
65PipelineInterestsAimd::doRun()
66{
Weiwei Liu245d7912016-07-28 00:04:25 -070067 // schedule the event to check retransmission timer
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050068 m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
Weiwei Liu245d7912016-07-28 00:04:25 -070069
Davide Pesavento958896e2017-01-19 00:52:04 -050070 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -070071}
72
73void
74PipelineInterestsAimd::doCancel()
75{
76 for (const auto& entry : m_segmentInfo) {
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050077 m_face.removePendingInterest(entry.second.interestId);
Weiwei Liu245d7912016-07-28 00:04:25 -070078 }
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050079 m_checkRtoEvent.cancel();
Weiwei Liu245d7912016-07-28 00:04:25 -070080 m_segmentInfo.clear();
Weiwei Liu245d7912016-07-28 00:04:25 -070081}
82
83void
84PipelineInterestsAimd::checkRto()
85{
86 if (isStopping())
87 return;
88
Davide Pesavento958896e2017-01-19 00:52:04 -050089 bool hasTimeout = false;
Weiwei Liu245d7912016-07-28 00:04:25 -070090
91 for (auto& entry : m_segmentInfo) {
92 SegmentInfo& segInfo = entry.second;
93 if (segInfo.state != SegmentState::InRetxQueue && // do not check segments currently in the retx queue
94 segInfo.state != SegmentState::RetxReceived) { // or already-received retransmitted segments
95 Milliseconds timeElapsed = time::steady_clock::now() - segInfo.timeSent;
96 if (timeElapsed.count() > segInfo.rto.count()) { // timer expired?
Davide Pesavento958896e2017-01-19 00:52:04 -050097 hasTimeout = true;
98 enqueueForRetransmission(entry.first);
Weiwei Liu245d7912016-07-28 00:04:25 -070099 }
100 }
101 }
102
Davide Pesavento958896e2017-01-19 00:52:04 -0500103 if (hasTimeout) {
104 recordTimeout();
105 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -0700106 }
107
108 // schedule the next check after predefined interval
Davide Pesaventocd65c2c2017-01-15 16:10:38 -0500109 m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
Weiwei Liu245d7912016-07-28 00:04:25 -0700110}
111
112void
113PipelineInterestsAimd::sendInterest(uint64_t segNo, bool isRetransmission)
114{
115 if (isStopping())
116 return;
117
118 if (m_hasFinalBlockId && segNo > m_lastSegmentNo && !isRetransmission)
119 return;
120
121 if (!isRetransmission && m_hasFailure)
122 return;
123
124 if (m_options.isVerbose) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500125 std::cerr << (isRetransmission ? "Retransmitting" : "Requesting")
126 << " segment #" << segNo << std::endl;
Weiwei Liu245d7912016-07-28 00:04:25 -0700127 }
128
129 if (isRetransmission) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500130 // keep track of retx count for this segment
131 auto ret = m_retxCount.emplace(segNo, 1);
Weiwei Liu245d7912016-07-28 00:04:25 -0700132 if (ret.second == false) { // not the first retransmission
133 m_retxCount[segNo] += 1;
134 if (m_retxCount[segNo] > m_options.maxRetriesOnTimeoutOrNack) {
135 return handleFail(segNo, "Reached the maximum number of retries (" +
136 to_string(m_options.maxRetriesOnTimeoutOrNack) +
137 ") while retrieving segment #" + to_string(segNo));
138 }
139
140 if (m_options.isVerbose) {
141 std::cerr << "# of retries for segment #" << segNo
142 << " is " << m_retxCount[segNo] << std::endl;
143 }
144 }
145
146 m_face.removePendingInterest(m_segmentInfo[segNo].interestId);
147 }
148
149 Interest interest(Name(m_prefix).appendSegment(segNo));
150 interest.setInterestLifetime(m_options.interestLifetime);
151 interest.setMustBeFresh(m_options.mustBeFresh);
152 interest.setMaxSuffixComponents(1);
153
154 auto interestId = m_face.expressInterest(interest,
155 bind(&PipelineInterestsAimd::handleData, this, _1, _2),
156 bind(&PipelineInterestsAimd::handleNack, this, _1, _2),
Davide Pesaventoe9c69852017-11-04 18:08:37 -0400157 bind(&PipelineInterestsAimd::handleLifetimeExpiration, this, _1));
Weiwei Liu245d7912016-07-28 00:04:25 -0700158 m_nInFlight++;
159
160 if (isRetransmission) {
161 SegmentInfo& segInfo = m_segmentInfo[segNo];
Weiwei Liu245d7912016-07-28 00:04:25 -0700162 segInfo.timeSent = time::steady_clock::now();
Davide Pesavento958896e2017-01-19 00:52:04 -0500163 segInfo.rto = m_rttEstimator.getEstimatedRto();
164 segInfo.state = SegmentState::Retransmitted;
Weiwei Liu245d7912016-07-28 00:04:25 -0700165 m_nRetransmitted++;
166 }
167 else {
168 m_highInterest = segNo;
Davide Pesavento958896e2017-01-19 00:52:04 -0500169 m_segmentInfo[segNo] = {interestId,
170 time::steady_clock::now(),
171 m_rttEstimator.getEstimatedRto(),
172 SegmentState::FirstTimeSent};
Weiwei Liu245d7912016-07-28 00:04:25 -0700173 }
174}
175
176void
177PipelineInterestsAimd::schedulePackets()
178{
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500179 BOOST_ASSERT(m_nInFlight >= 0);
180 auto availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nInFlight;
181
Weiwei Liu245d7912016-07-28 00:04:25 -0700182 while (availableWindowSize > 0) {
183 if (!m_retxQueue.empty()) { // do retransmission first
184 uint64_t retxSegNo = m_retxQueue.front();
185 m_retxQueue.pop();
186
187 auto it = m_segmentInfo.find(retxSegNo);
188 if (it == m_segmentInfo.end()) {
189 continue;
190 }
191 // the segment is still in the map, it means that it needs to be retransmitted
192 sendInterest(retxSegNo, true);
193 }
194 else { // send next segment
195 sendInterest(getNextSegmentNo(), false);
196 }
197 availableWindowSize--;
198 }
199}
200
201void
202PipelineInterestsAimd::handleData(const Interest& interest, const Data& data)
203{
204 if (isStopping())
205 return;
206
207 // Data name will not have extra components because MaxSuffixComponents is set to 1
208 BOOST_ASSERT(data.getName().equals(interest.getName()));
209
210 if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
211 m_lastSegmentNo = data.getFinalBlockId().toSegment();
212 m_hasFinalBlockId = true;
213 cancelInFlightSegmentsGreaterThan(m_lastSegmentNo);
214 if (m_hasFailure && m_lastSegmentNo >= m_failedSegNo) {
215 // previously failed segment is part of the content
216 return onFailure(m_failureReason);
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +0000217 }
218 else {
Weiwei Liu245d7912016-07-28 00:04:25 -0700219 m_hasFailure = false;
220 }
221 }
222
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500223 uint64_t recvSegNo = getSegmentFromPacket(data);
Weiwei Liu245d7912016-07-28 00:04:25 -0700224 SegmentInfo& segInfo = m_segmentInfo[recvSegNo];
225 if (segInfo.state == SegmentState::RetxReceived) {
226 m_segmentInfo.erase(recvSegNo);
227 return; // ignore already-received segment
228 }
229
230 Milliseconds rtt = time::steady_clock::now() - segInfo.timeSent;
Weiwei Liu245d7912016-07-28 00:04:25 -0700231 if (m_options.isVerbose) {
232 std::cerr << "Received segment #" << recvSegNo
233 << ", rtt=" << rtt.count() << "ms"
234 << ", rto=" << segInfo.rto.count() << "ms" << std::endl;
235 }
236
Davide Pesavento958896e2017-01-19 00:52:04 -0500237 if (m_highData < recvSegNo) {
238 m_highData = recvSegNo;
239 }
240
241 // for segments in retx queue, we must not decrement m_nInFlight
242 // because it was already decremented when the segment timed out
243 if (segInfo.state != SegmentState::InRetxQueue) {
Weiwei Liu245d7912016-07-28 00:04:25 -0700244 m_nInFlight--;
245 }
246
Weiwei Liu245d7912016-07-28 00:04:25 -0700247 increaseWindow();
Davide Pesaventoe9c69852017-11-04 18:08:37 -0400248 onData(data);
Weiwei Liu245d7912016-07-28 00:04:25 -0700249
250 if (segInfo.state == SegmentState::FirstTimeSent ||
251 segInfo.state == SegmentState::InRetxQueue) { // do not sample RTT for retransmitted segments
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500252 auto nExpectedSamples = std::max<int64_t>((m_nInFlight + 1) >> 1, 1);
253 BOOST_ASSERT(nExpectedSamples > 0);
254 m_rttEstimator.addMeasurement(recvSegNo, rtt, static_cast<size_t>(nExpectedSamples));
Weiwei Liu245d7912016-07-28 00:04:25 -0700255 m_segmentInfo.erase(recvSegNo); // remove the entry associated with the received segment
256 }
257 else { // retransmission
Davide Pesavento958896e2017-01-19 00:52:04 -0500258 BOOST_ASSERT(segInfo.state == SegmentState::Retransmitted);
Weiwei Liu245d7912016-07-28 00:04:25 -0700259 segInfo.state = SegmentState::RetxReceived;
260 }
261
262 BOOST_ASSERT(m_nReceived > 0);
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500263 if (m_hasFinalBlockId &&
264 static_cast<uint64_t>(m_nReceived - 1) >= m_lastSegmentNo) { // all segments have been received
Weiwei Liu245d7912016-07-28 00:04:25 -0700265 cancel();
266 if (m_options.isVerbose) {
267 printSummary();
268 }
269 }
270 else {
271 schedulePackets();
272 }
273}
274
275void
276PipelineInterestsAimd::handleNack(const Interest& interest, const lp::Nack& nack)
277{
278 if (isStopping())
279 return;
280
281 if (m_options.isVerbose)
282 std::cerr << "Received Nack with reason " << nack.getReason()
283 << " for Interest " << interest << std::endl;
284
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500285 uint64_t segNo = getSegmentFromPacket(interest);
Weiwei Liu245d7912016-07-28 00:04:25 -0700286
287 switch (nack.getReason()) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500288 case lp::NackReason::DUPLICATE:
289 // ignore duplicates
Weiwei Liu245d7912016-07-28 00:04:25 -0700290 break;
Davide Pesavento958896e2017-01-19 00:52:04 -0500291 case lp::NackReason::CONGESTION:
292 // treated the same as timeout for now
293 enqueueForRetransmission(segNo);
294 recordTimeout();
295 schedulePackets();
296 break;
297 default:
Weiwei Liu245d7912016-07-28 00:04:25 -0700298 handleFail(segNo, "Could not retrieve data for " + interest.getName().toUri() +
299 ", reason: " + boost::lexical_cast<std::string>(nack.getReason()));
300 break;
Weiwei Liu245d7912016-07-28 00:04:25 -0700301 }
302}
303
304void
305PipelineInterestsAimd::handleLifetimeExpiration(const Interest& interest)
306{
307 if (isStopping())
308 return;
309
Davide Pesavento958896e2017-01-19 00:52:04 -0500310 enqueueForRetransmission(getSegmentFromPacket(interest));
311 recordTimeout();
312 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -0700313}
314
315void
Davide Pesavento958896e2017-01-19 00:52:04 -0500316PipelineInterestsAimd::recordTimeout()
Weiwei Liu245d7912016-07-28 00:04:25 -0700317{
Weiwei Liu245d7912016-07-28 00:04:25 -0700318 if (m_options.disableCwa || m_highData > m_recPoint) {
319 // react to only one timeout per RTT (conservative window adaptation)
320 m_recPoint = m_highInterest;
321
322 decreaseWindow();
323 m_rttEstimator.backoffRto();
324 m_nLossEvents++;
325
326 if (m_options.isVerbose) {
327 std::cerr << "Packet loss event, cwnd = " << m_cwnd
328 << ", ssthresh = " << m_ssthresh << std::endl;
329 }
330 }
Davide Pesavento958896e2017-01-19 00:52:04 -0500331}
Weiwei Liu245d7912016-07-28 00:04:25 -0700332
Davide Pesavento958896e2017-01-19 00:52:04 -0500333void
334PipelineInterestsAimd::enqueueForRetransmission(uint64_t segNo)
335{
336 BOOST_ASSERT(m_nInFlight > 0);
337 m_nInFlight--;
338 m_retxQueue.push(segNo);
339 m_segmentInfo.at(segNo).state = SegmentState::InRetxQueue;
Weiwei Liu245d7912016-07-28 00:04:25 -0700340}
341
342void
343PipelineInterestsAimd::handleFail(uint64_t segNo, const std::string& reason)
344{
345 if (isStopping())
346 return;
347
348 // if the failed segment is definitely part of the content, raise a fatal error
349 if (m_hasFinalBlockId && segNo <= m_lastSegmentNo)
350 return onFailure(reason);
351
352 if (!m_hasFinalBlockId) {
353 m_segmentInfo.erase(segNo);
Davide Pesavento958896e2017-01-19 00:52:04 -0500354 m_nInFlight--;
Weiwei Liu245d7912016-07-28 00:04:25 -0700355
356 if (m_segmentInfo.empty()) {
357 onFailure("Fetching terminated but no final segment number has been found");
358 }
359 else {
360 cancelInFlightSegmentsGreaterThan(segNo);
361 m_hasFailure = true;
362 m_failedSegNo = segNo;
363 m_failureReason = reason;
364 }
365 }
366}
367
368void
369PipelineInterestsAimd::increaseWindow()
370{
371 if (m_cwnd < m_ssthresh) {
372 m_cwnd += m_options.aiStep; // additive increase
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +0000373 }
374 else {
Weiwei Liu245d7912016-07-28 00:04:25 -0700375 m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
376 }
Davide Pesavento958896e2017-01-19 00:52:04 -0500377
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500378 afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
Weiwei Liu245d7912016-07-28 00:04:25 -0700379}
380
381void
382PipelineInterestsAimd::decreaseWindow()
383{
384 // please refer to RFC 5681, Section 3.1 for the rationale behind it
385 m_ssthresh = std::max(2.0, m_cwnd * m_options.mdCoef); // multiplicative decrease
386 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
Davide Pesavento958896e2017-01-19 00:52:04 -0500387
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500388 afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
Weiwei Liu245d7912016-07-28 00:04:25 -0700389}
390
Weiwei Liu245d7912016-07-28 00:04:25 -0700391void
Davide Pesavento958896e2017-01-19 00:52:04 -0500392PipelineInterestsAimd::cancelInFlightSegmentsGreaterThan(uint64_t segNo)
Weiwei Liu245d7912016-07-28 00:04:25 -0700393{
394 for (auto it = m_segmentInfo.begin(); it != m_segmentInfo.end();) {
395 // cancel fetching all segments that follow
Davide Pesavento958896e2017-01-19 00:52:04 -0500396 if (it->first > segNo) {
Weiwei Liu245d7912016-07-28 00:04:25 -0700397 m_face.removePendingInterest(it->second.interestId);
398 it = m_segmentInfo.erase(it);
Davide Pesavento958896e2017-01-19 00:52:04 -0500399 m_nInFlight--;
Weiwei Liu245d7912016-07-28 00:04:25 -0700400 }
401 else {
402 ++it;
403 }
404 }
405}
406
407void
408PipelineInterestsAimd::printSummary() const
409{
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +0000410 PipelineInterests::printSummary();
411 std::cerr << "Total # of packet loss events: " << m_nLossEvents << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700412 << "Packet loss rate: "
413 << static_cast<double>(m_nLossEvents) / static_cast<double>(m_nReceived) << "\n"
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +0000414 << "Total # of retransmitted segments: " << m_nRetransmitted << "\n";
Weiwei Liu245d7912016-07-28 00:04:25 -0700415}
416
417std::ostream&
418operator<<(std::ostream& os, SegmentState state)
419{
420 switch (state) {
421 case SegmentState::FirstTimeSent:
422 os << "FirstTimeSent";
423 break;
424 case SegmentState::InRetxQueue:
425 os << "InRetxQueue";
426 break;
427 case SegmentState::Retransmitted:
428 os << "Retransmitted";
429 break;
430 case SegmentState::RetxReceived:
431 os << "RetxReceived";
432 break;
433 }
Weiwei Liu245d7912016-07-28 00:04:25 -0700434 return os;
435}
436
437std::ostream&
438operator<<(std::ostream& os, const PipelineInterestsAimdOptions& options)
439{
Davide Pesavento958896e2017-01-19 00:52:04 -0500440 os << "PipelineInterestsAimd initial parameters:\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700441 << "\tInitial congestion window size = " << options.initCwnd << "\n"
442 << "\tInitial slow start threshold = " << options.initSsthresh << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700443 << "\tAdditive increase step = " << options.aiStep << "\n"
Davide Pesavento958896e2017-01-19 00:52:04 -0500444 << "\tMultiplicative decrease factor = " << options.mdCoef << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700445 << "\tRTO check interval = " << options.rtoCheckInterval << "\n"
Davide Pesavento958896e2017-01-19 00:52:04 -0500446 << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n"
447 << "\tConservative Window Adaptation " << (options.disableCwa ? "disabled" : "enabled") << "\n"
448 << "\tResetting cwnd to " << (options.resetCwndToInit ? "initCwnd" : "ssthresh") << " upon loss event\n";
Weiwei Liu245d7912016-07-28 00:04:25 -0700449 return os;
450}
451
452} // namespace aimd
453} // namespace chunks
454} // namespace ndn