blob: b07c9593c248d050fabad1c84b6f7069d746c2fa [file] [log] [blame]
Andrea Tosatto672b9a72016-01-05 16:18:20 +01001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2016, Regents of the University of California,
4 * Colorado State University,
5 * University Pierre & Marie Curie, Sorbonne University.
6 *
7 * This file is part of ndn-tools (Named Data Networking Essential Tools).
8 * See AUTHORS.md for complete list of ndn-tools authors and contributors.
9 *
10 * ndn-tools is free software: you can redistribute it and/or modify it under the terms
11 * of the GNU General Public License as published by the Free Software Foundation,
12 * either version 3 of the License, or (at your option) any later version.
13 *
14 * ndn-tools is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
15 * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
16 * PURPOSE. See the GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License along with
19 * ndn-tools, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
20 *
21 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22 *
23 * @author Wentao Shang
24 * @author Steve DiBenedetto
25 * @author Andrea Tosatto
26 */
27
28#include "pipeline-interests.hpp"
29#include "data-fetcher.hpp"
30
31namespace ndn {
32namespace chunks {
33
34PipelineInterests::PipelineInterests(Face& face, const Options& options)
35 : m_face(face)
36 , m_nextSegmentNo(0)
37 , m_lastSegmentNo(0)
38 , m_excludeSegmentNo(0)
39 , m_options(options)
40 , m_hasFinalBlockId(false)
41 , m_hasError(false)
42 , m_hasFailure(false)
43{
44 m_segmentFetchers.resize(m_options.maxPipelineSize);
45}
46
47PipelineInterests::~PipelineInterests()
48{
49 cancel();
50}
51
52void
53PipelineInterests::runWithExcludedSegment(const Data& data, DataCallback onData,
54 FailureCallback onFailure)
55{
56 BOOST_ASSERT(onData != nullptr);
57 m_onData = std::move(onData);
58 m_onFailure = std::move(onFailure);
59
60 Name dataName = data.getName();
61 m_prefix = dataName.getPrefix(-1);
62 m_excludeSegmentNo = dataName[-1].toSegment();
63
64 if (!data.getFinalBlockId().empty()) {
65 m_hasFinalBlockId = true;
66 m_lastSegmentNo = data.getFinalBlockId().toSegment();
67 }
68
69 // if the FinalBlockId is unknown, this could potentially request non-existent segments
70 for (size_t nRequestedSegments = 0; nRequestedSegments < m_options.maxPipelineSize;
71 nRequestedSegments++) {
72 if (!fetchNextSegment(nRequestedSegments)) // all segments have been requested
73 break;
74 }
75}
76
77bool
78PipelineInterests::fetchNextSegment(std::size_t pipeNo)
79{
80 if (m_hasFailure) {
81 fail("Fetching terminated but no final segment number has been found");
82 return false;
83 }
84
85 if (m_nextSegmentNo == m_excludeSegmentNo)
86 m_nextSegmentNo++;
87
88 if (m_hasFinalBlockId && m_nextSegmentNo > m_lastSegmentNo)
89 return false;
90
91 // Send interest for next segment
92 if (m_options.isVerbose)
93 std::cerr << "Requesting segment #" << m_nextSegmentNo << std::endl;
94
95 Interest interest(Name(m_prefix).appendSegment(m_nextSegmentNo));
96 interest.setInterestLifetime(m_options.interestLifetime);
97 interest.setMustBeFresh(m_options.mustBeFresh);
98 interest.setMaxSuffixComponents(1);
99
100 BOOST_ASSERT(!m_segmentFetchers[pipeNo].first || !m_segmentFetchers[pipeNo].first->isRunning());
101
102 auto fetcher = DataFetcher::fetch(m_face, interest,
103 m_options.maxRetriesOnTimeoutOrNack,
104 m_options.maxRetriesOnTimeoutOrNack,
105 bind(&PipelineInterests::handleData, this, _1, _2, pipeNo),
106 bind(&PipelineInterests::handleFail, this, _2, pipeNo),
107 bind(&PipelineInterests::handleFail, this, _2, pipeNo),
108 m_options.isVerbose);
109
110 m_segmentFetchers[pipeNo] = make_pair(fetcher, m_nextSegmentNo);
111
112 m_nextSegmentNo++;
113 return true;
114}
115
116void
117PipelineInterests::cancel()
118{
119 for (auto& fetcher : m_segmentFetchers)
120 if (fetcher.first)
121 fetcher.first->cancel();
122
123 m_segmentFetchers.clear();
124}
125
126void
127PipelineInterests::fail(const std::string& reason)
128{
129 if (!m_hasError) {
130 cancel();
131 m_hasError = true;
132 m_hasFailure = true;
133 if (m_onFailure)
134 m_face.getIoService().post([this, reason] { m_onFailure(reason); });
135 }
136}
137
138void
139PipelineInterests::handleData(const Interest& interest, const Data& data, size_t pipeNo)
140{
141 if (m_hasError)
142 return;
143
144 BOOST_ASSERT(data.getName().equals(interest.getName()));
145
146 if (m_options.isVerbose)
147 std::cerr << "Received segment #" << data.getName()[-1].toSegment() << std::endl;
148
149 m_onData(interest, data);
150
151 if (!m_hasFinalBlockId && !data.getFinalBlockId().empty()) {
152 m_lastSegmentNo = data.getFinalBlockId().toSegment();
153 m_hasFinalBlockId = true;
154
155 for (auto& fetcher : m_segmentFetchers) {
156 if (fetcher.first && fetcher.second > m_lastSegmentNo) {
157 // Stop trying to fetch segments that are not part of the content
158 fetcher.first->cancel();
159 }
160 else if (fetcher.first && fetcher.first->hasError()) { // fetcher.second <= m_lastSegmentNo
161 // there was an error while fetching a segment that is part of the content
162 fail("Failure retriving segment #" + to_string(fetcher.second));
163 return;
164 }
165 }
166 }
167
168 fetchNextSegment(pipeNo);
169}
170
171void PipelineInterests::handleFail(const std::string& reason, std::size_t pipeNo)
172{
173 if (m_hasError)
174 return;
175
176 if (m_hasFinalBlockId && m_segmentFetchers[pipeNo].second <= m_lastSegmentNo) {
177 fail(reason);
178 }
179 else if (!m_hasFinalBlockId) {
180 // don't fetch the following segments
181 bool areAllFetchersStopped = true;
182 for (auto& fetcher : m_segmentFetchers) {
183 if (fetcher.first && fetcher.second > m_segmentFetchers[pipeNo].second) {
184 fetcher.first->cancel();
185 }
186 else if (fetcher.first && fetcher.first->isRunning()) {
187 // fetcher.second <= m_segmentFetchers[pipeNo].second
188 areAllFetchersStopped = false;
189 }
190 }
191 if (areAllFetchersStopped) {
192 if (m_onFailure)
193 fail("Fetching terminated but no final segment number has been found");
194 }
195 else {
196 m_hasFailure = true;
197 }
198 }
199}
200
201} // namespace chunks
202} // namespace ndn