blob: 24ddad92458f188184c2cd892b1f85b1eb52a74c [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2013-2018, Regents of the University of California,
 *                          Colorado State University,
 *                          University Pierre & Marie Curie, Sorbonne University.
 *
 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
 *
 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
 * terms of the GNU Lesser General Public License as published by the Free Software
 * Foundation, either version 3 of the License, or (at your option) any later version.
 *
 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
 *
 * You should have received copies of the GNU General Public License and GNU Lesser
 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
 * <http://www.gnu.org/licenses/>.
 *
 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
 *
 * @author Shuo Yang
 * @author Weiwei Liu
 * @author Chavoosh Ghasemi
 */
27
28#include "segment-fetcher.hpp"
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -050029#include "../name-component.hpp"
Eric Newberrye345baa2018-05-23 18:17:07 -070030#include "../encoding/buffer-stream.hpp"
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -050031#include "../lp/nack.hpp"
32#include "../lp/nack-header.hpp"
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070033
Ashlesh Gawande679dbb02018-08-21 11:43:21 -050034#include <boost/asio/io_service.hpp>
Alexander Afanasyev6dfeffe2017-01-30 22:40:32 -080035#include <boost/lexical_cast.hpp>
Davide Pesavento5afbb0b2018-01-01 17:24:18 -050036#include <cmath>
Alexander Afanasyev6dfeffe2017-01-30 22:40:32 -080037
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070038namespace ndn {
39namespace util {
40
Eric Newberrye345baa2018-05-23 18:17:07 -070041constexpr double SegmentFetcher::MIN_SSTHRESH;
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -050042
Eric Newberrye345baa2018-05-23 18:17:07 -070043void
44SegmentFetcher::Options::validate()
45{
46 if (maxTimeout < 1_ms) {
47 BOOST_THROW_EXCEPTION(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
48 }
49
50 if (initCwnd < 1.0) {
51 BOOST_THROW_EXCEPTION(std::invalid_argument("initCwnd must be greater than or equal to 1"));
52 }
53
54 if (aiStep < 0.0) {
55 BOOST_THROW_EXCEPTION(std::invalid_argument("aiStep must be greater than or equal to 0"));
56 }
57
58 if (mdCoef < 0.0 || mdCoef > 1.0) {
59 BOOST_THROW_EXCEPTION(std::invalid_argument("mdCoef must be in range [0, 1]"));
60 }
61}
62
63SegmentFetcher::SegmentFetcher(Face& face,
64 security::v2::Validator& validator,
65 const SegmentFetcher::Options& options)
66 : m_options(options)
67 , m_face(face)
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -050068 , m_scheduler(m_face.getIoService())
69 , m_validator(validator)
Eric Newberrye345baa2018-05-23 18:17:07 -070070 , m_rttEstimator(options.rttOptions)
71 , m_timeLastSegmentReceived(time::steady_clock::now())
72 , m_nextSegmentNum(0)
73 , m_cwnd(options.initCwnd)
74 , m_ssthresh(options.initSsthresh)
75 , m_nSegmentsInFlight(0)
76 , m_nSegments(0)
77 , m_highInterest(0)
78 , m_highData(0)
79 , m_recPoint(0)
80 , m_nReceived(0)
81 , m_nBytesReceived(0)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070082{
Eric Newberrye345baa2018-05-23 18:17:07 -070083 m_options.validate();
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070084}
85
Muktadir Chowdhury1c109b42018-01-10 08:36:00 +000086shared_ptr<SegmentFetcher>
Eric Newberrycc910cd2018-05-06 17:01:40 -070087SegmentFetcher::start(Face& face,
88 const Interest& baseInterest,
Eric Newberrye345baa2018-05-23 18:17:07 -070089 security::v2::Validator& validator,
90 const SegmentFetcher::Options& options)
Eric Newberrycc910cd2018-05-06 17:01:40 -070091{
Eric Newberrye345baa2018-05-23 18:17:07 -070092 shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
Ashlesh Gawande679dbb02018-08-21 11:43:21 -050093 fetcher->m_this = fetcher;
94 fetcher->fetchFirstSegment(baseInterest, false);
Eric Newberrycc910cd2018-05-06 17:01:40 -070095 return fetcher;
96}
97
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070098void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -050099SegmentFetcher::stop()
100{
101 if (!m_this) {
102 return;
103 }
104
105 for (const auto& pendingSegment : m_pendingSegments) {
106 m_face.removePendingInterest(pendingSegment.second.id);
107 if (pendingSegment.second.timeoutEvent) {
108 m_scheduler.cancelEvent(pendingSegment.second.timeoutEvent);
109 }
110 }
111 m_face.getIoService().post([self = std::move(m_this)] {});
112}
113
114bool
115SegmentFetcher::shouldStop(const weak_ptr<SegmentFetcher>& weakSelf)
116{
117 auto self = weakSelf.lock();
118 return self == nullptr || self->m_this == nullptr;
119}
120
121void
122SegmentFetcher::fetchFirstSegment(const Interest& baseInterest, bool isRetransmission)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700123{
124 Interest interest(baseInterest);
Eric Newberry2b765f82018-06-25 14:51:13 -0700125 interest.setCanBePrefix(true);
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700126 interest.setMustBeFresh(true);
Eric Newberrye345baa2018-05-23 18:17:07 -0700127 interest.setInterestLifetime(m_options.interestLifetime);
128 if (isRetransmission) {
129 interest.refreshNonce();
130 }
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700131
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500132 weak_ptr<SegmentFetcher> weakSelf = m_this;
133
Eric Newberrye345baa2018-05-23 18:17:07 -0700134 m_nSegmentsInFlight++;
135 auto pendingInterest = m_face.expressInterest(interest,
136 bind(&SegmentFetcher::afterSegmentReceivedCb,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500137 this, _1, _2, weakSelf),
Eric Newberrye345baa2018-05-23 18:17:07 -0700138 bind(&SegmentFetcher::afterNackReceivedCb,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500139 this, _1, _2, weakSelf),
Eric Newberrye345baa2018-05-23 18:17:07 -0700140 nullptr);
141 auto timeoutEvent =
142 m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500143 bind(&SegmentFetcher::afterTimeoutCb, this, interest, weakSelf));
144
Eric Newberrye345baa2018-05-23 18:17:07 -0700145 if (isRetransmission) {
146 updateRetransmittedSegment(0, pendingInterest, timeoutEvent);
147 }
148 else {
149 BOOST_ASSERT(m_pendingSegments.count(0) == 0);
150 m_pendingSegments.emplace(0, PendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
151 pendingInterest, timeoutEvent});
152 }
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700153}
154
155void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500156SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700157{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500158 weak_ptr<SegmentFetcher> weakSelf = m_this;
159
Eric Newberrye345baa2018-05-23 18:17:07 -0700160 if (checkAllSegmentsReceived()) {
161 // All segments have been retrieved
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500162 return finalizeFetch();
Eric Newberrye345baa2018-05-23 18:17:07 -0700163 }
164
165 int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
Eric Newberrye345baa2018-05-23 18:17:07 -0700166 std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
167
168 while (availableWindowSize > 0) {
169 if (!m_retxQueue.empty()) {
170 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
171 m_retxQueue.pop();
172 if (pendingSegmentIt == m_pendingSegments.end()) {
173 // Skip re-requesting this segment, since it was received after RTO timeout
174 continue;
175 }
176 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
177 segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
178 }
179 else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
180 if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
181 // Don't request a segment a second time if received in response to first "discovery" Interest
182 m_nextSegmentNum++;
183 continue;
184 }
185 segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
186 }
187 else {
188 break;
189 }
190 availableWindowSize--;
191 }
192
193 for (const auto& segment : segmentsToRequest) {
194 Interest interest(origInterest); // to preserve Interest elements
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500195 interest.setName(Name(m_versionedDataName).appendSegment(segment.first));
Eric Newberry2b765f82018-06-25 14:51:13 -0700196 interest.setCanBePrefix(false);
Eric Newberrye345baa2018-05-23 18:17:07 -0700197 interest.setMustBeFresh(false);
Eric Newberrye345baa2018-05-23 18:17:07 -0700198 interest.setInterestLifetime(m_options.interestLifetime);
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500199 interest.refreshNonce();
200
Eric Newberrye345baa2018-05-23 18:17:07 -0700201 m_nSegmentsInFlight++;
202 auto pendingInterest = m_face.expressInterest(interest,
203 bind(&SegmentFetcher::afterSegmentReceivedCb,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500204 this, _1, _2, weakSelf),
Eric Newberrye345baa2018-05-23 18:17:07 -0700205 bind(&SegmentFetcher::afterNackReceivedCb,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500206 this, _1, _2, weakSelf),
Eric Newberrye345baa2018-05-23 18:17:07 -0700207 nullptr);
208 auto timeoutEvent =
209 m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500210 bind(&SegmentFetcher::afterTimeoutCb, this, interest, weakSelf));
211
Eric Newberrye345baa2018-05-23 18:17:07 -0700212 if (segment.second) { // Retransmission
213 updateRetransmittedSegment(segment.first, pendingInterest, timeoutEvent);
214 }
215 else { // First request for segment
216 BOOST_ASSERT(m_pendingSegments.count(segment.first) == 0);
217 m_pendingSegments.emplace(segment.first, PendingSegment{SegmentState::FirstInterest,
218 time::steady_clock::now(),
219 pendingInterest, timeoutEvent});
220 m_highInterest = segment.first;
221 }
222 }
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700223}
224
225void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500226SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
227 const weak_ptr<SegmentFetcher>& weakSelf)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700228{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500229 if (shouldStop(weakSelf))
230 return;
231
Muktadir Chowdhury1c109b42018-01-10 08:36:00 +0000232 afterSegmentReceived(data);
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500233
Eric Newberrye345baa2018-05-23 18:17:07 -0700234 BOOST_ASSERT(m_nSegmentsInFlight > 0);
235 m_nSegmentsInFlight--;
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700236
Eric Newberrye345baa2018-05-23 18:17:07 -0700237 name::Component currentSegmentComponent = data.getName().get(-1);
238 if (!currentSegmentComponent.isSegment()) {
239 return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
240 }
241
242 uint64_t currentSegment = currentSegmentComponent.toSegment();
243
244 // The first received Interest could have any segment ID
245 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
246 if (m_receivedSegments.size() > 0) {
247 pendingSegmentIt = m_pendingSegments.find(currentSegment);
248 }
249 else {
250 pendingSegmentIt = m_pendingSegments.begin();
251 }
252
253 // Cancel timeout event
254 m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
255 pendingSegmentIt->second.timeoutEvent = nullptr;
256
257 m_validator.validate(data,
258 bind(&SegmentFetcher::afterValidationSuccess, this, _1, origInterest,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500259 pendingSegmentIt, weakSelf),
260 bind(&SegmentFetcher::afterValidationFailure, this, _1, _2, weakSelf));
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500261}
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700262
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500263void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500264SegmentFetcher::afterValidationSuccess(const Data& data, const Interest& origInterest,
Eric Newberrye345baa2018-05-23 18:17:07 -0700265 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500266 const weak_ptr<SegmentFetcher>& weakSelf)
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500267{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500268 if (shouldStop(weakSelf))
269 return;
270
Eric Newberrye345baa2018-05-23 18:17:07 -0700271 // We update the last receive time here instead of in the segment received callback so that the
272 // transfer will not fail to terminate if we only received invalid Data packets.
273 m_timeLastSegmentReceived = time::steady_clock::now();
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500274
Eric Newberrye345baa2018-05-23 18:17:07 -0700275 m_nReceived++;
276
277 // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
278 uint64_t currentSegment = data.getName().get(-1).toSegment();
279 // Add measurement to RTO estimator (if not retransmission)
280 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
281 m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
282 std::max<int64_t>(m_nSegmentsInFlight + 1, 1));
283 }
284
285 // Remove from pending segments map
286 m_pendingSegments.erase(pendingSegmentIt);
287
288 // Copy data in segment to temporary buffer
289 auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
290 std::forward_as_tuple(currentSegment),
291 std::forward_as_tuple(data.getContent().value_size()));
292 std::copy(data.getContent().value_begin(), data.getContent().value_end(),
293 receivedSegmentIt.first->second.begin());
294 m_nBytesReceived += data.getContent().value_size();
295 afterSegmentValidated(data);
296
297 if (data.getFinalBlock()) {
298 if (!data.getFinalBlock()->isSegment()) {
299 return signalError(FINALBLOCKID_NOT_SEGMENT,
300 "Received FinalBlockId did not contain a segment component");
301 }
302
303 if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
304 m_nSegments = data.getFinalBlock()->toSegment() + 1;
305 cancelExcessInFlightSegments();
306 }
307 }
308
309 if (m_receivedSegments.size() == 1) {
310 m_versionedDataName = data.getName().getPrefix(-1);
311 if (currentSegment == 0) {
312 // We received the first segment in response, so we can increment the next segment number
313 m_nextSegmentNum++;
314 }
315 }
316
317 if (m_highData < currentSegment) {
318 m_highData = currentSegment;
319 }
320
321 if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
322 windowDecrease();
323 }
324 else {
325 windowIncrease();
326 }
327
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500328 fetchSegmentsInWindow(origInterest);
Eric Newberrye345baa2018-05-23 18:17:07 -0700329}
330
331void
332SegmentFetcher::afterValidationFailure(const Data& data,
333 const security::v2::ValidationError& error,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500334 const weak_ptr<SegmentFetcher>& weakSelf)
Eric Newberrye345baa2018-05-23 18:17:07 -0700335{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500336 if (shouldStop(weakSelf))
337 return;
338
339 signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " + boost::lexical_cast<std::string>(error));
Eric Newberrye345baa2018-05-23 18:17:07 -0700340}
341
Eric Newberrye345baa2018-05-23 18:17:07 -0700342void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500343SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
344 const weak_ptr<SegmentFetcher>& weakSelf)
Eric Newberrye345baa2018-05-23 18:17:07 -0700345{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500346 if (shouldStop(weakSelf))
347 return;
348
Eric Newberrye345baa2018-05-23 18:17:07 -0700349 afterSegmentNacked();
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500350
Eric Newberrye345baa2018-05-23 18:17:07 -0700351 BOOST_ASSERT(m_nSegmentsInFlight > 0);
352 m_nSegmentsInFlight--;
353
354 switch (nack.getReason()) {
355 case lp::NackReason::DUPLICATE:
356 case lp::NackReason::CONGESTION:
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500357 afterNackOrTimeout(origInterest);
Eric Newberrye345baa2018-05-23 18:17:07 -0700358 break;
359 default:
360 signalError(NACK_ERROR, "Nack Error");
361 break;
362 }
363}
364
365void
366SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500367 const weak_ptr<SegmentFetcher>& weakSelf)
Eric Newberrye345baa2018-05-23 18:17:07 -0700368{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500369 if (shouldStop(weakSelf))
370 return;
371
Eric Newberrye345baa2018-05-23 18:17:07 -0700372 afterSegmentTimedOut();
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500373
Eric Newberrye345baa2018-05-23 18:17:07 -0700374 BOOST_ASSERT(m_nSegmentsInFlight > 0);
375 m_nSegmentsInFlight--;
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500376 afterNackOrTimeout(origInterest);
Eric Newberrye345baa2018-05-23 18:17:07 -0700377}
378
379void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500380SegmentFetcher::afterNackOrTimeout(const Interest& origInterest)
Eric Newberrye345baa2018-05-23 18:17:07 -0700381{
382 if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500383 // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
Eric Newberrye345baa2018-05-23 18:17:07 -0700384 return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
385 }
386
387 name::Component lastNameComponent = origInterest.getName().get(-1);
388 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
389 BOOST_ASSERT(m_pendingSegments.size() > 0);
390 if (lastNameComponent.isSegment()) {
391 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
392 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
393 }
394 else { // First Interest
395 BOOST_ASSERT(m_pendingSegments.size() > 0);
396 pendingSegmentIt = m_pendingSegments.begin();
397 }
398
399 // Cancel timeout event and set status to InRetxQueue
400 m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
401 pendingSegmentIt->second.timeoutEvent = nullptr;
402 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
403
404 m_rttEstimator.backoffRto();
405
406 if (m_receivedSegments.size() == 0) {
407 // Resend first Interest (until maximum receive timeout exceeded)
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500408 fetchFirstSegment(origInterest, true);
Eric Newberrye345baa2018-05-23 18:17:07 -0700409 }
410 else {
411 windowDecrease();
412 m_retxQueue.push(pendingSegmentIt->first);
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500413 fetchSegmentsInWindow(origInterest);
Eric Newberrye345baa2018-05-23 18:17:07 -0700414 }
415}
416
417void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500418SegmentFetcher::finalizeFetch()
Eric Newberrye345baa2018-05-23 18:17:07 -0700419{
420 // Combine segments into final buffer
421 OBufferStream buf;
422 // We may have received more segments than exist in the object.
423 BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
424
425 for (int64_t i = 0; i < m_nSegments; i++) {
426 buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
427 }
428
429 onComplete(buf.buf());
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500430 stop();
Eric Newberrye345baa2018-05-23 18:17:07 -0700431}
432
433void
434SegmentFetcher::windowIncrease()
435{
436 if (m_options.useConstantCwnd) {
437 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
438 return;
439 }
440
441 if (m_cwnd < m_ssthresh) {
442 m_cwnd += m_options.aiStep; // additive increase
443 }
444 else {
445 m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
446 }
447}
448
449void
450SegmentFetcher::windowDecrease()
451{
452 if (m_options.disableCwa || m_highData > m_recPoint) {
453 m_recPoint = m_highInterest;
454
455 if (m_options.useConstantCwnd) {
456 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
457 return;
458 }
459
460 // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
461 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
462 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
463 }
464}
465
466void
467SegmentFetcher::signalError(uint32_t code, const std::string& msg)
468{
Eric Newberrye345baa2018-05-23 18:17:07 -0700469 onError(code, msg);
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500470 stop();
Eric Newberrye345baa2018-05-23 18:17:07 -0700471}
472
473void
474SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
475 const PendingInterestId* pendingInterest,
476 scheduler::EventId timeoutEvent)
477{
478 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
479 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
480 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
481 pendingSegmentIt->second.state = SegmentState::Retransmitted;
482 pendingSegmentIt->second.id = pendingInterest;
483 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
484}
485
486void
487SegmentFetcher::cancelExcessInFlightSegments()
488{
489 for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
490 if (it->first >= static_cast<uint64_t>(m_nSegments)) {
491 m_face.removePendingInterest(it->second.id);
492 if (it->second.timeoutEvent) {
493 m_scheduler.cancelEvent(it->second.timeoutEvent);
494 }
495 it = m_pendingSegments.erase(it);
496 BOOST_ASSERT(m_nSegmentsInFlight > 0);
497 m_nSegmentsInFlight--;
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700498 }
499 else {
Eric Newberrye345baa2018-05-23 18:17:07 -0700500 ++it;
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500501 }
502 }
503}
504
Eric Newberrye345baa2018-05-23 18:17:07 -0700505bool
506SegmentFetcher::checkAllSegmentsReceived()
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500507{
Eric Newberrye345baa2018-05-23 18:17:07 -0700508 bool haveReceivedAllSegments = false;
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500509
Eric Newberrye345baa2018-05-23 18:17:07 -0700510 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
511 haveReceivedAllSegments = true;
512 // Verify that all segments in window have been received. If not, send Interests for missing segments.
513 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
514 if (m_receivedSegments.count(i) == 0) {
515 m_retxQueue.push(i);
516 haveReceivedAllSegments = false;
517 }
518 }
Muktadir R Chowdhury2bc2df02016-04-05 16:55:41 -0500519 }
520
Eric Newberrye345baa2018-05-23 18:17:07 -0700521 return haveReceivedAllSegments;
522}
523
524time::milliseconds
525SegmentFetcher::getEstimatedRto()
526{
527 // We don't want an Interest timeout greater than the maximum allowed timeout between the
528 // succesful receipt of segments
529 return std::min(m_options.maxTimeout,
530 time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500531}
532
533} // namespace util
534} // namespace ndn