blob: cca049e548ed62603aaabb3ebf099ae1c14035ed [file] [log] [blame]
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
Alexander Afanasyev6dfeffe2017-01-30 22:40:32 -08002/*
Eric Newberrye345baa2018-05-23 18:17:07 -07003 * Copyright (c) 2013-2018, Regents of the University of California,
4 * Colorado State University,
5 * University Pierre & Marie Curie, Sorbonne University.
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -07006 *
7 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
8 *
9 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
10 * terms of the GNU Lesser General Public License as published by the Free Software
11 * Foundation, either version 3 of the License, or (at your option) any later version.
12 *
13 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
14 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
15 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
16 *
17 * You should have received copies of the GNU General Public License and GNU Lesser
18 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
19 * <http://www.gnu.org/licenses/>.
20 *
21 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
Eric Newberrye345baa2018-05-23 18:17:07 -070022 *
23 * @author Shuo Yang
24 * @author Weiwei Liu
25 * @author Chavoosh Ghasemi
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070026 */
27
28#include "segment-fetcher.hpp"
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -050029#include "../name-component.hpp"
Eric Newberrye345baa2018-05-23 18:17:07 -070030#include "../encoding/buffer-stream.hpp"
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -050031#include "../lp/nack.hpp"
32#include "../lp/nack-header.hpp"
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070033
Alexander Afanasyev6dfeffe2017-01-30 22:40:32 -080034#include <boost/lexical_cast.hpp>
Davide Pesavento5afbb0b2018-01-01 17:24:18 -050035#include <cmath>
Alexander Afanasyev6dfeffe2017-01-30 22:40:32 -080036
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070037namespace ndn {
38namespace util {
39
Eric Newberrye345baa2018-05-23 18:17:07 -070040constexpr double SegmentFetcher::MIN_SSTHRESH;
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -050041
Eric Newberrye345baa2018-05-23 18:17:07 -070042void
43SegmentFetcher::Options::validate()
44{
45 if (maxTimeout < 1_ms) {
46 BOOST_THROW_EXCEPTION(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
47 }
48
49 if (initCwnd < 1.0) {
50 BOOST_THROW_EXCEPTION(std::invalid_argument("initCwnd must be greater than or equal to 1"));
51 }
52
53 if (aiStep < 0.0) {
54 BOOST_THROW_EXCEPTION(std::invalid_argument("aiStep must be greater than or equal to 0"));
55 }
56
57 if (mdCoef < 0.0 || mdCoef > 1.0) {
58 BOOST_THROW_EXCEPTION(std::invalid_argument("mdCoef must be in range [0, 1]"));
59 }
60}
61
62SegmentFetcher::SegmentFetcher(Face& face,
63 security::v2::Validator& validator,
64 const SegmentFetcher::Options& options)
65 : m_options(options)
66 , m_face(face)
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -050067 , m_scheduler(m_face.getIoService())
68 , m_validator(validator)
Eric Newberrye345baa2018-05-23 18:17:07 -070069 , m_rttEstimator(options.rttOptions)
70 , m_timeLastSegmentReceived(time::steady_clock::now())
71 , m_nextSegmentNum(0)
72 , m_cwnd(options.initCwnd)
73 , m_ssthresh(options.initSsthresh)
74 , m_nSegmentsInFlight(0)
75 , m_nSegments(0)
76 , m_highInterest(0)
77 , m_highData(0)
78 , m_recPoint(0)
79 , m_nReceived(0)
80 , m_nBytesReceived(0)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070081{
Eric Newberrye345baa2018-05-23 18:17:07 -070082 m_options.validate();
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070083}
84
Muktadir Chowdhury1c109b42018-01-10 08:36:00 +000085shared_ptr<SegmentFetcher>
Eric Newberrycc910cd2018-05-06 17:01:40 -070086SegmentFetcher::start(Face& face,
87 const Interest& baseInterest,
Eric Newberrye345baa2018-05-23 18:17:07 -070088 security::v2::Validator& validator,
89 const SegmentFetcher::Options& options)
Eric Newberrycc910cd2018-05-06 17:01:40 -070090{
Eric Newberrye345baa2018-05-23 18:17:07 -070091 shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
92 fetcher->fetchFirstSegment(baseInterest, false, fetcher);
Eric Newberrycc910cd2018-05-06 17:01:40 -070093 return fetcher;
94}
95
96shared_ptr<SegmentFetcher>
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -070097SegmentFetcher::fetch(Face& face,
98 const Interest& baseInterest,
Alexander Afanasyev6dfeffe2017-01-30 22:40:32 -080099 security::v2::Validator& validator,
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700100 const CompleteCallback& completeCallback,
101 const ErrorCallback& errorCallback)
102{
Eric Newberrye345baa2018-05-23 18:17:07 -0700103 Options options;
104 options.useConstantCwnd = true;
105 options.useConstantInterestTimeout = true;
106 options.maxTimeout = baseInterest.getInterestLifetime();
107 options.interestLifetime = baseInterest.getInterestLifetime();
108 shared_ptr<SegmentFetcher> fetcher = start(face, baseInterest, validator, options);
Eric Newberrycc910cd2018-05-06 17:01:40 -0700109 fetcher->onComplete.connect(completeCallback);
110 fetcher->onError.connect(errorCallback);
111 return fetcher;
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500112}
113
Muktadir Chowdhury1c109b42018-01-10 08:36:00 +0000114shared_ptr<SegmentFetcher>
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500115SegmentFetcher::fetch(Face& face,
116 const Interest& baseInterest,
Alexander Afanasyev6dfeffe2017-01-30 22:40:32 -0800117 shared_ptr<security::v2::Validator> validator,
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500118 const CompleteCallback& completeCallback,
119 const ErrorCallback& errorCallback)
120{
Eric Newberrycc910cd2018-05-06 17:01:40 -0700121 auto fetcher = fetch(face, baseInterest, *validator, completeCallback, errorCallback);
Eric Newberrye345baa2018-05-23 18:17:07 -0700122 // Ensure lifetime of validator shared_ptr
Eric Newberrycc910cd2018-05-06 17:01:40 -0700123 fetcher->onComplete.connect([validator] (ConstBufferPtr) {});
Muktadir Chowdhury1c109b42018-01-10 08:36:00 +0000124 return fetcher;
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700125}
126
127void
128SegmentFetcher::fetchFirstSegment(const Interest& baseInterest,
Eric Newberrye345baa2018-05-23 18:17:07 -0700129 bool isRetransmission,
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500130 shared_ptr<SegmentFetcher> self)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700131{
132 Interest interest(baseInterest);
Eric Newberry2b765f82018-06-25 14:51:13 -0700133 interest.setCanBePrefix(true);
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700134 interest.setMustBeFresh(true);
Eric Newberrye345baa2018-05-23 18:17:07 -0700135 interest.setInterestLifetime(m_options.interestLifetime);
136 if (isRetransmission) {
137 interest.refreshNonce();
138 }
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700139
Eric Newberrye345baa2018-05-23 18:17:07 -0700140 m_nSegmentsInFlight++;
141 auto pendingInterest = m_face.expressInterest(interest,
142 bind(&SegmentFetcher::afterSegmentReceivedCb,
143 this, _1, _2, self),
144 bind(&SegmentFetcher::afterNackReceivedCb,
145 this, _1, _2, self),
146 nullptr);
147 auto timeoutEvent =
148 m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
149 bind(&SegmentFetcher::afterTimeoutCb, this, interest, self));
150 if (isRetransmission) {
151 updateRetransmittedSegment(0, pendingInterest, timeoutEvent);
152 }
153 else {
154 BOOST_ASSERT(m_pendingSegments.count(0) == 0);
155 m_pendingSegments.emplace(0, PendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
156 pendingInterest, timeoutEvent});
157 }
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700158}
159
160void
Eric Newberrye345baa2018-05-23 18:17:07 -0700161SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest, shared_ptr<SegmentFetcher> self)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700162{
Eric Newberrye345baa2018-05-23 18:17:07 -0700163 if (checkAllSegmentsReceived()) {
164 // All segments have been retrieved
165 finalizeFetch(self);
166 }
167
168 int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
169
170 std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
171
172 while (availableWindowSize > 0) {
173 if (!m_retxQueue.empty()) {
174 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
175 m_retxQueue.pop();
176 if (pendingSegmentIt == m_pendingSegments.end()) {
177 // Skip re-requesting this segment, since it was received after RTO timeout
178 continue;
179 }
180 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
181 segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
182 }
183 else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
184 if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
185 // Don't request a segment a second time if received in response to first "discovery" Interest
186 m_nextSegmentNum++;
187 continue;
188 }
189 segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
190 }
191 else {
192 break;
193 }
194 availableWindowSize--;
195 }
196
197 for (const auto& segment : segmentsToRequest) {
198 Interest interest(origInterest); // to preserve Interest elements
199 interest.refreshNonce();
Eric Newberry2b765f82018-06-25 14:51:13 -0700200 interest.setCanBePrefix(false);
Eric Newberrye345baa2018-05-23 18:17:07 -0700201 interest.setMustBeFresh(false);
202
203 Name interestName(m_versionedDataName);
204 interestName.appendSegment(segment.first);
205 interest.setName(interestName);
206 interest.setInterestLifetime(m_options.interestLifetime);
207 m_nSegmentsInFlight++;
208 auto pendingInterest = m_face.expressInterest(interest,
209 bind(&SegmentFetcher::afterSegmentReceivedCb,
210 this, _1, _2, self),
211 bind(&SegmentFetcher::afterNackReceivedCb,
212 this, _1, _2, self),
213 nullptr);
214 auto timeoutEvent =
215 m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
216 bind(&SegmentFetcher::afterTimeoutCb, this, interest, self));
217 if (segment.second) { // Retransmission
218 updateRetransmittedSegment(segment.first, pendingInterest, timeoutEvent);
219 }
220 else { // First request for segment
221 BOOST_ASSERT(m_pendingSegments.count(segment.first) == 0);
222 m_pendingSegments.emplace(segment.first, PendingSegment{SegmentState::FirstInterest,
223 time::steady_clock::now(),
224 pendingInterest, timeoutEvent});
225 m_highInterest = segment.first;
226 }
227 }
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700228}
229
230void
Muktadir Chowdhury1c109b42018-01-10 08:36:00 +0000231SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest,
Eric Newberrye345baa2018-05-23 18:17:07 -0700232 const Data& data,
Muktadir Chowdhury1c109b42018-01-10 08:36:00 +0000233 shared_ptr<SegmentFetcher> self)
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700234{
Muktadir Chowdhury1c109b42018-01-10 08:36:00 +0000235 afterSegmentReceived(data);
Eric Newberrye345baa2018-05-23 18:17:07 -0700236 BOOST_ASSERT(m_nSegmentsInFlight > 0);
237 m_nSegmentsInFlight--;
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700238
Eric Newberrye345baa2018-05-23 18:17:07 -0700239 name::Component currentSegmentComponent = data.getName().get(-1);
240 if (!currentSegmentComponent.isSegment()) {
241 return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
242 }
243
244 uint64_t currentSegment = currentSegmentComponent.toSegment();
245
246 // The first received Interest could have any segment ID
247 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
248 if (m_receivedSegments.size() > 0) {
249 pendingSegmentIt = m_pendingSegments.find(currentSegment);
250 }
251 else {
252 pendingSegmentIt = m_pendingSegments.begin();
253 }
254
255 // Cancel timeout event
256 m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
257 pendingSegmentIt->second.timeoutEvent = nullptr;
258
259 m_validator.validate(data,
260 bind(&SegmentFetcher::afterValidationSuccess, this, _1, origInterest,
261 pendingSegmentIt, self),
262 bind(&SegmentFetcher::afterValidationFailure, this, _1, _2, self));
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500263}
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700264
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500265void
Alexander Afanasyev6dfeffe2017-01-30 22:40:32 -0800266SegmentFetcher::afterValidationSuccess(const Data& data,
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500267 const Interest& origInterest,
Eric Newberrye345baa2018-05-23 18:17:07 -0700268 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500269 shared_ptr<SegmentFetcher> self)
270{
Eric Newberrye345baa2018-05-23 18:17:07 -0700271 // We update the last receive time here instead of in the segment received callback so that the
272 // transfer will not fail to terminate if we only received invalid Data packets.
273 m_timeLastSegmentReceived = time::steady_clock::now();
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500274
Eric Newberrye345baa2018-05-23 18:17:07 -0700275 m_nReceived++;
276
277 // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
278 uint64_t currentSegment = data.getName().get(-1).toSegment();
279 // Add measurement to RTO estimator (if not retransmission)
280 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
281 m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
282 std::max<int64_t>(m_nSegmentsInFlight + 1, 1));
283 }
284
285 // Remove from pending segments map
286 m_pendingSegments.erase(pendingSegmentIt);
287
288 // Copy data in segment to temporary buffer
289 auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
290 std::forward_as_tuple(currentSegment),
291 std::forward_as_tuple(data.getContent().value_size()));
292 std::copy(data.getContent().value_begin(), data.getContent().value_end(),
293 receivedSegmentIt.first->second.begin());
294 m_nBytesReceived += data.getContent().value_size();
295 afterSegmentValidated(data);
296
297 if (data.getFinalBlock()) {
298 if (!data.getFinalBlock()->isSegment()) {
299 return signalError(FINALBLOCKID_NOT_SEGMENT,
300 "Received FinalBlockId did not contain a segment component");
301 }
302
303 if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
304 m_nSegments = data.getFinalBlock()->toSegment() + 1;
305 cancelExcessInFlightSegments();
306 }
307 }
308
309 if (m_receivedSegments.size() == 1) {
310 m_versionedDataName = data.getName().getPrefix(-1);
311 if (currentSegment == 0) {
312 // We received the first segment in response, so we can increment the next segment number
313 m_nextSegmentNum++;
314 }
315 }
316
317 if (m_highData < currentSegment) {
318 m_highData = currentSegment;
319 }
320
321 if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
322 windowDecrease();
323 }
324 else {
325 windowIncrease();
326 }
327
328 fetchSegmentsInWindow(origInterest, self);
329}
330
331void
332SegmentFetcher::afterValidationFailure(const Data& data,
333 const security::v2::ValidationError& error,
334 shared_ptr<SegmentFetcher> self)
335{
336 signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " +
337 boost::lexical_cast<std::string>(error));
338}
339
340
341void
342SegmentFetcher::afterNackReceivedCb(const Interest& origInterest,
343 const lp::Nack& nack,
344 shared_ptr<SegmentFetcher> self)
345{
346 afterSegmentNacked();
347 BOOST_ASSERT(m_nSegmentsInFlight > 0);
348 m_nSegmentsInFlight--;
349
350 switch (nack.getReason()) {
351 case lp::NackReason::DUPLICATE:
352 case lp::NackReason::CONGESTION:
353 afterNackOrTimeout(origInterest, self);
354 break;
355 default:
356 signalError(NACK_ERROR, "Nack Error");
357 break;
358 }
359}
360
361void
362SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
363 shared_ptr<SegmentFetcher> self)
364{
365 afterSegmentTimedOut();
366 BOOST_ASSERT(m_nSegmentsInFlight > 0);
367 m_nSegmentsInFlight--;
368 afterNackOrTimeout(origInterest, self);
369}
370
371void
372SegmentFetcher::afterNackOrTimeout(const Interest& origInterest, shared_ptr<SegmentFetcher> self)
373{
374 if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
375 // Fail transfer due to exceeding the maximum timeout between the succesful receipt of segments
376 return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
377 }
378
379 name::Component lastNameComponent = origInterest.getName().get(-1);
380 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
381 BOOST_ASSERT(m_pendingSegments.size() > 0);
382 if (lastNameComponent.isSegment()) {
383 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
384 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
385 }
386 else { // First Interest
387 BOOST_ASSERT(m_pendingSegments.size() > 0);
388 pendingSegmentIt = m_pendingSegments.begin();
389 }
390
391 // Cancel timeout event and set status to InRetxQueue
392 m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
393 pendingSegmentIt->second.timeoutEvent = nullptr;
394 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
395
396 m_rttEstimator.backoffRto();
397
398 if (m_receivedSegments.size() == 0) {
399 // Resend first Interest (until maximum receive timeout exceeded)
400 fetchFirstSegment(origInterest, true, self);
401 }
402 else {
403 windowDecrease();
404 m_retxQueue.push(pendingSegmentIt->first);
405 fetchSegmentsInWindow(origInterest, self);
406 }
407}
408
409void
410SegmentFetcher::finalizeFetch(shared_ptr<SegmentFetcher> self)
411{
412 // Combine segments into final buffer
413 OBufferStream buf;
414 // We may have received more segments than exist in the object.
415 BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
416
417 for (int64_t i = 0; i < m_nSegments; i++) {
418 buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
419 }
420
421 onComplete(buf.buf());
422}
423
424void
425SegmentFetcher::windowIncrease()
426{
427 if (m_options.useConstantCwnd) {
428 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
429 return;
430 }
431
432 if (m_cwnd < m_ssthresh) {
433 m_cwnd += m_options.aiStep; // additive increase
434 }
435 else {
436 m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
437 }
438}
439
440void
441SegmentFetcher::windowDecrease()
442{
443 if (m_options.disableCwa || m_highData > m_recPoint) {
444 m_recPoint = m_highInterest;
445
446 if (m_options.useConstantCwnd) {
447 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
448 return;
449 }
450
451 // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
452 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
453 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
454 }
455}
456
457void
458SegmentFetcher::signalError(uint32_t code, const std::string& msg)
459{
460 // Cancel all pending Interests before signaling error
461 for (const auto& pendingSegment : m_pendingSegments) {
462 m_face.removePendingInterest(pendingSegment.second.id);
463 if (pendingSegment.second.timeoutEvent) {
464 m_scheduler.cancelEvent(pendingSegment.second.timeoutEvent);
465 }
466 }
467 onError(code, msg);
468}
469
470void
471SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
472 const PendingInterestId* pendingInterest,
473 scheduler::EventId timeoutEvent)
474{
475 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
476 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
477 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
478 pendingSegmentIt->second.state = SegmentState::Retransmitted;
479 pendingSegmentIt->second.id = pendingInterest;
480 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
481}
482
483void
484SegmentFetcher::cancelExcessInFlightSegments()
485{
486 for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
487 if (it->first >= static_cast<uint64_t>(m_nSegments)) {
488 m_face.removePendingInterest(it->second.id);
489 if (it->second.timeoutEvent) {
490 m_scheduler.cancelEvent(it->second.timeoutEvent);
491 }
492 it = m_pendingSegments.erase(it);
493 BOOST_ASSERT(m_nSegmentsInFlight > 0);
494 m_nSegmentsInFlight--;
Alexander Afanasyevf3cfab52014-08-17 22:15:25 -0700495 }
496 else {
Eric Newberrye345baa2018-05-23 18:17:07 -0700497 ++it;
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500498 }
499 }
500}
501
Eric Newberrye345baa2018-05-23 18:17:07 -0700502bool
503SegmentFetcher::checkAllSegmentsReceived()
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500504{
Eric Newberrye345baa2018-05-23 18:17:07 -0700505 bool haveReceivedAllSegments = false;
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500506
Eric Newberrye345baa2018-05-23 18:17:07 -0700507 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
508 haveReceivedAllSegments = true;
509 // Verify that all segments in window have been received. If not, send Interests for missing segments.
510 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
511 if (m_receivedSegments.count(i) == 0) {
512 m_retxQueue.push(i);
513 haveReceivedAllSegments = false;
514 }
515 }
Muktadir R Chowdhury2bc2df02016-04-05 16:55:41 -0500516 }
517
Eric Newberrye345baa2018-05-23 18:17:07 -0700518 return haveReceivedAllSegments;
519}
520
521time::milliseconds
522SegmentFetcher::getEstimatedRto()
523{
524 // We don't want an Interest timeout greater than the maximum allowed timeout between the
525 // succesful receipt of segments
526 return std::min(m_options.maxTimeout,
527 time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
Muktadir R Chowdhuryf58f8f42015-09-02 11:56:49 -0500528}
529
530} // namespace util
531} // namespace ndn