blob: 622eb43a6f804f18f2303c58d696f0eef7eb79de [file] [log] [blame]
Davide Pesaventobf1c0692017-01-15 19:15:09 -05001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Weiwei Liu245d7912016-07-28 00:04:25 -07002/**
Davide Pesaventocd65c2c2017-01-15 16:10:38 -05003 * Copyright (c) 2016-2017, Regents of the University of California,
4 * Colorado State University,
5 * University Pierre & Marie Curie, Sorbonne University.
Weiwei Liu245d7912016-07-28 00:04:25 -07006 *
7 * This file is part of ndn-tools (Named Data Networking Essential Tools).
8 * See AUTHORS.md for complete list of ndn-tools authors and contributors.
9 *
10 * ndn-tools is free software: you can redistribute it and/or modify it under the terms
11 * of the GNU General Public License as published by the Free Software Foundation,
12 * either version 3 of the License, or (at your option) any later version.
13 *
14 * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
15 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
16 * PURPOSE. See the GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License along with
19 * ndn-tools, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
20 *
21 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22 *
23 * @author Shuo Yang
24 * @author Weiwei Liu
25 */
26
27#include "pipeline-interests-aimd.hpp"
28
29#include <cmath>
30
31namespace ndn {
32namespace chunks {
33namespace aimd {
34
35PipelineInterestsAimd::PipelineInterestsAimd(Face& face, RttEstimator& rttEstimator,
36 const Options& options)
37 : PipelineInterests(face)
38 , m_options(options)
39 , m_rttEstimator(rttEstimator)
40 , m_scheduler(m_face.getIoService())
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050041 , m_checkRtoEvent(m_scheduler)
Weiwei Liu245d7912016-07-28 00:04:25 -070042 , m_nextSegmentNo(0)
43 , m_receivedSize(0)
44 , m_highData(0)
45 , m_highInterest(0)
46 , m_recPoint(0)
47 , m_nInFlight(0)
48 , m_nReceived(0)
49 , m_nLossEvents(0)
50 , m_nRetransmitted(0)
51 , m_cwnd(m_options.initCwnd)
52 , m_ssthresh(m_options.initSsthresh)
53 , m_hasFailure(false)
54 , m_failedSegNo(0)
55{
56 if (m_options.isVerbose) {
57 std::cerr << m_options;
58 }
59}
60
61PipelineInterestsAimd::~PipelineInterestsAimd()
62{
63 cancel();
64}
65
66void
67PipelineInterestsAimd::doRun()
68{
Weiwei Liu245d7912016-07-28 00:04:25 -070069 // count the excluded segment
70 m_nReceived++;
71
72 // schedule the event to check retransmission timer
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050073 m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
Weiwei Liu245d7912016-07-28 00:04:25 -070074
75 sendInterest(getNextSegmentNo(), false);
76}
77
78void
79PipelineInterestsAimd::doCancel()
80{
81 for (const auto& entry : m_segmentInfo) {
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050082 m_face.removePendingInterest(entry.second.interestId);
Weiwei Liu245d7912016-07-28 00:04:25 -070083 }
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050084 m_checkRtoEvent.cancel();
Weiwei Liu245d7912016-07-28 00:04:25 -070085 m_segmentInfo.clear();
Weiwei Liu245d7912016-07-28 00:04:25 -070086}
87
88void
89PipelineInterestsAimd::checkRto()
90{
91 if (isStopping())
92 return;
93
94 int timeoutCount = 0;
95
96 for (auto& entry : m_segmentInfo) {
97 SegmentInfo& segInfo = entry.second;
98 if (segInfo.state != SegmentState::InRetxQueue && // do not check segments currently in the retx queue
99 segInfo.state != SegmentState::RetxReceived) { // or already-received retransmitted segments
100 Milliseconds timeElapsed = time::steady_clock::now() - segInfo.timeSent;
101 if (timeElapsed.count() > segInfo.rto.count()) { // timer expired?
102 uint64_t timedoutSeg = entry.first;
103 m_retxQueue.push(timedoutSeg); // put on retx queue
104 segInfo.state = SegmentState::InRetxQueue; // update status
105 timeoutCount++;
106 }
107 }
108 }
109
110 if (timeoutCount > 0) {
111 handleTimeout(timeoutCount);
112 }
113
114 // schedule the next check after predefined interval
Davide Pesaventocd65c2c2017-01-15 16:10:38 -0500115 m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
Weiwei Liu245d7912016-07-28 00:04:25 -0700116}
117
118void
119PipelineInterestsAimd::sendInterest(uint64_t segNo, bool isRetransmission)
120{
121 if (isStopping())
122 return;
123
124 if (m_hasFinalBlockId && segNo > m_lastSegmentNo && !isRetransmission)
125 return;
126
127 if (!isRetransmission && m_hasFailure)
128 return;
129
130 if (m_options.isVerbose) {
131 if (isRetransmission)
132 std::cerr << "Retransmitting segment #" << segNo << std::endl;
133 else
134 std::cerr << "Requesting segment #" << segNo << std::endl;
135 }
136
137 if (isRetransmission) {
138 auto ret = m_retxCount.insert(std::make_pair(segNo, 1));
139 if (ret.second == false) { // not the first retransmission
140 m_retxCount[segNo] += 1;
141 if (m_retxCount[segNo] > m_options.maxRetriesOnTimeoutOrNack) {
142 return handleFail(segNo, "Reached the maximum number of retries (" +
143 to_string(m_options.maxRetriesOnTimeoutOrNack) +
144 ") while retrieving segment #" + to_string(segNo));
145 }
146
147 if (m_options.isVerbose) {
148 std::cerr << "# of retries for segment #" << segNo
149 << " is " << m_retxCount[segNo] << std::endl;
150 }
151 }
152
153 m_face.removePendingInterest(m_segmentInfo[segNo].interestId);
154 }
155
156 Interest interest(Name(m_prefix).appendSegment(segNo));
157 interest.setInterestLifetime(m_options.interestLifetime);
158 interest.setMustBeFresh(m_options.mustBeFresh);
159 interest.setMaxSuffixComponents(1);
160
161 auto interestId = m_face.expressInterest(interest,
162 bind(&PipelineInterestsAimd::handleData, this, _1, _2),
163 bind(&PipelineInterestsAimd::handleNack, this, _1, _2),
164 bind(&PipelineInterestsAimd::handleLifetimeExpiration,
165 this, _1));
166
167 m_nInFlight++;
168
169 if (isRetransmission) {
170 SegmentInfo& segInfo = m_segmentInfo[segNo];
171 segInfo.state = SegmentState::Retransmitted;
172 segInfo.rto = m_rttEstimator.getEstimatedRto();
173 segInfo.timeSent = time::steady_clock::now();
174 m_nRetransmitted++;
175 }
176 else {
177 m_highInterest = segNo;
178 Milliseconds rto = m_rttEstimator.getEstimatedRto();
179 SegmentInfo segInfo{interestId, SegmentState::FirstTimeSent, rto, time::steady_clock::now()};
180
181 m_segmentInfo.emplace(segNo, segInfo);
182 }
183}
184
185void
186PipelineInterestsAimd::schedulePackets()
187{
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500188 BOOST_ASSERT(m_nInFlight >= 0);
189 auto availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nInFlight;
190
Weiwei Liu245d7912016-07-28 00:04:25 -0700191 while (availableWindowSize > 0) {
192 if (!m_retxQueue.empty()) { // do retransmission first
193 uint64_t retxSegNo = m_retxQueue.front();
194 m_retxQueue.pop();
195
196 auto it = m_segmentInfo.find(retxSegNo);
197 if (it == m_segmentInfo.end()) {
198 continue;
199 }
200 // the segment is still in the map, it means that it needs to be retransmitted
201 sendInterest(retxSegNo, true);
202 }
203 else { // send next segment
204 sendInterest(getNextSegmentNo(), false);
205 }
206 availableWindowSize--;
207 }
208}
209
210void
211PipelineInterestsAimd::handleData(const Interest& interest, const Data& data)
212{
213 if (isStopping())
214 return;
215
216 // Data name will not have extra components because MaxSuffixComponents is set to 1
217 BOOST_ASSERT(data.getName().equals(interest.getName()));
218
219 if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
220 m_lastSegmentNo = data.getFinalBlockId().toSegment();
221 m_hasFinalBlockId = true;
222 cancelInFlightSegmentsGreaterThan(m_lastSegmentNo);
223 if (m_hasFailure && m_lastSegmentNo >= m_failedSegNo) {
224 // previously failed segment is part of the content
225 return onFailure(m_failureReason);
226 } else {
227 m_hasFailure = false;
228 }
229 }
230
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500231 uint64_t recvSegNo = getSegmentFromPacket(data);
Weiwei Liu245d7912016-07-28 00:04:25 -0700232 if (m_highData < recvSegNo) {
233 m_highData = recvSegNo;
234 }
235
236 SegmentInfo& segInfo = m_segmentInfo[recvSegNo];
237 if (segInfo.state == SegmentState::RetxReceived) {
238 m_segmentInfo.erase(recvSegNo);
239 return; // ignore already-received segment
240 }
241
242 Milliseconds rtt = time::steady_clock::now() - segInfo.timeSent;
243
244 if (m_options.isVerbose) {
245 std::cerr << "Received segment #" << recvSegNo
246 << ", rtt=" << rtt.count() << "ms"
247 << ", rto=" << segInfo.rto.count() << "ms" << std::endl;
248 }
249
250 // for segments in retransmission queue, no need to decrement m_nInFlight since
251 // it's already been decremented when segments timed out
252 if (segInfo.state != SegmentState::InRetxQueue && m_nInFlight > 0) {
253 m_nInFlight--;
254 }
255
256 m_receivedSize += data.getContent().value_size();
257 m_nReceived++;
258
259 increaseWindow();
260 onData(interest, data);
261
262 if (segInfo.state == SegmentState::FirstTimeSent ||
263 segInfo.state == SegmentState::InRetxQueue) { // do not sample RTT for retransmitted segments
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500264 auto nExpectedSamples = std::max<int64_t>((m_nInFlight + 1) >> 1, 1);
265 BOOST_ASSERT(nExpectedSamples > 0);
266 m_rttEstimator.addMeasurement(recvSegNo, rtt, static_cast<size_t>(nExpectedSamples));
Weiwei Liu245d7912016-07-28 00:04:25 -0700267 m_segmentInfo.erase(recvSegNo); // remove the entry associated with the received segment
268 }
269 else { // retransmission
270 segInfo.state = SegmentState::RetxReceived;
271 }
272
273 BOOST_ASSERT(m_nReceived > 0);
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500274 if (m_hasFinalBlockId &&
275 static_cast<uint64_t>(m_nReceived - 1) >= m_lastSegmentNo) { // all segments have been received
Weiwei Liu245d7912016-07-28 00:04:25 -0700276 cancel();
277 if (m_options.isVerbose) {
278 printSummary();
279 }
280 }
281 else {
282 schedulePackets();
283 }
284}
285
286void
287PipelineInterestsAimd::handleNack(const Interest& interest, const lp::Nack& nack)
288{
289 if (isStopping())
290 return;
291
292 if (m_options.isVerbose)
293 std::cerr << "Received Nack with reason " << nack.getReason()
294 << " for Interest " << interest << std::endl;
295
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500296 uint64_t segNo = getSegmentFromPacket(interest);
Weiwei Liu245d7912016-07-28 00:04:25 -0700297
298 switch (nack.getReason()) {
299 case lp::NackReason::DUPLICATE: {
300 break; // ignore duplicates
301 }
302 case lp::NackReason::CONGESTION: { // treated the same as timeout for now
303 m_retxQueue.push(segNo); // put on retx queue
304 m_segmentInfo[segNo].state = SegmentState::InRetxQueue; // update state
305 handleTimeout(1);
306 break;
307 }
308 default: {
309 handleFail(segNo, "Could not retrieve data for " + interest.getName().toUri() +
310 ", reason: " + boost::lexical_cast<std::string>(nack.getReason()));
311 break;
312 }
313 }
314}
315
316void
317PipelineInterestsAimd::handleLifetimeExpiration(const Interest& interest)
318{
319 if (isStopping())
320 return;
321
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500322 uint64_t segNo = getSegmentFromPacket(interest);
Weiwei Liu245d7912016-07-28 00:04:25 -0700323 m_retxQueue.push(segNo); // put on retx queue
324 m_segmentInfo[segNo].state = SegmentState::InRetxQueue; // update state
325 handleTimeout(1);
326}
327
328void
329PipelineInterestsAimd::handleTimeout(int timeoutCount)
330{
331 if (timeoutCount <= 0)
332 return;
333
334 if (m_options.disableCwa || m_highData > m_recPoint) {
335 // react to only one timeout per RTT (conservative window adaptation)
336 m_recPoint = m_highInterest;
337
338 decreaseWindow();
339 m_rttEstimator.backoffRto();
340 m_nLossEvents++;
341
342 if (m_options.isVerbose) {
343 std::cerr << "Packet loss event, cwnd = " << m_cwnd
344 << ", ssthresh = " << m_ssthresh << std::endl;
345 }
346 }
347
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500348 m_nInFlight = std::max<int64_t>(0, m_nInFlight - timeoutCount);
Weiwei Liu245d7912016-07-28 00:04:25 -0700349 schedulePackets();
350}
351
352void
353PipelineInterestsAimd::handleFail(uint64_t segNo, const std::string& reason)
354{
355 if (isStopping())
356 return;
357
358 // if the failed segment is definitely part of the content, raise a fatal error
359 if (m_hasFinalBlockId && segNo <= m_lastSegmentNo)
360 return onFailure(reason);
361
362 if (!m_hasFinalBlockId) {
363 m_segmentInfo.erase(segNo);
364 if (m_nInFlight > 0)
365 m_nInFlight--;
366
367 if (m_segmentInfo.empty()) {
368 onFailure("Fetching terminated but no final segment number has been found");
369 }
370 else {
371 cancelInFlightSegmentsGreaterThan(segNo);
372 m_hasFailure = true;
373 m_failedSegNo = segNo;
374 m_failureReason = reason;
375 }
376 }
377}
378
379void
380PipelineInterestsAimd::increaseWindow()
381{
382 if (m_cwnd < m_ssthresh) {
383 m_cwnd += m_options.aiStep; // additive increase
384 } else {
385 m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
386 }
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500387 afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
Weiwei Liu245d7912016-07-28 00:04:25 -0700388}
389
390void
391PipelineInterestsAimd::decreaseWindow()
392{
393 // please refer to RFC 5681, Section 3.1 for the rationale behind it
394 m_ssthresh = std::max(2.0, m_cwnd * m_options.mdCoef); // multiplicative decrease
395 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500396 afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
Weiwei Liu245d7912016-07-28 00:04:25 -0700397}
398
399uint64_t
400PipelineInterestsAimd::getNextSegmentNo()
401{
402 // get around the excluded segment
403 if (m_nextSegmentNo == m_excludedSegmentNo)
404 m_nextSegmentNo++;
405 return m_nextSegmentNo++;
406}
407
408void
409PipelineInterestsAimd::cancelInFlightSegmentsGreaterThan(uint64_t segmentNo)
410{
411 for (auto it = m_segmentInfo.begin(); it != m_segmentInfo.end();) {
412 // cancel fetching all segments that follow
413 if (it->first > segmentNo) {
414 m_face.removePendingInterest(it->second.interestId);
415 it = m_segmentInfo.erase(it);
416 if (m_nInFlight > 0)
417 m_nInFlight--;
418 }
419 else {
420 ++it;
421 }
422 }
423}
424
425void
426PipelineInterestsAimd::printSummary() const
427{
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500428 Milliseconds timeElapsed = time::steady_clock::now() - getStartTime();
429 double throughput = (8 * m_receivedSize * 1000) / timeElapsed.count();
Weiwei Liu245d7912016-07-28 00:04:25 -0700430
431 int pow = 0;
432 std::string throughputUnit;
433 while (throughput >= 1000.0 && pow < 4) {
434 throughput /= 1000.0;
435 pow++;
436 }
437 switch (pow) {
438 case 0:
439 throughputUnit = "bit/s";
440 break;
441 case 1:
442 throughputUnit = "kbit/s";
443 break;
444 case 2:
445 throughputUnit = "Mbit/s";
446 break;
447 case 3:
448 throughputUnit = "Gbit/s";
449 break;
450 case 4:
451 throughputUnit = "Tbit/s";
452 break;
453 }
454
455 std::cerr << "\nAll segments have been received.\n"
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500456 << "Time elapsed: " << timeElapsed << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700457 << "Total # of segments received: " << m_nReceived << "\n"
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500458 << "Total # of packet loss events: " << m_nLossEvents << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700459 << "Packet loss rate: "
460 << static_cast<double>(m_nLossEvents) / static_cast<double>(m_nReceived) << "\n"
461 << "Total # of retransmitted segments: " << m_nRetransmitted << "\n"
462 << "Goodput: " << throughput << " " << throughputUnit << "\n";
463}
464
465std::ostream&
466operator<<(std::ostream& os, SegmentState state)
467{
468 switch (state) {
469 case SegmentState::FirstTimeSent:
470 os << "FirstTimeSent";
471 break;
472 case SegmentState::InRetxQueue:
473 os << "InRetxQueue";
474 break;
475 case SegmentState::Retransmitted:
476 os << "Retransmitted";
477 break;
478 case SegmentState::RetxReceived:
479 os << "RetxReceived";
480 break;
481 }
482
483 return os;
484}
485
486std::ostream&
487operator<<(std::ostream& os, const PipelineInterestsAimdOptions& options)
488{
489 os << "PipelineInterestsAimd initial parameters:" << "\n"
490 << "\tInitial congestion window size = " << options.initCwnd << "\n"
491 << "\tInitial slow start threshold = " << options.initSsthresh << "\n"
492 << "\tMultiplicative decrease factor = " << options.mdCoef << "\n"
493 << "\tAdditive increase step = " << options.aiStep << "\n"
494 << "\tRTO check interval = " << options.rtoCheckInterval << "\n"
495 << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n";
496
497 std::string cwaStatus = options.disableCwa ? "disabled" : "enabled";
498 os << "\tConservative Window Adaptation " << cwaStatus << "\n";
499
500 std::string cwndStatus = options.resetCwndToInit ? "initCwnd" : "ssthresh";
501 os << "\tResetting cwnd to " << cwndStatus << " when loss event occurs" << "\n";
502 return os;
503}
504
505} // namespace aimd
506} // namespace chunks
507} // namespace ndn