blob: 71799dd9b3488866405ae2de992d4cd429380891 [file] [log] [blame]
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Alexander Afanasyev6dfeffe2017-01-30 22:40:32 -08002/*
Eric Newberrye345baa2018-05-23 18:17:07 -07003 * Copyright (c) 2013-2018, Regents of the University of California,
4 * Colorado State University,
5 * University Pierre & Marie Curie, Sorbonne University.
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -07006 *
7 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
8 *
9 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
10 * terms of the GNU Lesser General Public License as published by the Free Software
11 * Foundation, either version 3 of the License, or (at your option) any later version.
12 *
13 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
14 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
15 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
16 *
17 * You should have received copies of the GNU General Public License and GNU Lesser
18 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
19 * <http://www.gnu.org/licenses/>.
20 *
21 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
Eric Newberrye345baa2018-05-23 18:17:07 -070022 *
23 * @author Shuo Yang
24 * @author Weiwei Liu
25 * @author Chavoosh Ghasemi
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070026 */
27
Davide Pesavento7e780642018-11-24 15:51:34 -050028#include "ndn-cxx/util/segment-fetcher.hpp"
29#include "ndn-cxx/name-component.hpp"
30#include "ndn-cxx/encoding/buffer-stream.hpp"
31#include "ndn-cxx/lp/nack.hpp"
32#include "ndn-cxx/lp/nack-header.hpp"
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070033
Ashlesh Gawande679dbb02018-08-21 11:43:21 -050034#include <boost/asio/io_service.hpp>
Alexander Afanasyev6dfeffe2017-01-30 22:40:32 -080035#include <boost/lexical_cast.hpp>
Davide Pesavento5afbb0b2018-01-01 17:24:18 -050036#include <cmath>
Alexander Afanasyev6dfeffe2017-01-30 22:40:32 -080037
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070038namespace ndn {
39namespace util {
40
// Namespace-scope definition of the static constexpr member. windowDecrease() passes it to
// std::max, which takes its arguments by reference, so before C++17 a definition is needed.
Eric Newberrye345baa2018-05-23 18:17:07 -070041constexpr double SegmentFetcher::MIN_SSTHRESH;
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -050042
Eric Newberrye345baa2018-05-23 18:17:07 -070043void
44SegmentFetcher::Options::validate()
45{
46 if (maxTimeout < 1_ms) {
47 BOOST_THROW_EXCEPTION(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
48 }
49
50 if (initCwnd < 1.0) {
51 BOOST_THROW_EXCEPTION(std::invalid_argument("initCwnd must be greater than or equal to 1"));
52 }
53
54 if (aiStep < 0.0) {
55 BOOST_THROW_EXCEPTION(std::invalid_argument("aiStep must be greater than or equal to 0"));
56 }
57
58 if (mdCoef < 0.0 || mdCoef > 1.0) {
59 BOOST_THROW_EXCEPTION(std::invalid_argument("mdCoef must be in range [0, 1]"));
60 }
61}
62
63SegmentFetcher::SegmentFetcher(Face& face,
64 security::v2::Validator& validator,
65 const SegmentFetcher::Options& options)
66 : m_options(options)
67 , m_face(face)
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -050068 , m_scheduler(m_face.getIoService())
69 , m_validator(validator)
Eric Newberrye345baa2018-05-23 18:17:07 -070070 , m_rttEstimator(options.rttOptions)
71 , m_timeLastSegmentReceived(time::steady_clock::now())
72 , m_nextSegmentNum(0)
73 , m_cwnd(options.initCwnd)
74 , m_ssthresh(options.initSsthresh)
75 , m_nSegmentsInFlight(0)
76 , m_nSegments(0)
77 , m_highInterest(0)
78 , m_highData(0)
79 , m_recPoint(0)
80 , m_nReceived(0)
81 , m_nBytesReceived(0)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070082{
Eric Newberrye345baa2018-05-23 18:17:07 -070083 m_options.validate();
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070084}
85
Muktadir Chowdhury1c109b42018-01-10 08:36:00 +000086shared_ptr<SegmentFetcher>
Eric Newberrycc910cd2018-05-06 17:01:40 -070087SegmentFetcher::start(Face& face,
88 const Interest& baseInterest,
Eric Newberrye345baa2018-05-23 18:17:07 -070089 security::v2::Validator& validator,
90 const SegmentFetcher::Options& options)
Eric Newberrycc910cd2018-05-06 17:01:40 -070091{
Eric Newberrye345baa2018-05-23 18:17:07 -070092 shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
Ashlesh Gawande679dbb02018-08-21 11:43:21 -050093 fetcher->m_this = fetcher;
94 fetcher->fetchFirstSegment(baseInterest, false);
Eric Newberrycc910cd2018-05-06 17:01:40 -070095 return fetcher;
96}
97
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070098void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -050099SegmentFetcher::stop()
100{
101 if (!m_this) {
102 return;
103 }
104
105 for (const auto& pendingSegment : m_pendingSegments) {
106 m_face.removePendingInterest(pendingSegment.second.id);
107 if (pendingSegment.second.timeoutEvent) {
108 m_scheduler.cancelEvent(pendingSegment.second.timeoutEvent);
109 }
110 }
111 m_face.getIoService().post([self = std::move(m_this)] {});
112}
113
114bool
115SegmentFetcher::shouldStop(const weak_ptr<SegmentFetcher>& weakSelf)
116{
117 auto self = weakSelf.lock();
118 return self == nullptr || self->m_this == nullptr;
119}
120
121void
122SegmentFetcher::fetchFirstSegment(const Interest& baseInterest, bool isRetransmission)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700123{
124 Interest interest(baseInterest);
Eric Newberry2b765f82018-06-25 14:51:13 -0700125 interest.setCanBePrefix(true);
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700126 interest.setMustBeFresh(true);
Eric Newberrye345baa2018-05-23 18:17:07 -0700127 interest.setInterestLifetime(m_options.interestLifetime);
128 if (isRetransmission) {
129 interest.refreshNonce();
130 }
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700131
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500132 weak_ptr<SegmentFetcher> weakSelf = m_this;
133
Eric Newberrye345baa2018-05-23 18:17:07 -0700134 m_nSegmentsInFlight++;
135 auto pendingInterest = m_face.expressInterest(interest,
136 bind(&SegmentFetcher::afterSegmentReceivedCb,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500137 this, _1, _2, weakSelf),
Eric Newberrye345baa2018-05-23 18:17:07 -0700138 bind(&SegmentFetcher::afterNackReceivedCb,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500139 this, _1, _2, weakSelf),
Eric Newberrye345baa2018-05-23 18:17:07 -0700140 nullptr);
141 auto timeoutEvent =
142 m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500143 bind(&SegmentFetcher::afterTimeoutCb, this, interest, weakSelf));
144
Eric Newberrye345baa2018-05-23 18:17:07 -0700145 if (isRetransmission) {
146 updateRetransmittedSegment(0, pendingInterest, timeoutEvent);
147 }
148 else {
149 BOOST_ASSERT(m_pendingSegments.count(0) == 0);
150 m_pendingSegments.emplace(0, PendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
151 pendingInterest, timeoutEvent});
152 }
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700153}
154
155void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500156SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700157{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500158 weak_ptr<SegmentFetcher> weakSelf = m_this;
159
Eric Newberrye345baa2018-05-23 18:17:07 -0700160 if (checkAllSegmentsReceived()) {
161 // All segments have been retrieved
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500162 return finalizeFetch();
Eric Newberrye345baa2018-05-23 18:17:07 -0700163 }
164
165 int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
Eric Newberrye345baa2018-05-23 18:17:07 -0700166 std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
167
168 while (availableWindowSize > 0) {
169 if (!m_retxQueue.empty()) {
170 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
171 m_retxQueue.pop();
172 if (pendingSegmentIt == m_pendingSegments.end()) {
173 // Skip re-requesting this segment, since it was received after RTO timeout
174 continue;
175 }
176 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
177 segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
178 }
179 else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
180 if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
181 // Don't request a segment a second time if received in response to first "discovery" Interest
182 m_nextSegmentNum++;
183 continue;
184 }
185 segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
186 }
187 else {
188 break;
189 }
190 availableWindowSize--;
191 }
192
193 for (const auto& segment : segmentsToRequest) {
194 Interest interest(origInterest); // to preserve Interest elements
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500195 interest.setName(Name(m_versionedDataName).appendSegment(segment.first));
Eric Newberry2b765f82018-06-25 14:51:13 -0700196 interest.setCanBePrefix(false);
Eric Newberrye345baa2018-05-23 18:17:07 -0700197 interest.setMustBeFresh(false);
Eric Newberrye345baa2018-05-23 18:17:07 -0700198 interest.setInterestLifetime(m_options.interestLifetime);
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500199 interest.refreshNonce();
200
Eric Newberrye345baa2018-05-23 18:17:07 -0700201 m_nSegmentsInFlight++;
202 auto pendingInterest = m_face.expressInterest(interest,
203 bind(&SegmentFetcher::afterSegmentReceivedCb,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500204 this, _1, _2, weakSelf),
Eric Newberrye345baa2018-05-23 18:17:07 -0700205 bind(&SegmentFetcher::afterNackReceivedCb,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500206 this, _1, _2, weakSelf),
Eric Newberrye345baa2018-05-23 18:17:07 -0700207 nullptr);
208 auto timeoutEvent =
209 m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500210 bind(&SegmentFetcher::afterTimeoutCb, this, interest, weakSelf));
211
Eric Newberrye345baa2018-05-23 18:17:07 -0700212 if (segment.second) { // Retransmission
213 updateRetransmittedSegment(segment.first, pendingInterest, timeoutEvent);
214 }
215 else { // First request for segment
216 BOOST_ASSERT(m_pendingSegments.count(segment.first) == 0);
217 m_pendingSegments.emplace(segment.first, PendingSegment{SegmentState::FirstInterest,
218 time::steady_clock::now(),
219 pendingInterest, timeoutEvent});
220 m_highInterest = segment.first;
221 }
222 }
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700223}
224
225void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500226SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
227 const weak_ptr<SegmentFetcher>& weakSelf)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700228{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500229 if (shouldStop(weakSelf))
230 return;
231
Eric Newberrye345baa2018-05-23 18:17:07 -0700232 BOOST_ASSERT(m_nSegmentsInFlight > 0);
233 m_nSegmentsInFlight--;
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700234
Eric Newberrye345baa2018-05-23 18:17:07 -0700235 name::Component currentSegmentComponent = data.getName().get(-1);
236 if (!currentSegmentComponent.isSegment()) {
237 return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
238 }
239
240 uint64_t currentSegment = currentSegmentComponent.toSegment();
241
242 // The first received Interest could have any segment ID
243 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
244 if (m_receivedSegments.size() > 0) {
245 pendingSegmentIt = m_pendingSegments.find(currentSegment);
246 }
247 else {
248 pendingSegmentIt = m_pendingSegments.begin();
249 }
250
Ashlesh Gawandeebe156c2018-11-12 16:27:49 -0600251 if (pendingSegmentIt == m_pendingSegments.end()) {
252 return;
253 }
254
255 afterSegmentReceived(data);
256
Eric Newberrye345baa2018-05-23 18:17:07 -0700257 // Cancel timeout event
258 m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
259 pendingSegmentIt->second.timeoutEvent = nullptr;
260
261 m_validator.validate(data,
262 bind(&SegmentFetcher::afterValidationSuccess, this, _1, origInterest,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500263 pendingSegmentIt, weakSelf),
264 bind(&SegmentFetcher::afterValidationFailure, this, _1, _2, weakSelf));
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500265}
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700266
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500267void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500268SegmentFetcher::afterValidationSuccess(const Data& data, const Interest& origInterest,
Eric Newberrye345baa2018-05-23 18:17:07 -0700269 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500270 const weak_ptr<SegmentFetcher>& weakSelf)
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500271{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500272 if (shouldStop(weakSelf))
273 return;
274
Eric Newberrye345baa2018-05-23 18:17:07 -0700275 // We update the last receive time here instead of in the segment received callback so that the
276 // transfer will not fail to terminate if we only received invalid Data packets.
277 m_timeLastSegmentReceived = time::steady_clock::now();
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500278
Eric Newberrye345baa2018-05-23 18:17:07 -0700279 m_nReceived++;
280
281 // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
282 uint64_t currentSegment = data.getName().get(-1).toSegment();
283 // Add measurement to RTO estimator (if not retransmission)
284 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
285 m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
286 std::max<int64_t>(m_nSegmentsInFlight + 1, 1));
287 }
288
289 // Remove from pending segments map
290 m_pendingSegments.erase(pendingSegmentIt);
291
292 // Copy data in segment to temporary buffer
293 auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
294 std::forward_as_tuple(currentSegment),
295 std::forward_as_tuple(data.getContent().value_size()));
296 std::copy(data.getContent().value_begin(), data.getContent().value_end(),
297 receivedSegmentIt.first->second.begin());
298 m_nBytesReceived += data.getContent().value_size();
299 afterSegmentValidated(data);
300
301 if (data.getFinalBlock()) {
302 if (!data.getFinalBlock()->isSegment()) {
303 return signalError(FINALBLOCKID_NOT_SEGMENT,
304 "Received FinalBlockId did not contain a segment component");
305 }
306
307 if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
308 m_nSegments = data.getFinalBlock()->toSegment() + 1;
309 cancelExcessInFlightSegments();
310 }
311 }
312
313 if (m_receivedSegments.size() == 1) {
314 m_versionedDataName = data.getName().getPrefix(-1);
315 if (currentSegment == 0) {
316 // We received the first segment in response, so we can increment the next segment number
317 m_nextSegmentNum++;
318 }
319 }
320
321 if (m_highData < currentSegment) {
322 m_highData = currentSegment;
323 }
324
325 if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
326 windowDecrease();
327 }
328 else {
329 windowIncrease();
330 }
331
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500332 fetchSegmentsInWindow(origInterest);
Eric Newberrye345baa2018-05-23 18:17:07 -0700333}
334
335void
336SegmentFetcher::afterValidationFailure(const Data& data,
337 const security::v2::ValidationError& error,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500338 const weak_ptr<SegmentFetcher>& weakSelf)
Eric Newberrye345baa2018-05-23 18:17:07 -0700339{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500340 if (shouldStop(weakSelf))
341 return;
342
343 signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " + boost::lexical_cast<std::string>(error));
Eric Newberrye345baa2018-05-23 18:17:07 -0700344}
345
Eric Newberrye345baa2018-05-23 18:17:07 -0700346void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500347SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
348 const weak_ptr<SegmentFetcher>& weakSelf)
Eric Newberrye345baa2018-05-23 18:17:07 -0700349{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500350 if (shouldStop(weakSelf))
351 return;
352
Eric Newberrye345baa2018-05-23 18:17:07 -0700353 afterSegmentNacked();
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500354
Eric Newberrye345baa2018-05-23 18:17:07 -0700355 BOOST_ASSERT(m_nSegmentsInFlight > 0);
356 m_nSegmentsInFlight--;
357
358 switch (nack.getReason()) {
359 case lp::NackReason::DUPLICATE:
360 case lp::NackReason::CONGESTION:
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500361 afterNackOrTimeout(origInterest);
Eric Newberrye345baa2018-05-23 18:17:07 -0700362 break;
363 default:
364 signalError(NACK_ERROR, "Nack Error");
365 break;
366 }
367}
368
369void
370SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500371 const weak_ptr<SegmentFetcher>& weakSelf)
Eric Newberrye345baa2018-05-23 18:17:07 -0700372{
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500373 if (shouldStop(weakSelf))
374 return;
375
Eric Newberrye345baa2018-05-23 18:17:07 -0700376 afterSegmentTimedOut();
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500377
Eric Newberrye345baa2018-05-23 18:17:07 -0700378 BOOST_ASSERT(m_nSegmentsInFlight > 0);
379 m_nSegmentsInFlight--;
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500380 afterNackOrTimeout(origInterest);
Eric Newberrye345baa2018-05-23 18:17:07 -0700381}
382
383void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500384SegmentFetcher::afterNackOrTimeout(const Interest& origInterest)
Eric Newberrye345baa2018-05-23 18:17:07 -0700385{
386 if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500387 // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
Eric Newberrye345baa2018-05-23 18:17:07 -0700388 return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
389 }
390
391 name::Component lastNameComponent = origInterest.getName().get(-1);
392 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
393 BOOST_ASSERT(m_pendingSegments.size() > 0);
394 if (lastNameComponent.isSegment()) {
395 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
396 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
397 }
398 else { // First Interest
399 BOOST_ASSERT(m_pendingSegments.size() > 0);
400 pendingSegmentIt = m_pendingSegments.begin();
401 }
402
403 // Cancel timeout event and set status to InRetxQueue
404 m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
405 pendingSegmentIt->second.timeoutEvent = nullptr;
406 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
407
408 m_rttEstimator.backoffRto();
409
410 if (m_receivedSegments.size() == 0) {
411 // Resend first Interest (until maximum receive timeout exceeded)
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500412 fetchFirstSegment(origInterest, true);
Eric Newberrye345baa2018-05-23 18:17:07 -0700413 }
414 else {
415 windowDecrease();
416 m_retxQueue.push(pendingSegmentIt->first);
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500417 fetchSegmentsInWindow(origInterest);
Eric Newberrye345baa2018-05-23 18:17:07 -0700418 }
419}
420
421void
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500422SegmentFetcher::finalizeFetch()
Eric Newberrye345baa2018-05-23 18:17:07 -0700423{
424 // Combine segments into final buffer
425 OBufferStream buf;
426 // We may have received more segments than exist in the object.
427 BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
428
429 for (int64_t i = 0; i < m_nSegments; i++) {
430 buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
431 }
432
433 onComplete(buf.buf());
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500434 stop();
Eric Newberrye345baa2018-05-23 18:17:07 -0700435}
436
437void
438SegmentFetcher::windowIncrease()
439{
440 if (m_options.useConstantCwnd) {
441 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
442 return;
443 }
444
445 if (m_cwnd < m_ssthresh) {
446 m_cwnd += m_options.aiStep; // additive increase
447 }
448 else {
449 m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
450 }
451}
452
453void
454SegmentFetcher::windowDecrease()
455{
456 if (m_options.disableCwa || m_highData > m_recPoint) {
457 m_recPoint = m_highInterest;
458
459 if (m_options.useConstantCwnd) {
460 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
461 return;
462 }
463
464 // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
465 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
466 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
467 }
468}
469
470void
471SegmentFetcher::signalError(uint32_t code, const std::string& msg)
472{
Eric Newberrye345baa2018-05-23 18:17:07 -0700473 onError(code, msg);
Ashlesh Gawande679dbb02018-08-21 11:43:21 -0500474 stop();
Eric Newberrye345baa2018-05-23 18:17:07 -0700475}
476
477void
478SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
479 const PendingInterestId* pendingInterest,
480 scheduler::EventId timeoutEvent)
481{
482 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
483 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
484 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
485 pendingSegmentIt->second.state = SegmentState::Retransmitted;
Ashlesh Gawandeebe156c2018-11-12 16:27:49 -0600486 m_face.removePendingInterest(pendingSegmentIt->second.id);
Eric Newberrye345baa2018-05-23 18:17:07 -0700487 pendingSegmentIt->second.id = pendingInterest;
488 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
489}
490
491void
492SegmentFetcher::cancelExcessInFlightSegments()
493{
494 for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
495 if (it->first >= static_cast<uint64_t>(m_nSegments)) {
496 m_face.removePendingInterest(it->second.id);
497 if (it->second.timeoutEvent) {
498 m_scheduler.cancelEvent(it->second.timeoutEvent);
499 }
500 it = m_pendingSegments.erase(it);
501 BOOST_ASSERT(m_nSegmentsInFlight > 0);
502 m_nSegmentsInFlight--;
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700503 }
504 else {
Eric Newberrye345baa2018-05-23 18:17:07 -0700505 ++it;
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500506 }
507 }
508}
509
Eric Newberrye345baa2018-05-23 18:17:07 -0700510bool
511SegmentFetcher::checkAllSegmentsReceived()
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500512{
Eric Newberrye345baa2018-05-23 18:17:07 -0700513 bool haveReceivedAllSegments = false;
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500514
Eric Newberrye345baa2018-05-23 18:17:07 -0700515 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
516 haveReceivedAllSegments = true;
517 // Verify that all segments in window have been received. If not, send Interests for missing segments.
518 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
519 if (m_receivedSegments.count(i) == 0) {
520 m_retxQueue.push(i);
521 haveReceivedAllSegments = false;
522 }
523 }
Muktadir R Chowdhury2bc2df02016-04-05 16:55:41 -0500524 }
525
Eric Newberrye345baa2018-05-23 18:17:07 -0700526 return haveReceivedAllSegments;
527}
528
529time::milliseconds
530SegmentFetcher::getEstimatedRto()
531{
532 // We don't want an Interest timeout greater than the maximum allowed timeout between the
533 // succesful receipt of segments
534 return std::min(m_options.maxTimeout,
535 time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500536}
537
538} // namespace util
539} // namespace ndn