blob: a20988be0c416c2ad936218534a71574cd03c831 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2016-2017, Regents of the University of California,
 *                          Colorado State University,
 *                          University Pierre & Marie Curie, Sorbonne University.
 *
 * This file is part of ndn-tools (Named Data Networking Essential Tools).
 * See AUTHORS.md for complete list of ndn-tools authors and contributors.
 *
 * ndn-tools is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation,
 * either version 3 of the License, or (at your option) any later version.
 *
 * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
 * PURPOSE.  See the GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * ndn-tools, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
 *
 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
 *
 * @author Shuo Yang
 * @author Weiwei Liu
 */
26
27#include "pipeline-interests-aimd.hpp"
28
29#include <cmath>
30
31namespace ndn {
32namespace chunks {
33namespace aimd {
34
35PipelineInterestsAimd::PipelineInterestsAimd(Face& face, RttEstimator& rttEstimator,
36 const Options& options)
37 : PipelineInterests(face)
38 , m_options(options)
39 , m_rttEstimator(rttEstimator)
40 , m_scheduler(m_face.getIoService())
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050041 , m_checkRtoEvent(m_scheduler)
Weiwei Liu245d7912016-07-28 00:04:25 -070042 , m_nextSegmentNo(0)
43 , m_receivedSize(0)
44 , m_highData(0)
45 , m_highInterest(0)
46 , m_recPoint(0)
47 , m_nInFlight(0)
48 , m_nReceived(0)
49 , m_nLossEvents(0)
50 , m_nRetransmitted(0)
51 , m_cwnd(m_options.initCwnd)
52 , m_ssthresh(m_options.initSsthresh)
53 , m_hasFailure(false)
54 , m_failedSegNo(0)
55{
56 if (m_options.isVerbose) {
57 std::cerr << m_options;
58 }
59}
60
61PipelineInterestsAimd::~PipelineInterestsAimd()
62{
63 cancel();
64}
65
66void
67PipelineInterestsAimd::doRun()
68{
Weiwei Liu245d7912016-07-28 00:04:25 -070069 // count the excluded segment
70 m_nReceived++;
71
72 // schedule the event to check retransmission timer
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050073 m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
Weiwei Liu245d7912016-07-28 00:04:25 -070074
Davide Pesavento958896e2017-01-19 00:52:04 -050075 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -070076}
77
78void
79PipelineInterestsAimd::doCancel()
80{
81 for (const auto& entry : m_segmentInfo) {
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050082 m_face.removePendingInterest(entry.second.interestId);
Weiwei Liu245d7912016-07-28 00:04:25 -070083 }
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050084 m_checkRtoEvent.cancel();
Weiwei Liu245d7912016-07-28 00:04:25 -070085 m_segmentInfo.clear();
Weiwei Liu245d7912016-07-28 00:04:25 -070086}
87
88void
89PipelineInterestsAimd::checkRto()
90{
91 if (isStopping())
92 return;
93
Davide Pesavento958896e2017-01-19 00:52:04 -050094 bool hasTimeout = false;
Weiwei Liu245d7912016-07-28 00:04:25 -070095
96 for (auto& entry : m_segmentInfo) {
97 SegmentInfo& segInfo = entry.second;
98 if (segInfo.state != SegmentState::InRetxQueue && // do not check segments currently in the retx queue
99 segInfo.state != SegmentState::RetxReceived) { // or already-received retransmitted segments
100 Milliseconds timeElapsed = time::steady_clock::now() - segInfo.timeSent;
101 if (timeElapsed.count() > segInfo.rto.count()) { // timer expired?
Davide Pesavento958896e2017-01-19 00:52:04 -0500102 hasTimeout = true;
103 enqueueForRetransmission(entry.first);
Weiwei Liu245d7912016-07-28 00:04:25 -0700104 }
105 }
106 }
107
Davide Pesavento958896e2017-01-19 00:52:04 -0500108 if (hasTimeout) {
109 recordTimeout();
110 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -0700111 }
112
113 // schedule the next check after predefined interval
Davide Pesaventocd65c2c2017-01-15 16:10:38 -0500114 m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
Weiwei Liu245d7912016-07-28 00:04:25 -0700115}
116
117void
118PipelineInterestsAimd::sendInterest(uint64_t segNo, bool isRetransmission)
119{
120 if (isStopping())
121 return;
122
123 if (m_hasFinalBlockId && segNo > m_lastSegmentNo && !isRetransmission)
124 return;
125
126 if (!isRetransmission && m_hasFailure)
127 return;
128
129 if (m_options.isVerbose) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500130 std::cerr << (isRetransmission ? "Retransmitting" : "Requesting")
131 << " segment #" << segNo << std::endl;
Weiwei Liu245d7912016-07-28 00:04:25 -0700132 }
133
134 if (isRetransmission) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500135 // keep track of retx count for this segment
136 auto ret = m_retxCount.emplace(segNo, 1);
Weiwei Liu245d7912016-07-28 00:04:25 -0700137 if (ret.second == false) { // not the first retransmission
138 m_retxCount[segNo] += 1;
139 if (m_retxCount[segNo] > m_options.maxRetriesOnTimeoutOrNack) {
140 return handleFail(segNo, "Reached the maximum number of retries (" +
141 to_string(m_options.maxRetriesOnTimeoutOrNack) +
142 ") while retrieving segment #" + to_string(segNo));
143 }
144
145 if (m_options.isVerbose) {
146 std::cerr << "# of retries for segment #" << segNo
147 << " is " << m_retxCount[segNo] << std::endl;
148 }
149 }
150
151 m_face.removePendingInterest(m_segmentInfo[segNo].interestId);
152 }
153
154 Interest interest(Name(m_prefix).appendSegment(segNo));
155 interest.setInterestLifetime(m_options.interestLifetime);
156 interest.setMustBeFresh(m_options.mustBeFresh);
157 interest.setMaxSuffixComponents(1);
158
159 auto interestId = m_face.expressInterest(interest,
160 bind(&PipelineInterestsAimd::handleData, this, _1, _2),
161 bind(&PipelineInterestsAimd::handleNack, this, _1, _2),
162 bind(&PipelineInterestsAimd::handleLifetimeExpiration,
163 this, _1));
164
165 m_nInFlight++;
166
167 if (isRetransmission) {
168 SegmentInfo& segInfo = m_segmentInfo[segNo];
Weiwei Liu245d7912016-07-28 00:04:25 -0700169 segInfo.timeSent = time::steady_clock::now();
Davide Pesavento958896e2017-01-19 00:52:04 -0500170 segInfo.rto = m_rttEstimator.getEstimatedRto();
171 segInfo.state = SegmentState::Retransmitted;
Weiwei Liu245d7912016-07-28 00:04:25 -0700172 m_nRetransmitted++;
173 }
174 else {
175 m_highInterest = segNo;
Davide Pesavento958896e2017-01-19 00:52:04 -0500176 m_segmentInfo[segNo] = {interestId,
177 time::steady_clock::now(),
178 m_rttEstimator.getEstimatedRto(),
179 SegmentState::FirstTimeSent};
Weiwei Liu245d7912016-07-28 00:04:25 -0700180 }
181}
182
183void
184PipelineInterestsAimd::schedulePackets()
185{
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500186 BOOST_ASSERT(m_nInFlight >= 0);
187 auto availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nInFlight;
188
Weiwei Liu245d7912016-07-28 00:04:25 -0700189 while (availableWindowSize > 0) {
190 if (!m_retxQueue.empty()) { // do retransmission first
191 uint64_t retxSegNo = m_retxQueue.front();
192 m_retxQueue.pop();
193
194 auto it = m_segmentInfo.find(retxSegNo);
195 if (it == m_segmentInfo.end()) {
196 continue;
197 }
198 // the segment is still in the map, it means that it needs to be retransmitted
199 sendInterest(retxSegNo, true);
200 }
201 else { // send next segment
202 sendInterest(getNextSegmentNo(), false);
203 }
204 availableWindowSize--;
205 }
206}
207
208void
209PipelineInterestsAimd::handleData(const Interest& interest, const Data& data)
210{
211 if (isStopping())
212 return;
213
214 // Data name will not have extra components because MaxSuffixComponents is set to 1
215 BOOST_ASSERT(data.getName().equals(interest.getName()));
216
217 if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
218 m_lastSegmentNo = data.getFinalBlockId().toSegment();
219 m_hasFinalBlockId = true;
220 cancelInFlightSegmentsGreaterThan(m_lastSegmentNo);
221 if (m_hasFailure && m_lastSegmentNo >= m_failedSegNo) {
222 // previously failed segment is part of the content
223 return onFailure(m_failureReason);
224 } else {
225 m_hasFailure = false;
226 }
227 }
228
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500229 uint64_t recvSegNo = getSegmentFromPacket(data);
Weiwei Liu245d7912016-07-28 00:04:25 -0700230 SegmentInfo& segInfo = m_segmentInfo[recvSegNo];
231 if (segInfo.state == SegmentState::RetxReceived) {
232 m_segmentInfo.erase(recvSegNo);
233 return; // ignore already-received segment
234 }
235
236 Milliseconds rtt = time::steady_clock::now() - segInfo.timeSent;
Weiwei Liu245d7912016-07-28 00:04:25 -0700237 if (m_options.isVerbose) {
238 std::cerr << "Received segment #" << recvSegNo
239 << ", rtt=" << rtt.count() << "ms"
240 << ", rto=" << segInfo.rto.count() << "ms" << std::endl;
241 }
242
Davide Pesavento958896e2017-01-19 00:52:04 -0500243 if (m_highData < recvSegNo) {
244 m_highData = recvSegNo;
245 }
246
247 // for segments in retx queue, we must not decrement m_nInFlight
248 // because it was already decremented when the segment timed out
249 if (segInfo.state != SegmentState::InRetxQueue) {
Weiwei Liu245d7912016-07-28 00:04:25 -0700250 m_nInFlight--;
251 }
252
Weiwei Liu245d7912016-07-28 00:04:25 -0700253 m_nReceived++;
Davide Pesavento958896e2017-01-19 00:52:04 -0500254 m_receivedSize += data.getContent().value_size();
Weiwei Liu245d7912016-07-28 00:04:25 -0700255
256 increaseWindow();
257 onData(interest, data);
258
259 if (segInfo.state == SegmentState::FirstTimeSent ||
260 segInfo.state == SegmentState::InRetxQueue) { // do not sample RTT for retransmitted segments
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500261 auto nExpectedSamples = std::max<int64_t>((m_nInFlight + 1) >> 1, 1);
262 BOOST_ASSERT(nExpectedSamples > 0);
263 m_rttEstimator.addMeasurement(recvSegNo, rtt, static_cast<size_t>(nExpectedSamples));
Weiwei Liu245d7912016-07-28 00:04:25 -0700264 m_segmentInfo.erase(recvSegNo); // remove the entry associated with the received segment
265 }
266 else { // retransmission
Davide Pesavento958896e2017-01-19 00:52:04 -0500267 BOOST_ASSERT(segInfo.state == SegmentState::Retransmitted);
Weiwei Liu245d7912016-07-28 00:04:25 -0700268 segInfo.state = SegmentState::RetxReceived;
269 }
270
271 BOOST_ASSERT(m_nReceived > 0);
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500272 if (m_hasFinalBlockId &&
273 static_cast<uint64_t>(m_nReceived - 1) >= m_lastSegmentNo) { // all segments have been received
Weiwei Liu245d7912016-07-28 00:04:25 -0700274 cancel();
275 if (m_options.isVerbose) {
276 printSummary();
277 }
278 }
279 else {
280 schedulePackets();
281 }
282}
283
284void
285PipelineInterestsAimd::handleNack(const Interest& interest, const lp::Nack& nack)
286{
287 if (isStopping())
288 return;
289
290 if (m_options.isVerbose)
291 std::cerr << "Received Nack with reason " << nack.getReason()
292 << " for Interest " << interest << std::endl;
293
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500294 uint64_t segNo = getSegmentFromPacket(interest);
Weiwei Liu245d7912016-07-28 00:04:25 -0700295
296 switch (nack.getReason()) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500297 case lp::NackReason::DUPLICATE:
298 // ignore duplicates
Weiwei Liu245d7912016-07-28 00:04:25 -0700299 break;
Davide Pesavento958896e2017-01-19 00:52:04 -0500300 case lp::NackReason::CONGESTION:
301 // treated the same as timeout for now
302 enqueueForRetransmission(segNo);
303 recordTimeout();
304 schedulePackets();
305 break;
306 default:
Weiwei Liu245d7912016-07-28 00:04:25 -0700307 handleFail(segNo, "Could not retrieve data for " + interest.getName().toUri() +
308 ", reason: " + boost::lexical_cast<std::string>(nack.getReason()));
309 break;
Weiwei Liu245d7912016-07-28 00:04:25 -0700310 }
311}
312
313void
314PipelineInterestsAimd::handleLifetimeExpiration(const Interest& interest)
315{
316 if (isStopping())
317 return;
318
Davide Pesavento958896e2017-01-19 00:52:04 -0500319 enqueueForRetransmission(getSegmentFromPacket(interest));
320 recordTimeout();
321 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -0700322}
323
324void
Davide Pesavento958896e2017-01-19 00:52:04 -0500325PipelineInterestsAimd::recordTimeout()
Weiwei Liu245d7912016-07-28 00:04:25 -0700326{
Weiwei Liu245d7912016-07-28 00:04:25 -0700327 if (m_options.disableCwa || m_highData > m_recPoint) {
328 // react to only one timeout per RTT (conservative window adaptation)
329 m_recPoint = m_highInterest;
330
331 decreaseWindow();
332 m_rttEstimator.backoffRto();
333 m_nLossEvents++;
334
335 if (m_options.isVerbose) {
336 std::cerr << "Packet loss event, cwnd = " << m_cwnd
337 << ", ssthresh = " << m_ssthresh << std::endl;
338 }
339 }
Davide Pesavento958896e2017-01-19 00:52:04 -0500340}
Weiwei Liu245d7912016-07-28 00:04:25 -0700341
Davide Pesavento958896e2017-01-19 00:52:04 -0500342void
343PipelineInterestsAimd::enqueueForRetransmission(uint64_t segNo)
344{
345 BOOST_ASSERT(m_nInFlight > 0);
346 m_nInFlight--;
347 m_retxQueue.push(segNo);
348 m_segmentInfo.at(segNo).state = SegmentState::InRetxQueue;
Weiwei Liu245d7912016-07-28 00:04:25 -0700349}
350
351void
352PipelineInterestsAimd::handleFail(uint64_t segNo, const std::string& reason)
353{
354 if (isStopping())
355 return;
356
357 // if the failed segment is definitely part of the content, raise a fatal error
358 if (m_hasFinalBlockId && segNo <= m_lastSegmentNo)
359 return onFailure(reason);
360
361 if (!m_hasFinalBlockId) {
362 m_segmentInfo.erase(segNo);
Davide Pesavento958896e2017-01-19 00:52:04 -0500363 m_nInFlight--;
Weiwei Liu245d7912016-07-28 00:04:25 -0700364
365 if (m_segmentInfo.empty()) {
366 onFailure("Fetching terminated but no final segment number has been found");
367 }
368 else {
369 cancelInFlightSegmentsGreaterThan(segNo);
370 m_hasFailure = true;
371 m_failedSegNo = segNo;
372 m_failureReason = reason;
373 }
374 }
375}
376
377void
378PipelineInterestsAimd::increaseWindow()
379{
380 if (m_cwnd < m_ssthresh) {
381 m_cwnd += m_options.aiStep; // additive increase
382 } else {
383 m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
384 }
Davide Pesavento958896e2017-01-19 00:52:04 -0500385
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500386 afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
Weiwei Liu245d7912016-07-28 00:04:25 -0700387}
388
389void
390PipelineInterestsAimd::decreaseWindow()
391{
392 // please refer to RFC 5681, Section 3.1 for the rationale behind it
393 m_ssthresh = std::max(2.0, m_cwnd * m_options.mdCoef); // multiplicative decrease
394 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
Davide Pesavento958896e2017-01-19 00:52:04 -0500395
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500396 afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
Weiwei Liu245d7912016-07-28 00:04:25 -0700397}
398
399uint64_t
400PipelineInterestsAimd::getNextSegmentNo()
401{
402 // get around the excluded segment
403 if (m_nextSegmentNo == m_excludedSegmentNo)
404 m_nextSegmentNo++;
405 return m_nextSegmentNo++;
406}
407
408void
Davide Pesavento958896e2017-01-19 00:52:04 -0500409PipelineInterestsAimd::cancelInFlightSegmentsGreaterThan(uint64_t segNo)
Weiwei Liu245d7912016-07-28 00:04:25 -0700410{
411 for (auto it = m_segmentInfo.begin(); it != m_segmentInfo.end();) {
412 // cancel fetching all segments that follow
Davide Pesavento958896e2017-01-19 00:52:04 -0500413 if (it->first > segNo) {
Weiwei Liu245d7912016-07-28 00:04:25 -0700414 m_face.removePendingInterest(it->second.interestId);
415 it = m_segmentInfo.erase(it);
Davide Pesavento958896e2017-01-19 00:52:04 -0500416 m_nInFlight--;
Weiwei Liu245d7912016-07-28 00:04:25 -0700417 }
418 else {
419 ++it;
420 }
421 }
422}
423
424void
425PipelineInterestsAimd::printSummary() const
426{
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500427 Milliseconds timeElapsed = time::steady_clock::now() - getStartTime();
428 double throughput = (8 * m_receivedSize * 1000) / timeElapsed.count();
Weiwei Liu245d7912016-07-28 00:04:25 -0700429
430 int pow = 0;
431 std::string throughputUnit;
432 while (throughput >= 1000.0 && pow < 4) {
433 throughput /= 1000.0;
434 pow++;
435 }
436 switch (pow) {
437 case 0:
438 throughputUnit = "bit/s";
439 break;
440 case 1:
441 throughputUnit = "kbit/s";
442 break;
443 case 2:
444 throughputUnit = "Mbit/s";
445 break;
446 case 3:
447 throughputUnit = "Gbit/s";
448 break;
449 case 4:
450 throughputUnit = "Tbit/s";
451 break;
452 }
453
454 std::cerr << "\nAll segments have been received.\n"
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500455 << "Time elapsed: " << timeElapsed << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700456 << "Total # of segments received: " << m_nReceived << "\n"
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500457 << "Total # of packet loss events: " << m_nLossEvents << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700458 << "Packet loss rate: "
459 << static_cast<double>(m_nLossEvents) / static_cast<double>(m_nReceived) << "\n"
460 << "Total # of retransmitted segments: " << m_nRetransmitted << "\n"
461 << "Goodput: " << throughput << " " << throughputUnit << "\n";
462}
463
464std::ostream&
465operator<<(std::ostream& os, SegmentState state)
466{
467 switch (state) {
468 case SegmentState::FirstTimeSent:
469 os << "FirstTimeSent";
470 break;
471 case SegmentState::InRetxQueue:
472 os << "InRetxQueue";
473 break;
474 case SegmentState::Retransmitted:
475 os << "Retransmitted";
476 break;
477 case SegmentState::RetxReceived:
478 os << "RetxReceived";
479 break;
480 }
Weiwei Liu245d7912016-07-28 00:04:25 -0700481 return os;
482}
483
484std::ostream&
485operator<<(std::ostream& os, const PipelineInterestsAimdOptions& options)
486{
Davide Pesavento958896e2017-01-19 00:52:04 -0500487 os << "PipelineInterestsAimd initial parameters:\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700488 << "\tInitial congestion window size = " << options.initCwnd << "\n"
489 << "\tInitial slow start threshold = " << options.initSsthresh << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700490 << "\tAdditive increase step = " << options.aiStep << "\n"
Davide Pesavento958896e2017-01-19 00:52:04 -0500491 << "\tMultiplicative decrease factor = " << options.mdCoef << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700492 << "\tRTO check interval = " << options.rtoCheckInterval << "\n"
Davide Pesavento958896e2017-01-19 00:52:04 -0500493 << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n"
494 << "\tConservative Window Adaptation " << (options.disableCwa ? "disabled" : "enabled") << "\n"
495 << "\tResetting cwnd to " << (options.resetCwndToInit ? "initCwnd" : "ssthresh") << " upon loss event\n";
Weiwei Liu245d7912016-07-28 00:04:25 -0700496 return os;
497}
498
499} // namespace aimd
500} // namespace chunks
501} // namespace ndn