blob: 3b350012bf15b9ecd6cc96be7b0813243fa57b41 [file] [log] [blame]
Davide Pesaventobf1c0692017-01-15 19:15:09 -05001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +00002/*
Davide Pesaventocd65c2c2017-01-15 16:10:38 -05003 * Copyright (c) 2016-2017, Regents of the University of California,
4 * Colorado State University,
5 * University Pierre & Marie Curie, Sorbonne University.
Weiwei Liu245d7912016-07-28 00:04:25 -07006 *
7 * This file is part of ndn-tools (Named Data Networking Essential Tools).
8 * See AUTHORS.md for complete list of ndn-tools authors and contributors.
9 *
10 * ndn-tools is free software: you can redistribute it and/or modify it under the terms
11 * of the GNU General Public License as published by the Free Software Foundation,
12 * either version 3 of the License, or (at your option) any later version.
13 *
14 * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
15 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
16 * PURPOSE. See the GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License along with
19 * ndn-tools, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
20 *
21 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22 *
23 * @author Shuo Yang
24 * @author Weiwei Liu
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +000025 * @author Chavoosh Ghasemi
Weiwei Liu245d7912016-07-28 00:04:25 -070026 */
27
28#include "pipeline-interests-aimd.hpp"
29
30#include <cmath>
31
32namespace ndn {
33namespace chunks {
34namespace aimd {
35
36PipelineInterestsAimd::PipelineInterestsAimd(Face& face, RttEstimator& rttEstimator,
37 const Options& options)
38 : PipelineInterests(face)
39 , m_options(options)
40 , m_rttEstimator(rttEstimator)
41 , m_scheduler(m_face.getIoService())
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050042 , m_checkRtoEvent(m_scheduler)
Weiwei Liu245d7912016-07-28 00:04:25 -070043 , m_highData(0)
44 , m_highInterest(0)
45 , m_recPoint(0)
46 , m_nInFlight(0)
Weiwei Liu245d7912016-07-28 00:04:25 -070047 , m_nLossEvents(0)
48 , m_nRetransmitted(0)
49 , m_cwnd(m_options.initCwnd)
50 , m_ssthresh(m_options.initSsthresh)
51 , m_hasFailure(false)
52 , m_failedSegNo(0)
53{
54 if (m_options.isVerbose) {
55 std::cerr << m_options;
56 }
57}
58
59PipelineInterestsAimd::~PipelineInterestsAimd()
60{
61 cancel();
62}
63
64void
65PipelineInterestsAimd::doRun()
66{
Weiwei Liu245d7912016-07-28 00:04:25 -070067 // schedule the event to check retransmission timer
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050068 m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
Weiwei Liu245d7912016-07-28 00:04:25 -070069
Davide Pesavento958896e2017-01-19 00:52:04 -050070 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -070071}
72
73void
74PipelineInterestsAimd::doCancel()
75{
76 for (const auto& entry : m_segmentInfo) {
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050077 m_face.removePendingInterest(entry.second.interestId);
Weiwei Liu245d7912016-07-28 00:04:25 -070078 }
Davide Pesaventocd65c2c2017-01-15 16:10:38 -050079 m_checkRtoEvent.cancel();
Weiwei Liu245d7912016-07-28 00:04:25 -070080 m_segmentInfo.clear();
Weiwei Liu245d7912016-07-28 00:04:25 -070081}
82
83void
84PipelineInterestsAimd::checkRto()
85{
86 if (isStopping())
87 return;
88
Davide Pesavento958896e2017-01-19 00:52:04 -050089 bool hasTimeout = false;
Weiwei Liu245d7912016-07-28 00:04:25 -070090
91 for (auto& entry : m_segmentInfo) {
92 SegmentInfo& segInfo = entry.second;
93 if (segInfo.state != SegmentState::InRetxQueue && // do not check segments currently in the retx queue
94 segInfo.state != SegmentState::RetxReceived) { // or already-received retransmitted segments
95 Milliseconds timeElapsed = time::steady_clock::now() - segInfo.timeSent;
96 if (timeElapsed.count() > segInfo.rto.count()) { // timer expired?
Davide Pesavento958896e2017-01-19 00:52:04 -050097 hasTimeout = true;
98 enqueueForRetransmission(entry.first);
Weiwei Liu245d7912016-07-28 00:04:25 -070099 }
100 }
101 }
102
Davide Pesavento958896e2017-01-19 00:52:04 -0500103 if (hasTimeout) {
104 recordTimeout();
105 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -0700106 }
107
108 // schedule the next check after predefined interval
Davide Pesaventocd65c2c2017-01-15 16:10:38 -0500109 m_checkRtoEvent = m_scheduler.scheduleEvent(m_options.rtoCheckInterval, [this] { checkRto(); });
Weiwei Liu245d7912016-07-28 00:04:25 -0700110}
111
112void
113PipelineInterestsAimd::sendInterest(uint64_t segNo, bool isRetransmission)
114{
115 if (isStopping())
116 return;
117
118 if (m_hasFinalBlockId && segNo > m_lastSegmentNo && !isRetransmission)
119 return;
120
121 if (!isRetransmission && m_hasFailure)
122 return;
123
124 if (m_options.isVerbose) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500125 std::cerr << (isRetransmission ? "Retransmitting" : "Requesting")
126 << " segment #" << segNo << std::endl;
Weiwei Liu245d7912016-07-28 00:04:25 -0700127 }
128
129 if (isRetransmission) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500130 // keep track of retx count for this segment
131 auto ret = m_retxCount.emplace(segNo, 1);
Weiwei Liu245d7912016-07-28 00:04:25 -0700132 if (ret.second == false) { // not the first retransmission
133 m_retxCount[segNo] += 1;
134 if (m_retxCount[segNo] > m_options.maxRetriesOnTimeoutOrNack) {
135 return handleFail(segNo, "Reached the maximum number of retries (" +
136 to_string(m_options.maxRetriesOnTimeoutOrNack) +
137 ") while retrieving segment #" + to_string(segNo));
138 }
139
140 if (m_options.isVerbose) {
141 std::cerr << "# of retries for segment #" << segNo
142 << " is " << m_retxCount[segNo] << std::endl;
143 }
144 }
145
146 m_face.removePendingInterest(m_segmentInfo[segNo].interestId);
147 }
148
149 Interest interest(Name(m_prefix).appendSegment(segNo));
150 interest.setInterestLifetime(m_options.interestLifetime);
151 interest.setMustBeFresh(m_options.mustBeFresh);
152 interest.setMaxSuffixComponents(1);
153
154 auto interestId = m_face.expressInterest(interest,
155 bind(&PipelineInterestsAimd::handleData, this, _1, _2),
156 bind(&PipelineInterestsAimd::handleNack, this, _1, _2),
157 bind(&PipelineInterestsAimd::handleLifetimeExpiration,
158 this, _1));
159
160 m_nInFlight++;
161
162 if (isRetransmission) {
163 SegmentInfo& segInfo = m_segmentInfo[segNo];
Weiwei Liu245d7912016-07-28 00:04:25 -0700164 segInfo.timeSent = time::steady_clock::now();
Davide Pesavento958896e2017-01-19 00:52:04 -0500165 segInfo.rto = m_rttEstimator.getEstimatedRto();
166 segInfo.state = SegmentState::Retransmitted;
Weiwei Liu245d7912016-07-28 00:04:25 -0700167 m_nRetransmitted++;
168 }
169 else {
170 m_highInterest = segNo;
Davide Pesavento958896e2017-01-19 00:52:04 -0500171 m_segmentInfo[segNo] = {interestId,
172 time::steady_clock::now(),
173 m_rttEstimator.getEstimatedRto(),
174 SegmentState::FirstTimeSent};
Weiwei Liu245d7912016-07-28 00:04:25 -0700175 }
176}
177
178void
179PipelineInterestsAimd::schedulePackets()
180{
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500181 BOOST_ASSERT(m_nInFlight >= 0);
182 auto availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nInFlight;
183
Weiwei Liu245d7912016-07-28 00:04:25 -0700184 while (availableWindowSize > 0) {
185 if (!m_retxQueue.empty()) { // do retransmission first
186 uint64_t retxSegNo = m_retxQueue.front();
187 m_retxQueue.pop();
188
189 auto it = m_segmentInfo.find(retxSegNo);
190 if (it == m_segmentInfo.end()) {
191 continue;
192 }
193 // the segment is still in the map, it means that it needs to be retransmitted
194 sendInterest(retxSegNo, true);
195 }
196 else { // send next segment
197 sendInterest(getNextSegmentNo(), false);
198 }
199 availableWindowSize--;
200 }
201}
202
203void
204PipelineInterestsAimd::handleData(const Interest& interest, const Data& data)
205{
206 if (isStopping())
207 return;
208
209 // Data name will not have extra components because MaxSuffixComponents is set to 1
210 BOOST_ASSERT(data.getName().equals(interest.getName()));
211
212 if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
213 m_lastSegmentNo = data.getFinalBlockId().toSegment();
214 m_hasFinalBlockId = true;
215 cancelInFlightSegmentsGreaterThan(m_lastSegmentNo);
216 if (m_hasFailure && m_lastSegmentNo >= m_failedSegNo) {
217 // previously failed segment is part of the content
218 return onFailure(m_failureReason);
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +0000219 }
220 else {
Weiwei Liu245d7912016-07-28 00:04:25 -0700221 m_hasFailure = false;
222 }
223 }
224
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500225 uint64_t recvSegNo = getSegmentFromPacket(data);
Weiwei Liu245d7912016-07-28 00:04:25 -0700226 SegmentInfo& segInfo = m_segmentInfo[recvSegNo];
227 if (segInfo.state == SegmentState::RetxReceived) {
228 m_segmentInfo.erase(recvSegNo);
229 return; // ignore already-received segment
230 }
231
232 Milliseconds rtt = time::steady_clock::now() - segInfo.timeSent;
Weiwei Liu245d7912016-07-28 00:04:25 -0700233 if (m_options.isVerbose) {
234 std::cerr << "Received segment #" << recvSegNo
235 << ", rtt=" << rtt.count() << "ms"
236 << ", rto=" << segInfo.rto.count() << "ms" << std::endl;
237 }
238
Davide Pesavento958896e2017-01-19 00:52:04 -0500239 if (m_highData < recvSegNo) {
240 m_highData = recvSegNo;
241 }
242
243 // for segments in retx queue, we must not decrement m_nInFlight
244 // because it was already decremented when the segment timed out
245 if (segInfo.state != SegmentState::InRetxQueue) {
Weiwei Liu245d7912016-07-28 00:04:25 -0700246 m_nInFlight--;
247 }
248
Weiwei Liu245d7912016-07-28 00:04:25 -0700249 m_nReceived++;
Davide Pesavento958896e2017-01-19 00:52:04 -0500250 m_receivedSize += data.getContent().value_size();
Weiwei Liu245d7912016-07-28 00:04:25 -0700251
252 increaseWindow();
253 onData(interest, data);
254
255 if (segInfo.state == SegmentState::FirstTimeSent ||
256 segInfo.state == SegmentState::InRetxQueue) { // do not sample RTT for retransmitted segments
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500257 auto nExpectedSamples = std::max<int64_t>((m_nInFlight + 1) >> 1, 1);
258 BOOST_ASSERT(nExpectedSamples > 0);
259 m_rttEstimator.addMeasurement(recvSegNo, rtt, static_cast<size_t>(nExpectedSamples));
Weiwei Liu245d7912016-07-28 00:04:25 -0700260 m_segmentInfo.erase(recvSegNo); // remove the entry associated with the received segment
261 }
262 else { // retransmission
Davide Pesavento958896e2017-01-19 00:52:04 -0500263 BOOST_ASSERT(segInfo.state == SegmentState::Retransmitted);
Weiwei Liu245d7912016-07-28 00:04:25 -0700264 segInfo.state = SegmentState::RetxReceived;
265 }
266
267 BOOST_ASSERT(m_nReceived > 0);
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500268 if (m_hasFinalBlockId &&
269 static_cast<uint64_t>(m_nReceived - 1) >= m_lastSegmentNo) { // all segments have been received
Weiwei Liu245d7912016-07-28 00:04:25 -0700270 cancel();
271 if (m_options.isVerbose) {
272 printSummary();
273 }
274 }
275 else {
276 schedulePackets();
277 }
278}
279
280void
281PipelineInterestsAimd::handleNack(const Interest& interest, const lp::Nack& nack)
282{
283 if (isStopping())
284 return;
285
286 if (m_options.isVerbose)
287 std::cerr << "Received Nack with reason " << nack.getReason()
288 << " for Interest " << interest << std::endl;
289
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500290 uint64_t segNo = getSegmentFromPacket(interest);
Weiwei Liu245d7912016-07-28 00:04:25 -0700291
292 switch (nack.getReason()) {
Davide Pesavento958896e2017-01-19 00:52:04 -0500293 case lp::NackReason::DUPLICATE:
294 // ignore duplicates
Weiwei Liu245d7912016-07-28 00:04:25 -0700295 break;
Davide Pesavento958896e2017-01-19 00:52:04 -0500296 case lp::NackReason::CONGESTION:
297 // treated the same as timeout for now
298 enqueueForRetransmission(segNo);
299 recordTimeout();
300 schedulePackets();
301 break;
302 default:
Weiwei Liu245d7912016-07-28 00:04:25 -0700303 handleFail(segNo, "Could not retrieve data for " + interest.getName().toUri() +
304 ", reason: " + boost::lexical_cast<std::string>(nack.getReason()));
305 break;
Weiwei Liu245d7912016-07-28 00:04:25 -0700306 }
307}
308
309void
310PipelineInterestsAimd::handleLifetimeExpiration(const Interest& interest)
311{
312 if (isStopping())
313 return;
314
Davide Pesavento958896e2017-01-19 00:52:04 -0500315 enqueueForRetransmission(getSegmentFromPacket(interest));
316 recordTimeout();
317 schedulePackets();
Weiwei Liu245d7912016-07-28 00:04:25 -0700318}
319
320void
Davide Pesavento958896e2017-01-19 00:52:04 -0500321PipelineInterestsAimd::recordTimeout()
Weiwei Liu245d7912016-07-28 00:04:25 -0700322{
Weiwei Liu245d7912016-07-28 00:04:25 -0700323 if (m_options.disableCwa || m_highData > m_recPoint) {
324 // react to only one timeout per RTT (conservative window adaptation)
325 m_recPoint = m_highInterest;
326
327 decreaseWindow();
328 m_rttEstimator.backoffRto();
329 m_nLossEvents++;
330
331 if (m_options.isVerbose) {
332 std::cerr << "Packet loss event, cwnd = " << m_cwnd
333 << ", ssthresh = " << m_ssthresh << std::endl;
334 }
335 }
Davide Pesavento958896e2017-01-19 00:52:04 -0500336}
Weiwei Liu245d7912016-07-28 00:04:25 -0700337
Davide Pesavento958896e2017-01-19 00:52:04 -0500338void
339PipelineInterestsAimd::enqueueForRetransmission(uint64_t segNo)
340{
341 BOOST_ASSERT(m_nInFlight > 0);
342 m_nInFlight--;
343 m_retxQueue.push(segNo);
344 m_segmentInfo.at(segNo).state = SegmentState::InRetxQueue;
Weiwei Liu245d7912016-07-28 00:04:25 -0700345}
346
347void
348PipelineInterestsAimd::handleFail(uint64_t segNo, const std::string& reason)
349{
350 if (isStopping())
351 return;
352
353 // if the failed segment is definitely part of the content, raise a fatal error
354 if (m_hasFinalBlockId && segNo <= m_lastSegmentNo)
355 return onFailure(reason);
356
357 if (!m_hasFinalBlockId) {
358 m_segmentInfo.erase(segNo);
Davide Pesavento958896e2017-01-19 00:52:04 -0500359 m_nInFlight--;
Weiwei Liu245d7912016-07-28 00:04:25 -0700360
361 if (m_segmentInfo.empty()) {
362 onFailure("Fetching terminated but no final segment number has been found");
363 }
364 else {
365 cancelInFlightSegmentsGreaterThan(segNo);
366 m_hasFailure = true;
367 m_failedSegNo = segNo;
368 m_failureReason = reason;
369 }
370 }
371}
372
373void
374PipelineInterestsAimd::increaseWindow()
375{
376 if (m_cwnd < m_ssthresh) {
377 m_cwnd += m_options.aiStep; // additive increase
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +0000378 }
379 else {
Weiwei Liu245d7912016-07-28 00:04:25 -0700380 m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
381 }
Davide Pesavento958896e2017-01-19 00:52:04 -0500382
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500383 afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
Weiwei Liu245d7912016-07-28 00:04:25 -0700384}
385
386void
387PipelineInterestsAimd::decreaseWindow()
388{
389 // please refer to RFC 5681, Section 3.1 for the rationale behind it
390 m_ssthresh = std::max(2.0, m_cwnd * m_options.mdCoef); // multiplicative decrease
391 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
Davide Pesavento958896e2017-01-19 00:52:04 -0500392
Davide Pesaventobf1c0692017-01-15 19:15:09 -0500393 afterCwndChange(time::steady_clock::now() - getStartTime(), m_cwnd);
Weiwei Liu245d7912016-07-28 00:04:25 -0700394}
395
Weiwei Liu245d7912016-07-28 00:04:25 -0700396void
Davide Pesavento958896e2017-01-19 00:52:04 -0500397PipelineInterestsAimd::cancelInFlightSegmentsGreaterThan(uint64_t segNo)
Weiwei Liu245d7912016-07-28 00:04:25 -0700398{
399 for (auto it = m_segmentInfo.begin(); it != m_segmentInfo.end();) {
400 // cancel fetching all segments that follow
Davide Pesavento958896e2017-01-19 00:52:04 -0500401 if (it->first > segNo) {
Weiwei Liu245d7912016-07-28 00:04:25 -0700402 m_face.removePendingInterest(it->second.interestId);
403 it = m_segmentInfo.erase(it);
Davide Pesavento958896e2017-01-19 00:52:04 -0500404 m_nInFlight--;
Weiwei Liu245d7912016-07-28 00:04:25 -0700405 }
406 else {
407 ++it;
408 }
409 }
410}
411
412void
413PipelineInterestsAimd::printSummary() const
414{
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +0000415 PipelineInterests::printSummary();
416 std::cerr << "Total # of packet loss events: " << m_nLossEvents << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700417 << "Packet loss rate: "
418 << static_cast<double>(m_nLossEvents) / static_cast<double>(m_nReceived) << "\n"
Chavoosh Ghasemi4d36ed52017-10-31 22:26:25 +0000419 << "Total # of retransmitted segments: " << m_nRetransmitted << "\n";
Weiwei Liu245d7912016-07-28 00:04:25 -0700420}
421
422std::ostream&
423operator<<(std::ostream& os, SegmentState state)
424{
425 switch (state) {
426 case SegmentState::FirstTimeSent:
427 os << "FirstTimeSent";
428 break;
429 case SegmentState::InRetxQueue:
430 os << "InRetxQueue";
431 break;
432 case SegmentState::Retransmitted:
433 os << "Retransmitted";
434 break;
435 case SegmentState::RetxReceived:
436 os << "RetxReceived";
437 break;
438 }
Weiwei Liu245d7912016-07-28 00:04:25 -0700439 return os;
440}
441
442std::ostream&
443operator<<(std::ostream& os, const PipelineInterestsAimdOptions& options)
444{
Davide Pesavento958896e2017-01-19 00:52:04 -0500445 os << "PipelineInterestsAimd initial parameters:\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700446 << "\tInitial congestion window size = " << options.initCwnd << "\n"
447 << "\tInitial slow start threshold = " << options.initSsthresh << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700448 << "\tAdditive increase step = " << options.aiStep << "\n"
Davide Pesavento958896e2017-01-19 00:52:04 -0500449 << "\tMultiplicative decrease factor = " << options.mdCoef << "\n"
Weiwei Liu245d7912016-07-28 00:04:25 -0700450 << "\tRTO check interval = " << options.rtoCheckInterval << "\n"
Davide Pesavento958896e2017-01-19 00:52:04 -0500451 << "\tMax retries on timeout or Nack = " << options.maxRetriesOnTimeoutOrNack << "\n"
452 << "\tConservative Window Adaptation " << (options.disableCwa ? "disabled" : "enabled") << "\n"
453 << "\tResetting cwnd to " << (options.resetCwndToInit ? "initCwnd" : "ssthresh") << " upon loss event\n";
Weiwei Liu245d7912016-07-28 00:04:25 -0700454 return os;
455}
456
457} // namespace aimd
458} // namespace chunks
459} // namespace ndn