blob: 61ed8f3f0d99dc810f0f99b52d7d34c8f17b219d [file] [log] [blame]
Davide Pesaventobf1c0692017-01-15 19:15:09 -05001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +00002/*
Davide Pesaventocd65c2c2017-01-15 16:10:38 -05003 * Copyright (c) 2016-2017, Regents of the University of California,
4 * Colorado State University,
5 * University Pierre & Marie Curie, Sorbonne University.
Weiwei Liu245d7912016-07-28 00:04:25 -07006 *
7 * This file is part of ndn-tools (Named Data Networking Essential Tools).
8 * See AUTHORS.md for complete list of ndn-tools authors and contributors.
9 *
10 * ndn-tools is free software: you can redistribute it and/or modify it under the terms
11 * of the GNU General Public License as published by the Free Software Foundation,
12 * either version 3 of the License, or (at your option) any later version.
13 *
14 * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
15 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
16 * PURPOSE. See the GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License along with
19 * ndn-tools, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
20 *
21 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22 *
23 * @author Shuo Yang
24 * @author Weiwei Liu
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +000025 * @author Chavoosh Ghasemi
Weiwei Liu245d7912016-07-28 00:04:25 -070026 */
27
28#include "pipeline-interests-aimd.hpp"
29
30#include <cmath>
31
32namespace ndn {
33namespace chunks {
34namespace aimd {
35
Chavoosh Ghasemi641f5932017-11-06 22:45:11 +000036constexpr double PipelineInterestsAimd::MIN_SSTHRESH;
37
Weiwei Liu245d7912016-07-28 00:04:25 -070038PipelineInterestsAimd::PipelineInterestsAimd(Face& face, RttEstimator& rttEstimator,
39 const Options& options)
40 : PipelineInterests(face)
41 , m_options(options)
42 , m_rttEstimator(rttEstimator)
43 , m_scheduler(m_face.getIoService())
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050044 , m_checkRtoEvent(m_scheduler)
Weiwei Liu245d7912016-07-28 00:04:25 -070045 , m_highData(0)
46 , m_highInterest(0)
47 , m_recPoint(0)
48 , m_nInFlight(0)
Weiwei Liu245d7912016-07-28 00:04:25 -070049 , m_nLossEvents(0)
50 , m_nRetransmitted(0)
Chavoosh Ghasemi641f5932017-11-06 22:45:11 +000051 , m_nCongMarks(0)
Weiwei Liu245d7912016-07-28 00:04:25 -070052 , m_cwnd(m_options.initCwnd)
53 , m_ssthresh(m_options.initSsthresh)
54 , m_hasFailure(false)
55 , m_failedSegNo(0)
56{
57 if (m_options.isVerbose) {
58 std::cerr << m_options;
59 }
60}
61
62PipelineInterestsAimd::~PipelineInterestsAimd()
63{
64 cancel();
65}
66
67void
68PipelineInterestsAimd::doRun()
69{
Weiwei Liu245d7912016-07-28 00:04:25 -070070 // schedule the event to check retransmission timer
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050071 m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
Weiwei Liu245d7912016-07-28 00:04:25 -070072
Davide Pesavento958896e2017-01-19 00:52:04 -050073 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -070074}
75
76void
77PipelineInterestsAimd::doCancel()
78{
79 for (const auto& entry : m_segmentInfo) {
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050080 m_face.removePendingInterest(entry.second.interestId);
Weiwei Liu245d7912016-07-28 00:04:25 -070081 }
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050082 m_checkRtoEvent.cancel();
Weiwei Liu245d7912016-07-28 00:04:25 -070083 m_segmentInfo.clear();
Weiwei Liu245d7912016-07-28 00:04:25 -070084}
85
86void
87PipelineInterestsAimd::checkRto()
88{
89 if (isStopping())
90 return;
91
Davide Pesavento958896e2017-01-19 00:52:04 -050092 bool hasTimeout = false;
Weiwei Liu245d7912016-07-28 00:04:25 -070093
94 for (auto& entry : m_segmentInfo) {
95 SegmentInfo& segInfo = entry.second;
96 if (segInfo.state != SegmentState::InRetxQueue && // do not check segments currently in the retx queue
97 segInfo.state != SegmentState::RetxReceived) { // or already-received retransmitted segments
98 Milliseconds timeElapsed = time::steady_clock::now() - segInfo.timeSent;
99 if (timeElapsed.count() > segInfo.rto.count()) { // timer expired?
Davide Pesavento958896e2017-01-19 00:52:04 -0500100 hasTimeout = true;
101 enqueueForRetransmission(entry.first);
Weiwei Liu245d7912016-07-28 00:04:25 -0700102 }
103 }
104 }
105
Davide Pesavento958896e2017-01-19 00:52:04 -0500106 if (hasTimeout) {
107 recordTimeout();
108 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -0700109 }
110
111 // schedule the next check after predefined interval
Davide Pesaventocd65c2c2017-01-15 16:10:38 -0500112 m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
Weiwei Liu245d7912016-07-28 00:04:25 -0700113}
114
115void
116PipelineInterestsAimd::sendInterest(uint64_t segNo, bool isRetransmission)
117{
118 if (isStopping())
119 return;
120
121 if (m_hasFinalBlockId && segNo > m_lastSegmentNo && !isRetransmission)
122 return;
123
124 if (!isRetransmission && m_hasFailure)
125 return;
126
127 if (m_options.isVerbose) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500128 std::cerr << (isRetransmission ? "Retransmitting" : "Requesting")
129 << " segment #" << segNo << std::endl;
Weiwei Liu245d7912016-07-28 00:04:25 -0700130 }
131
132 if (isRetransmission) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500133 // keep track of retx count for this segment
134 auto ret = m_retxCount.emplace(segNo, 1);
Weiwei Liu245d7912016-07-28 00:04:25 -0700135 if (ret.second == false) { // not the first retransmission
136 m_retxCount[segNo] += 1;
137 if (m_retxCount[segNo] > m_options.maxRetriesOnTimeoutOrNack) {
138 return handleFail(segNo, "Reached the maximum number of retries (" +
139 to_string(m_options.maxRetriesOnTimeoutOrNack) +
140 ") while retrieving segment #" + to_string(segNo));
141 }
142
143 if (m_options.isVerbose) {
144 std::cerr << "# of retries for segment #" << segNo
145 << " is " << m_retxCount[segNo] << std::endl;
146 }
147 }
148
149 m_face.removePendingInterest(m_segmentInfo[segNo].interestId);
150 }
151
152 Interest interest(Name(m_prefix).appendSegment(segNo));
153 interest.setInterestLifetime(m_options.interestLifetime);
154 interest.setMustBeFresh(m_options.mustBeFresh);
155 interest.setMaxSuffixComponents(1);
156
157 auto interestId = m_face.expressInterest(interest,
158 bind(&PipelineInterestsAimd::handleData, this, _1, _2),
159 bind(&PipelineInterestsAimd::handleNack, this, _1, _2),
Davide Pesaventoe9c69852017-11-04 18:08:37 -0400160 bind(&PipelineInterestsAimd::handleLifetimeExpiration, this, _1));
Weiwei Liu245d7912016-07-28 00:04:25 -0700161 m_nInFlight++;
162
163 if (isRetransmission) {
164 SegmentInfo& segInfo = m_segmentInfo[segNo];
Weiwei Liu245d7912016-07-28 00:04:25 -0700165 segInfo.timeSent = time::steady_clock::now();
Davide Pesavento958896e2017-01-19 00:52:04 -0500166 segInfo.rto = m_rttEstimator.getEstimatedRto();
167 segInfo.state = SegmentState::Retransmitted;
Weiwei Liu245d7912016-07-28 00:04:25 -0700168 m_nRetransmitted++;
169 }
170 else {
171 m_highInterest = segNo;
Davide Pesavento958896e2017-01-19 00:52:04 -0500172 m_segmentInfo[segNo] = {interestId,
173 time::steady_clock::now(),
174 m_rttEstimator.getEstimatedRto(),
175 SegmentState::FirstTimeSent};
Weiwei Liu245d7912016-07-28 00:04:25 -0700176 }
177}
178
179void
180PipelineInterestsAimd::schedulePackets()
181{
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500182 BOOST_ASSERT(m_nInFlight >= 0);
183 auto availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nInFlight;
184
Weiwei Liu245d7912016-07-28 00:04:25 -0700185 while (availableWindowSize > 0) {
186 if (!m_retxQueue.empty()) { // do retransmission first
187 uint64_t retxSegNo = m_retxQueue.front();
188 m_retxQueue.pop();
189
190 auto it = m_segmentInfo.find(retxSegNo);
191 if (it == m_segmentInfo.end()) {
192 continue;
193 }
194 // the segment is still in the map, it means that it needs to be retransmitted
195 sendInterest(retxSegNo, true);
196 }
197 else { // send next segment
198 sendInterest(getNextSegmentNo(), false);
199 }
200 availableWindowSize--;
201 }
202}
203
204void
205PipelineInterestsAimd::handleData(const Interest& interest, const Data& data)
206{
207 if (isStopping())
208 return;
209
210 // Data name will not have extra components because MaxSuffixComponents is set to 1
211 BOOST_ASSERT(data.getName().equals(interest.getName()));
212
213 if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
214 m_lastSegmentNo = data.getFinalBlockId().toSegment();
215 m_hasFinalBlockId = true;
216 cancelInFlightSegmentsGreaterThan(m_lastSegmentNo);
217 if (m_hasFailure && m_lastSegmentNo >= m_failedSegNo) {
218 // previously failed segment is part of the content
219 return onFailure(m_failureReason);
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +0000220 }
221 else {
Weiwei Liu245d7912016-07-28 00:04:25 -0700222 m_hasFailure = false;
223 }
224 }
225
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500226 uint64_t recvSegNo = getSegmentFromPacket(data);
Weiwei Liu245d7912016-07-28 00:04:25 -0700227 SegmentInfo& segInfo = m_segmentInfo[recvSegNo];
228 if (segInfo.state == SegmentState::RetxReceived) {
229 m_segmentInfo.erase(recvSegNo);
230 return; // ignore already-received segment
231 }
232
233 Milliseconds rtt = time::steady_clock::now() - segInfo.timeSent;
Weiwei Liu245d7912016-07-28 00:04:25 -0700234 if (m_options.isVerbose) {
235 std::cerr << "Received segment #" << recvSegNo
236 << ", rtt=" << rtt.count() << "ms"
237 << ", rto=" << segInfo.rto.count() << "ms" << std::endl;
238 }
239
Davide Pesavento958896e2017-01-19 00:52:04 -0500240 if (m_highData < recvSegNo) {
241 m_highData = recvSegNo;
242 }
243
244 // for segments in retx queue, we must not decrement m_nInFlight
245 // because it was already decremented when the segment timed out
246 if (segInfo.state != SegmentState::InRetxQueue) {
Weiwei Liu245d7912016-07-28 00:04:25 -0700247 m_nInFlight--;
248 }
249
Chavoosh Ghasemi641f5932017-11-06 22:45:11 +0000250 // upon finding congestion mark, decrease the window size
251 // without retransmitting any packet
252 if (data.getCongestionMark() > 0) {
253 m_nCongMarks++;
254 if (!m_options.ignoreCongMarks) {
255 if (m_options.disableCwa || m_highData > m_recPoint) {
256 m_recPoint = m_highInterest; // react to only one congestion event (timeout or congestion mark)
257 // per RTT (conservative window adaptation)
258 decreaseWindow();
259
260 if (m_options.isVerbose) {
261 std::cerr << "Received congestion mark, value = " << data.getCongestionMark()
262 << ", new cwnd = " << m_cwnd << std::endl;
263 }
264 }
265 }
266 else {
267 increaseWindow();
268 }
269 }
270 else {
271 increaseWindow();
272 }
273
Davide Pesaventoe9c69852017-11-04 18:08:37 -0400274 onData(data);
Weiwei Liu245d7912016-07-28 00:04:25 -0700275
276 if (segInfo.state == SegmentState::FirstTimeSent ||
277 segInfo.state == SegmentState::InRetxQueue) { // do not sample RTT for retransmitted segments
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500278 auto nExpectedSamples = std::max<int64_t>((m_nInFlight + 1) >> 1, 1);
279 BOOST_ASSERT(nExpectedSamples > 0);
280 m_rttEstimator.addMeasurement(recvSegNo, rtt, static_cast<size_t>(nExpectedSamples));
Weiwei Liu245d7912016-07-28 00:04:25 -0700281 m_segmentInfo.erase(recvSegNo); // remove the entry associated with the received segment
282 }
283 else { // retransmission
Davide Pesavento958896e2017-01-19 00:52:04 -0500284 BOOST_ASSERT(segInfo.state == SegmentState::Retransmitted);
Weiwei Liu245d7912016-07-28 00:04:25 -0700285 segInfo.state = SegmentState::RetxReceived;
286 }
287
288 BOOST_ASSERT(m_nReceived > 0);
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500289 if (m_hasFinalBlockId &&
290 static_cast<uint64_t>(m_nReceived - 1) >= m_lastSegmentNo) { // all segments have been received
Weiwei Liu245d7912016-07-28 00:04:25 -0700291 cancel();
292 if (m_options.isVerbose) {
293 printSummary();
294 }
295 }
296 else {
297 schedulePackets();
298 }
299}
300
301void
302PipelineInterestsAimd::handleNack(const Interest& interest, const lp::Nack& nack)
303{
304 if (isStopping())
305 return;
306
307 if (m_options.isVerbose)
308 std::cerr << "Received Nack with reason " << nack.getReason()
309 << " for Interest " << interest << std::endl;
310
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500311 uint64_t segNo = getSegmentFromPacket(interest);
Weiwei Liu245d7912016-07-28 00:04:25 -0700312
313 switch (nack.getReason()) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500314 case lp::NackReason::DUPLICATE:
315 // ignore duplicates
Weiwei Liu245d7912016-07-28 00:04:25 -0700316 break;
Davide Pesavento958896e2017-01-19 00:52:04 -0500317 case lp::NackReason::CONGESTION:
318 // treated the same as timeout for now
319 enqueueForRetransmission(segNo);
320 recordTimeout();
321 schedulePackets();
322 break;
323 default:
Weiwei Liu245d7912016-07-28 00:04:25 -0700324 handleFail(segNo, "Could not retrieve data for " + interest.getName().toUri() +
325 ", reason: " + boost::lexical_cast<std::string>(nack.getReason()));
326 break;
Weiwei Liu245d7912016-07-28 00:04:25 -0700327 }
328}
329
330void
331PipelineInterestsAimd::handleLifetimeExpiration(const Interest& interest)
332{
333 if (isStopping())
334 return;
335
Davide Pesavento958896e2017-01-19 00:52:04 -0500336 enqueueForRetransmission(getSegmentFromPacket(interest));
337 recordTimeout();
338 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -0700339}
340
341void
Davide Pesavento958896e2017-01-19 00:52:04 -0500342PipelineInterestsAimd::recordTimeout()
Weiwei Liu245d7912016-07-28 00:04:25 -0700343{
Weiwei Liu245d7912016-07-28 00:04:25 -0700344 if (m_options.disableCwa || m_highData > m_recPoint) {
345 // react to only one timeout per RTT (conservative window adaptation)
346 m_recPoint = m_highInterest;
347
348 decreaseWindow();
349 m_rttEstimator.backoffRto();
350 m_nLossEvents++;
351
352 if (m_options.isVerbose) {
Chavoosh Ghasemi641f5932017-11-06 22:45:11 +0000353 std::cerr << "Packet loss event, new cwnd = " << m_cwnd
Weiwei Liu245d7912016-07-28 00:04:25 -0700354 << ", ssthresh = " << m_ssthresh << std::endl;
355 }
356 }
Davide Pesavento958896e2017-01-19 00:52:04 -0500357}
Weiwei Liu245d7912016-07-28 00:04:25 -0700358
Davide Pesavento958896e2017-01-19 00:52:04 -0500359void
360PipelineInterestsAimd::enqueueForRetransmission(uint64_t segNo)
361{
362 BOOST_ASSERT(m_nInFlight > 0);
363 m_nInFlight--;
364 m_retxQueue.push(segNo);
365 m_segmentInfo.at(segNo).state = SegmentState::InRetxQueue;
Weiwei Liu245d7912016-07-28 00:04:25 -0700366}
367
368void
369PipelineInterestsAimd::handleFail(uint64_t segNo, const std::string& reason)
370{
371 if (isStopping())
372 return;
373
374 // if the failed segment is definitely part of the content, raise a fatal error
375 if (m_hasFinalBlockId && segNo <= m_lastSegmentNo)
376 return onFailure(reason);
377
378 if (!m_hasFinalBlockId) {
379 m_segmentInfo.erase(segNo);
Davide Pesavento958896e2017-01-19 00:52:04 -0500380 m_nInFlight--;
Weiwei Liu245d7912016-07-28 00:04:25 -0700381
382 if (m_segmentInfo.empty()) {
383 onFailure("Fetching terminated but no final segment number has been found");
384 }
385 else {
386 cancelInFlightSegmentsGreaterThan(segNo);
387 m_hasFailure = true;
388 m_failedSegNo = segNo;
389 m_failureReason = reason;
390 }
391 }
392}
393
394void
395PipelineInterestsAimd::increaseWindow()
396{
397 if (m_cwnd < m_ssthresh) {
398 m_cwnd += m_options.aiStep; // additive increase
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +0000399 }
400 else {
Weiwei Liu245d7912016-07-28 00:04:25 -0700401 m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
402 }
Davide Pesavento958896e2017-01-19 00:52:04 -0500403
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500404 afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
Weiwei Liu245d7912016-07-28 00:04:25 -0700405}
406
407void
408PipelineInterestsAimd::decreaseWindow()
409{
410 // please refer to RFC 5681, Section 3.1 for the rationale behind it
Chavoosh Ghasemi641f5932017-11-06 22:45:11 +0000411 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
Weiwei Liu245d7912016-07-28 00:04:25 -0700412 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
Davide Pesavento958896e2017-01-19 00:52:04 -0500413
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500414 afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
Weiwei Liu245d7912016-07-28 00:04:25 -0700415}
416
Weiwei Liu245d7912016-07-28 00:04:25 -0700417void
Davide Pesavento958896e2017-01-19 00:52:04 -0500418PipelineInterestsAimd::cancelInFlightSegmentsGreaterThan(uint64_t segNo)
Weiwei Liu245d7912016-07-28 00:04:25 -0700419{
420 for (auto it = m_segmentInfo.begin(); it != m_segmentInfo.end();) {
421 // cancel fetching all segments that follow
Davide Pesavento958896e2017-01-19 00:52:04 -0500422 if (it->first > segNo) {
Weiwei Liu245d7912016-07-28 00:04:25 -0700423 m_face.removePendingInterest(it->second.interestId);
424 it = m_segmentInfo.erase(it);
Davide Pesavento958896e2017-01-19 00:52:04 -0500425 m_nInFlight--;
Weiwei Liu245d7912016-07-28 00:04:25 -0700426 }
427 else {
428 ++it;
429 }
430 }
431}
432
433void
434PipelineInterestsAimd::printSummary() const
435{
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +0000436 PipelineInterests::printSummary();
437 std::cerr << "Total # of packet loss events: " << m_nLossEvents << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700438 << "Packet loss rate: "
439 << static_cast<double>(m_nLossEvents) / static_cast<double>(m_nReceived) << "\n"
Chavoosh Ghasemi641f5932017-11-06 22:45:11 +0000440 << "Total # of retransmitted segments: " << m_nRetransmitted << "\n"
441 << "Total # of received congestion marks: " << m_nCongMarks << "\n";
Weiwei Liu245d7912016-07-28 00:04:25 -0700442}
443
444std::ostream&
445operator<<(std::ostream& os, SegmentState state)
446{
447 switch (state) {
448 case SegmentState::FirstTimeSent:
449 os << "FirstTimeSent";
450 break;
451 case SegmentState::InRetxQueue:
452 os << "InRetxQueue";
453 break;
454 case SegmentState::Retransmitted:
455 os << "Retransmitted";
456 break;
457 case SegmentState::RetxReceived:
458 os << "RetxReceived";
459 break;
460 }
Weiwei Liu245d7912016-07-28 00:04:25 -0700461 return os;
462}
463
464std::ostream&
465operator<<(std::ostream& os, const PipelineInterestsAimdOptions& options)
466{
Davide Pesavento958896e2017-01-19 00:52:04 -0500467 os << "PipelineInterestsAimd initial parameters:\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700468 << "\tInitial congestion window size = " << options.initCwnd << "\n"
469 << "\tInitial slow start threshold = " << options.initSsthresh << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700470 << "\tAdditive increase step = " << options.aiStep << "\n"
Davide Pesavento958896e2017-01-19 00:52:04 -0500471 << "\tMultiplicative decrease factor = " << options.mdCoef << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700472 << "\tRTO check interval = " << options.rtoCheckInterval << "\n"
Davide Pesavento958896e2017-01-19 00:52:04 -0500473 << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n"
Chavoosh Ghasemi641f5932017-11-06 22:45:11 +0000474 << "\tReaction to congestion marks " << (options.ignoreCongMarks ? "disabled" : "enabled") << "\n"
Davide Pesavento958896e2017-01-19 00:52:04 -0500475 << "\tConservative Window Adaptation " << (options.disableCwa ? "disabled" : "enabled") << "\n"
476 << "\tResetting cwnd to " << (options.resetCwndToInit ? "initCwnd" : "ssthresh") << " upon loss event\n";
Weiwei Liu245d7912016-07-28 00:04:25 -0700477 return os;
478}
479
480} // namespace aimd
481} // namespace chunks
482} // namespace ndn