blob: cbff2356f55627ccaf355c300ff87d0da83f76e9 [file] [log] [blame]
Weiwei Liu245d7912016-07-28 00:04:25 -07001/**
2 * Copyright (c) 2016, Regents of the University of California,
3 * Colorado State University,
4 * University Pierre & Marie Curie, Sorbonne University.
5 *
6 * This file is part of ndn-tools (Named Data Networking Essential Tools).
7 * See AUTHORS.md for complete list of ndn-tools authors and contributors.
8 *
9 * ndn-tools is free software: you can redistribute it and/or modify it under the terms
10 * of the GNU General Public License as published by the Free Software Foundation,
11 * either version 3 of the License, or (at your option) any later version.
12 *
13 * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
14 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
15 * PURPOSE. See the GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License along with
18 * ndn-tools, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
19 *
20 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
21 *
22 * @author Shuo Yang
23 * @author Weiwei Liu
24 */
25
26#include "pipeline-interests-aimd.hpp"
27
28#include <cmath>
29
30namespace ndn {
31namespace chunks {
32namespace aimd {
33
34PipelineInterestsAimd::PipelineInterestsAimd(Face& face, RttEstimator& rttEstimator,
35 const Options& options)
36 : PipelineInterests(face)
37 , m_options(options)
38 , m_rttEstimator(rttEstimator)
39 , m_scheduler(m_face.getIoService())
40 , m_nextSegmentNo(0)
41 , m_receivedSize(0)
42 , m_highData(0)
43 , m_highInterest(0)
44 , m_recPoint(0)
45 , m_nInFlight(0)
46 , m_nReceived(0)
47 , m_nLossEvents(0)
48 , m_nRetransmitted(0)
49 , m_cwnd(m_options.initCwnd)
50 , m_ssthresh(m_options.initSsthresh)
51 , m_hasFailure(false)
52 , m_failedSegNo(0)
53{
54 if (m_options.isVerbose) {
55 std::cerr << m_options;
56 }
57}
58
59PipelineInterestsAimd::~PipelineInterestsAimd()
60{
61 cancel();
62}
63
64void
65PipelineInterestsAimd::doRun()
66{
67 // record the start time of running pipeline
68 m_startTime = time::steady_clock::now();
69
70 // count the excluded segment
71 m_nReceived++;
72
73 // schedule the event to check retransmission timer
74 m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
75
76 sendInterest(getNextSegmentNo(), false);
77}
78
79void
80PipelineInterestsAimd::doCancel()
81{
82 for (const auto& entry : m_segmentInfo) {
83 const SegmentInfo& segInfo = entry.second;
84 m_face.removePendingInterest(segInfo.interestId);
85 }
86 m_segmentInfo.clear();
87 m_scheduler.cancelAllEvents();
88}
89
90void
91PipelineInterestsAimd::checkRto()
92{
93 if (isStopping())
94 return;
95
96 int timeoutCount = 0;
97
98 for (auto& entry : m_segmentInfo) {
99 SegmentInfo& segInfo = entry.second;
100 if (segInfo.state != SegmentState::InRetxQueue && // do not check segments currently in the retx queue
101 segInfo.state != SegmentState::RetxReceived) { // or already-received retransmitted segments
102 Milliseconds timeElapsed = time::steady_clock::now() - segInfo.timeSent;
103 if (timeElapsed.count() > segInfo.rto.count()) { // timer expired?
104 uint64_t timedoutSeg = entry.first;
105 m_retxQueue.push(timedoutSeg); // put on retx queue
106 segInfo.state = SegmentState::InRetxQueue; // update status
107 timeoutCount++;
108 }
109 }
110 }
111
112 if (timeoutCount > 0) {
113 handleTimeout(timeoutCount);
114 }
115
116 // schedule the next check after predefined interval
117 m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
118}
119
120void
121PipelineInterestsAimd::sendInterest(uint64_t segNo, bool isRetransmission)
122{
123 if (isStopping())
124 return;
125
126 if (m_hasFinalBlockId && segNo > m_lastSegmentNo && !isRetransmission)
127 return;
128
129 if (!isRetransmission && m_hasFailure)
130 return;
131
132 if (m_options.isVerbose) {
133 if (isRetransmission)
134 std::cerr << "Retransmitting segment #" << segNo << std::endl;
135 else
136 std::cerr << "Requesting segment #" << segNo << std::endl;
137 }
138
139 if (isRetransmission) {
140 auto ret = m_retxCount.insert(std::make_pair(segNo, 1));
141 if (ret.second == false) { // not the first retransmission
142 m_retxCount[segNo] += 1;
143 if (m_retxCount[segNo] > m_options.maxRetriesOnTimeoutOrNack) {
144 return handleFail(segNo, "Reached the maximum number of retries (" +
145 to_string(m_options.maxRetriesOnTimeoutOrNack) +
146 ") while retrieving segment #" + to_string(segNo));
147 }
148
149 if (m_options.isVerbose) {
150 std::cerr << "# of retries for segment #" << segNo
151 << " is " << m_retxCount[segNo] << std::endl;
152 }
153 }
154
155 m_face.removePendingInterest(m_segmentInfo[segNo].interestId);
156 }
157
158 Interest interest(Name(m_prefix).appendSegment(segNo));
159 interest.setInterestLifetime(m_options.interestLifetime);
160 interest.setMustBeFresh(m_options.mustBeFresh);
161 interest.setMaxSuffixComponents(1);
162
163 auto interestId = m_face.expressInterest(interest,
164 bind(&PipelineInterestsAimd::handleData, this, _1, _2),
165 bind(&PipelineInterestsAimd::handleNack, this, _1, _2),
166 bind(&PipelineInterestsAimd::handleLifetimeExpiration,
167 this, _1));
168
169 m_nInFlight++;
170
171 if (isRetransmission) {
172 SegmentInfo& segInfo = m_segmentInfo[segNo];
173 segInfo.state = SegmentState::Retransmitted;
174 segInfo.rto = m_rttEstimator.getEstimatedRto();
175 segInfo.timeSent = time::steady_clock::now();
176 m_nRetransmitted++;
177 }
178 else {
179 m_highInterest = segNo;
180 Milliseconds rto = m_rttEstimator.getEstimatedRto();
181 SegmentInfo segInfo{interestId, SegmentState::FirstTimeSent, rto, time::steady_clock::now()};
182
183 m_segmentInfo.emplace(segNo, segInfo);
184 }
185}
186
187void
188PipelineInterestsAimd::schedulePackets()
189{
190 int availableWindowSize = static_cast<int>(m_cwnd) - m_nInFlight;
191 while (availableWindowSize > 0) {
192 if (!m_retxQueue.empty()) { // do retransmission first
193 uint64_t retxSegNo = m_retxQueue.front();
194 m_retxQueue.pop();
195
196 auto it = m_segmentInfo.find(retxSegNo);
197 if (it == m_segmentInfo.end()) {
198 continue;
199 }
200 // the segment is still in the map, it means that it needs to be retransmitted
201 sendInterest(retxSegNo, true);
202 }
203 else { // send next segment
204 sendInterest(getNextSegmentNo(), false);
205 }
206 availableWindowSize--;
207 }
208}
209
210void
211PipelineInterestsAimd::handleData(const Interest& interest, const Data& data)
212{
213 if (isStopping())
214 return;
215
216 // Data name will not have extra components because MaxSuffixComponents is set to 1
217 BOOST_ASSERT(data.getName().equals(interest.getName()));
218
219 if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
220 m_lastSegmentNo = data.getFinalBlockId().toSegment();
221 m_hasFinalBlockId = true;
222 cancelInFlightSegmentsGreaterThan(m_lastSegmentNo);
223 if (m_hasFailure && m_lastSegmentNo >= m_failedSegNo) {
224 // previously failed segment is part of the content
225 return onFailure(m_failureReason);
226 } else {
227 m_hasFailure = false;
228 }
229 }
230
231 uint64_t recvSegNo = data.getName()[-1].toSegment();
232 if (m_highData < recvSegNo) {
233 m_highData = recvSegNo;
234 }
235
236 SegmentInfo& segInfo = m_segmentInfo[recvSegNo];
237 if (segInfo.state == SegmentState::RetxReceived) {
238 m_segmentInfo.erase(recvSegNo);
239 return; // ignore already-received segment
240 }
241
242 Milliseconds rtt = time::steady_clock::now() - segInfo.timeSent;
243
244 if (m_options.isVerbose) {
245 std::cerr << "Received segment #" << recvSegNo
246 << ", rtt=" << rtt.count() << "ms"
247 << ", rto=" << segInfo.rto.count() << "ms" << std::endl;
248 }
249
250 // for segments in retransmission queue, no need to decrement m_nInFlight since
251 // it's already been decremented when segments timed out
252 if (segInfo.state != SegmentState::InRetxQueue && m_nInFlight > 0) {
253 m_nInFlight--;
254 }
255
256 m_receivedSize += data.getContent().value_size();
257 m_nReceived++;
258
259 increaseWindow();
260 onData(interest, data);
261
262 if (segInfo.state == SegmentState::FirstTimeSent ||
263 segInfo.state == SegmentState::InRetxQueue) { // do not sample RTT for retransmitted segments
264 size_t nExpectedSamples = std::max(static_cast<int>(std::ceil(m_nInFlight / 2.0)), 1);
265 m_rttEstimator.addMeasurement(recvSegNo, rtt, nExpectedSamples);
266 m_segmentInfo.erase(recvSegNo); // remove the entry associated with the received segment
267 }
268 else { // retransmission
269 segInfo.state = SegmentState::RetxReceived;
270 }
271
272 BOOST_ASSERT(m_nReceived > 0);
273 if (m_hasFinalBlockId && m_nReceived - 1 >= m_lastSegmentNo) { // all segments have been received
274 cancel();
275 if (m_options.isVerbose) {
276 printSummary();
277 }
278 }
279 else {
280 schedulePackets();
281 }
282}
283
284void
285PipelineInterestsAimd::handleNack(const Interest& interest, const lp::Nack& nack)
286{
287 if (isStopping())
288 return;
289
290 if (m_options.isVerbose)
291 std::cerr << "Received Nack with reason " << nack.getReason()
292 << " for Interest " << interest << std::endl;
293
294 uint64_t segNo = interest.getName()[-1].toSegment();
295
296 switch (nack.getReason()) {
297 case lp::NackReason::DUPLICATE: {
298 break; // ignore duplicates
299 }
300 case lp::NackReason::CONGESTION: { // treated the same as timeout for now
301 m_retxQueue.push(segNo); // put on retx queue
302 m_segmentInfo[segNo].state = SegmentState::InRetxQueue; // update state
303 handleTimeout(1);
304 break;
305 }
306 default: {
307 handleFail(segNo, "Could not retrieve data for " + interest.getName().toUri() +
308 ", reason: " + boost::lexical_cast<std::string>(nack.getReason()));
309 break;
310 }
311 }
312}
313
314void
315PipelineInterestsAimd::handleLifetimeExpiration(const Interest& interest)
316{
317 if (isStopping())
318 return;
319
320 uint64_t segNo = interest.getName()[-1].toSegment();
321 m_retxQueue.push(segNo); // put on retx queue
322 m_segmentInfo[segNo].state = SegmentState::InRetxQueue; // update state
323 handleTimeout(1);
324}
325
326void
327PipelineInterestsAimd::handleTimeout(int timeoutCount)
328{
329 if (timeoutCount <= 0)
330 return;
331
332 if (m_options.disableCwa || m_highData > m_recPoint) {
333 // react to only one timeout per RTT (conservative window adaptation)
334 m_recPoint = m_highInterest;
335
336 decreaseWindow();
337 m_rttEstimator.backoffRto();
338 m_nLossEvents++;
339
340 if (m_options.isVerbose) {
341 std::cerr << "Packet loss event, cwnd = " << m_cwnd
342 << ", ssthresh = " << m_ssthresh << std::endl;
343 }
344 }
345
346 if (m_nInFlight > static_cast<uint64_t>(timeoutCount))
347 m_nInFlight -= timeoutCount;
348 else
349 m_nInFlight = 0;
350
351 schedulePackets();
352}
353
354void
355PipelineInterestsAimd::handleFail(uint64_t segNo, const std::string& reason)
356{
357 if (isStopping())
358 return;
359
360 // if the failed segment is definitely part of the content, raise a fatal error
361 if (m_hasFinalBlockId && segNo <= m_lastSegmentNo)
362 return onFailure(reason);
363
364 if (!m_hasFinalBlockId) {
365 m_segmentInfo.erase(segNo);
366 if (m_nInFlight > 0)
367 m_nInFlight--;
368
369 if (m_segmentInfo.empty()) {
370 onFailure("Fetching terminated but no final segment number has been found");
371 }
372 else {
373 cancelInFlightSegmentsGreaterThan(segNo);
374 m_hasFailure = true;
375 m_failedSegNo = segNo;
376 m_failureReason = reason;
377 }
378 }
379}
380
381void
382PipelineInterestsAimd::increaseWindow()
383{
384 if (m_cwnd < m_ssthresh) {
385 m_cwnd += m_options.aiStep; // additive increase
386 } else {
387 m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
388 }
389 afterCwndChange(time::steady_clock::now() - m_startTime, m_cwnd);
390}
391
392void
393PipelineInterestsAimd::decreaseWindow()
394{
395 // please refer to RFC 5681, Section 3.1 for the rationale behind it
396 m_ssthresh = std::max(2.0, m_cwnd * m_options.mdCoef); // multiplicative decrease
397 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
398 afterCwndChange(time::steady_clock::now() - m_startTime, m_cwnd);
399}
400
401uint64_t
402PipelineInterestsAimd::getNextSegmentNo()
403{
404 // get around the excluded segment
405 if (m_nextSegmentNo == m_excludedSegmentNo)
406 m_nextSegmentNo++;
407 return m_nextSegmentNo++;
408}
409
410void
411PipelineInterestsAimd::cancelInFlightSegmentsGreaterThan(uint64_t segmentNo)
412{
413 for (auto it = m_segmentInfo.begin(); it != m_segmentInfo.end();) {
414 // cancel fetching all segments that follow
415 if (it->first > segmentNo) {
416 m_face.removePendingInterest(it->second.interestId);
417 it = m_segmentInfo.erase(it);
418 if (m_nInFlight > 0)
419 m_nInFlight--;
420 }
421 else {
422 ++it;
423 }
424 }
425}
426
427void
428PipelineInterestsAimd::printSummary() const
429{
430 Milliseconds timePassed = time::steady_clock::now() - m_startTime;
431 double throughput = (8 * m_receivedSize * 1000) / timePassed.count();
432
433 int pow = 0;
434 std::string throughputUnit;
435 while (throughput >= 1000.0 && pow < 4) {
436 throughput /= 1000.0;
437 pow++;
438 }
439 switch (pow) {
440 case 0:
441 throughputUnit = "bit/s";
442 break;
443 case 1:
444 throughputUnit = "kbit/s";
445 break;
446 case 2:
447 throughputUnit = "Mbit/s";
448 break;
449 case 3:
450 throughputUnit = "Gbit/s";
451 break;
452 case 4:
453 throughputUnit = "Tbit/s";
454 break;
455 }
456
457 std::cerr << "\nAll segments have been received.\n"
458 << "Total # of segments received: " << m_nReceived << "\n"
459 << "Time used: " << timePassed.count() << " ms" << "\n"
460 << "Total # of packet loss burst: " << m_nLossEvents << "\n"
461 << "Packet loss rate: "
462 << static_cast<double>(m_nLossEvents) / static_cast<double>(m_nReceived) << "\n"
463 << "Total # of retransmitted segments: " << m_nRetransmitted << "\n"
464 << "Goodput: " << throughput << " " << throughputUnit << "\n";
465}
466
467std::ostream&
468operator<<(std::ostream& os, SegmentState state)
469{
470 switch (state) {
471 case SegmentState::FirstTimeSent:
472 os << "FirstTimeSent";
473 break;
474 case SegmentState::InRetxQueue:
475 os << "InRetxQueue";
476 break;
477 case SegmentState::Retransmitted:
478 os << "Retransmitted";
479 break;
480 case SegmentState::RetxReceived:
481 os << "RetxReceived";
482 break;
483 }
484
485 return os;
486}
487
488std::ostream&
489operator<<(std::ostream& os, const PipelineInterestsAimdOptions& options)
490{
491 os << "PipelineInterestsAimd initial parameters:" << "\n"
492 << "\tInitial congestion window size = " << options.initCwnd << "\n"
493 << "\tInitial slow start threshold = " << options.initSsthresh << "\n"
494 << "\tMultiplicative decrease factor = " << options.mdCoef << "\n"
495 << "\tAdditive increase step = " << options.aiStep << "\n"
496 << "\tRTO check interval = " << options.rtoCheckInterval << "\n"
497 << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n";
498
499 std::string cwaStatus = options.disableCwa ? "disabled" : "enabled";
500 os << "\tConservative Window Adaptation " << cwaStatus << "\n";
501
502 std::string cwndStatus = options.resetCwndToInit ? "initCwnd" : "ssthresh";
503 os << "\tResetting cwnd to " << cwndStatus << " when loss event occurs" << "\n";
504 return os;
505}
506
507} // namespace aimd
508} // namespace chunks
509} // namespace ndn