blob: 5a11074fc62774aceba0b045132fbfa777651d77 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2013-2017, Regents of the University of California.
 *
 * This file is part of ChronoShare, a decentralized file sharing application over NDN.
 *
 * ChronoShare is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation, either
 * version 3 of the License, or (at your option) any later version.
 *
 * ChronoShare is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
 * PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received copies of the GNU General Public License along with
 * ChronoShare, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 *
 * See AUTHORS.md for complete list of ChronoShare authors and contributors.
 */
20
Lijing Wanga697cf22016-12-25 14:44:22 -080021#include "fetcher.hpp"
22#include "fetch-manager.hpp"
23#include "core/logging.hpp"
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -080024
Lijing Wanga697cf22016-12-25 14:44:22 -080025#include <boost/asio/io_service.hpp>
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080026#include <boost/date_time/posix_time/posix_time.hpp>
Lijing Wanga697cf22016-12-25 14:44:22 -080027
28namespace ndn {
29namespace chronoshare {
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080030
Alexander Afanasyev1cf5c432017-01-13 23:22:15 -080031_LOG_INIT(Fetcher);
Alexander Afanasyev678f3502013-01-23 17:09:37 -080032
Lijing Wanga697cf22016-12-25 14:44:22 -080033Fetcher::Fetcher(Face& face,
Alexander Afanasyevc91fc332017-02-10 17:44:41 -080034 bool isSegment,
Lijing Wanga697cf22016-12-25 14:44:22 -080035 const SegmentCallback& segmentCallback,
36 const FinishCallback& finishCallback,
37 const OnFetchCompleteCallback& onFetchComplete,
38 const OnFetchFailedCallback& onFetchFailed,
39 const Name& deviceName, const Name& name, int64_t minSeqNo, int64_t maxSeqNo,
40 time::milliseconds timeout, const Name& forwardingHint)
41 : m_face(face)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080042
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080043 , m_segmentCallback(segmentCallback)
44 , m_onFetchComplete(onFetchComplete)
45 , m_onFetchFailed(onFetchFailed)
46 , m_finishCallback(finishCallback)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080047
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080048 , m_active(false)
49 , m_timedwait(false)
50 , m_name(name)
51 , m_deviceName(deviceName)
52 , m_forwardingHint(forwardingHint)
53 , m_maximumNoActivityPeriod(timeout)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080054
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080055 , m_minSendSeqNo(minSeqNo - 1)
56 , m_maxInOrderRecvSeqNo(minSeqNo - 1)
Lijing Wanga697cf22016-12-25 14:44:22 -080057 // , m_minSeqNo(minSeqNo)
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080058 , m_maxSeqNo(maxSeqNo)
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080059
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080060 , m_pipeline(6) // initial "congestion window"
61 , m_activePipeline(0)
Lijing Wanga697cf22016-12-25 14:44:22 -080062
63 , m_slowStart(false)
64 , m_threshold(32767) // TODO make these values dynamic
65 , m_roundCount(32767)
66
67 , m_retryPause(time::seconds::zero())
68 , m_nextScheduledRetry(time::steady_clock::now())
69
70 , m_ioService(m_face.getIoService())
Alexander Afanasyevc91fc332017-02-10 17:44:41 -080071 , m_isSegment(isSegment)
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080072{
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080073}
74
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080075Fetcher::~Fetcher()
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080076{
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080077}
Alexander Afanasyev83531a42013-01-19 16:21:54 -080078
79void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080080Fetcher::RestartPipeline()
Alexander Afanasyev83531a42013-01-19 16:21:54 -080081{
82 m_active = true;
Alexander Afanasyev50547892013-01-19 22:03:45 -080083 m_minSendSeqNo = m_maxInOrderRecvSeqNo;
Alexander Afanasyev650ba282013-01-20 20:14:06 -080084 // cout << "Restart: " << m_minSendSeqNo << endl;
Lijing Wanga697cf22016-12-25 14:44:22 -080085 m_lastPositiveActivity = time::steady_clock::now();
Alexander Afanasyev50547892013-01-19 22:03:45 -080086
Lijing Wanga697cf22016-12-25 14:44:22 -080087 m_ioService.post(bind(&Fetcher::FillPipeline, this));
Alexander Afanasyev50547892013-01-19 22:03:45 -080088}
89
90void
Lijing Wanga697cf22016-12-25 14:44:22 -080091Fetcher::SetForwardingHint(const Name& forwardingHint)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080092{
93 m_forwardingHint = forwardingHint;
94}
95
96void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080097Fetcher::FillPipeline()
Alexander Afanasyev50547892013-01-19 22:03:45 -080098{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080099 for (; m_minSendSeqNo < m_maxSeqNo && m_activePipeline < m_pipeline; m_minSendSeqNo++) {
Lijing Wanga697cf22016-12-25 14:44:22 -0800100 std::unique_lock<std::mutex> lock(m_seqNoMutex);
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -0800101
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800102 if (m_outOfOrderRecvSeqNo.find(m_minSendSeqNo + 1) != m_outOfOrderRecvSeqNo.end())
103 continue;
Alexander Afanasyev650ba282013-01-20 20:14:06 -0800104
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800105 if (m_inActivePipeline.find(m_minSendSeqNo + 1) != m_inActivePipeline.end())
106 continue;
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800107
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800108 m_inActivePipeline.insert(m_minSendSeqNo + 1);
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800109
Lijing Wanga697cf22016-12-25 14:44:22 -0800110 _LOG_DEBUG(
111 " >>> i " << Name(m_forwardingHint).append(m_name) << ", seq = " << (m_minSendSeqNo + 1));
Alexander Afanasyev678f3502013-01-23 17:09:37 -0800112
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800113 // cout << ">>> " << m_minSendSeqNo+1 << endl;
Lijing Wanga697cf22016-12-25 14:44:22 -0800114
Alexander Afanasyevc91fc332017-02-10 17:44:41 -0800115 Name name = Name(m_forwardingHint).append(m_name);
116 if (m_isSegment) {
117 name.appendSegment(m_minSendSeqNo + 1);
118 }
119 else {
120 name.appendNumber(m_minSendSeqNo + 1);
121 }
122 Interest interest(name);
123 interest.setInterestLifetime(time::seconds(1)); // Alex: this lifetime should be changed to RTO
124 _LOG_DEBUG("interest: " << interest);
Lijing Wanga697cf22016-12-25 14:44:22 -0800125 m_face.expressInterest(interest,
126 bind(&Fetcher::OnData, this, m_minSendSeqNo + 1, _1, _2),
127 bind(&Fetcher::OnTimeout, this, m_minSendSeqNo + 1, _1));
128
Alexander Afanasyevc91fc332017-02-10 17:44:41 -0800129 _LOG_TRACE(" >>> i ok");
Alexander Afanasyev50547892013-01-19 22:03:45 -0800130
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800131 m_activePipeline++;
132 }
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800133}
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800134void
Lijing Wanga697cf22016-12-25 14:44:22 -0800135Fetcher::OnData(uint64_t seqno, const Interest& interest, Data& data)
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800136{
Lijing Wanga697cf22016-12-25 14:44:22 -0800137 const Name& name = data.getName();
138 _LOG_DEBUG(" <<< d " << name.getSubName(0, name.size() - 1) << ", seq = " << seqno);
Alexander Afanasyevddc283c2013-01-28 23:11:20 -0800139
Lijing Wanga697cf22016-12-25 14:44:22 -0800140 shared_ptr<Data> pco = make_shared<Data>(data.wireEncode());
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800141
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800142 if (m_forwardingHint == Name()) {
Zhenkai Zhud5d99be2013-03-13 19:15:56 -0700143 // TODO: check verified!!!!
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800144 if (true) {
Lijing Wanga697cf22016-12-25 14:44:22 -0800145 if (m_segmentCallback != nullptr) {
146 m_segmentCallback(m_deviceName, m_name, seqno, pco);
Alexander Afanasyev28ca3ed2013-01-24 23:17:15 -0800147 }
Zhenkai Zhu5957c0d2013-02-08 13:47:52 -0800148 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800149 else {
Lijing Wanga697cf22016-12-25 14:44:22 -0800150 _LOG_ERROR("Can not verify signature content. Name = " << data.getName());
Zhenkai Zhu5957c0d2013-02-08 13:47:52 -0800151 // probably needs to do more in the future
152 }
Zhenkai Zhu3d1beca2013-01-23 14:55:32 -0800153 // we don't have to tell FetchManager about this
154 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800155 else {
156 // in this case we don't care whether "data" is verified, in fact, we expect it is unverified
Zhenkai Zhu79264a42013-02-07 21:49:42 -0800157
Lijing Wanga697cf22016-12-25 14:44:22 -0800158 // we need to verify this pco and apply callback only when verified
159 // TODO: check verified !!!
160 if (true) {
161 if (m_segmentCallback != nullptr) {
162 m_segmentCallback(m_deviceName, m_name, seqno, pco);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800163 }
Alexander Afanasyev66f4c492013-01-20 23:32:50 -0800164 }
Lijing Wanga697cf22016-12-25 14:44:22 -0800165 else {
166 _LOG_ERROR("Can not verify signature content. Name = " << pco->getName());
167 // probably needs to do more in the future
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800168 }
169 }
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800170
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800171 m_activePipeline--;
Lijing Wanga697cf22016-12-25 14:44:22 -0800172 m_lastPositiveActivity = time::steady_clock::now();
Alexander Afanasyev50547892013-01-19 22:03:45 -0800173
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700174 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800175 std::unique_lock<std::mutex> lock(m_seqNoMutex);
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700176 if(m_slowStart){
177 m_pipeline++;
178 if(m_pipeline == m_threshold)
179 m_slowStart = false;
180 }
181 else{
182 m_roundCount++;
183 _LOG_DEBUG ("roundCount: " << m_roundCount);
184 if(m_roundCount == m_pipeline){
185 m_pipeline++;
186 m_roundCount = 0;
187 }
188 }
189 }
190
Lijing Wanga697cf22016-12-25 14:44:22 -0800191 _LOG_DEBUG ("slowStart: " << std::boolalpha << m_slowStart << " pipeline: " << m_pipeline << " threshold: " << m_threshold);
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700192
193
Alexander Afanasyev50547892013-01-19 22:03:45 -0800194 ////////////////////////////////////////////////////////////////////////////
Lijing Wanga697cf22016-12-25 14:44:22 -0800195 std::unique_lock<std::mutex> lock(m_seqNoMutex);
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -0800196
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800197 m_outOfOrderRecvSeqNo.insert(seqno);
198 m_inActivePipeline.erase(seqno);
199 _LOG_DEBUG("Total segments received: " << m_outOfOrderRecvSeqNo.size());
Lijing Wanga697cf22016-12-25 14:44:22 -0800200 std::set<int64_t>::iterator inOrderSeqNo = m_outOfOrderRecvSeqNo.begin();
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800201 for (; inOrderSeqNo != m_outOfOrderRecvSeqNo.end(); inOrderSeqNo++) {
202 _LOG_TRACE("Checking " << *inOrderSeqNo << " and " << m_maxInOrderRecvSeqNo + 1);
203 if (*inOrderSeqNo == m_maxInOrderRecvSeqNo + 1) {
204 m_maxInOrderRecvSeqNo = *inOrderSeqNo;
Alexander Afanasyev50547892013-01-19 22:03:45 -0800205 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800206 else if (*inOrderSeqNo < m_maxInOrderRecvSeqNo + 1) // not possible anymore, but just in case
207 {
208 continue;
209 }
210 else
211 break;
212 }
213 m_outOfOrderRecvSeqNo.erase(m_outOfOrderRecvSeqNo.begin(), inOrderSeqNo);
Alexander Afanasyev50547892013-01-19 22:03:45 -0800214 ////////////////////////////////////////////////////////////////////////////
215
Lijing Wanga697cf22016-12-25 14:44:22 -0800216 _LOG_TRACE("Max in order received: " << m_maxInOrderRecvSeqNo << ", max seqNo to request: " << m_maxSeqNo);
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800217
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800218 if (m_maxInOrderRecvSeqNo == m_maxSeqNo) {
219 _LOG_TRACE("Fetch finished: " << m_name);
220 m_active = false;
221 // invoke callback
Lijing Wanga697cf22016-12-25 14:44:22 -0800222 if (m_finishCallback != nullptr) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800223 _LOG_TRACE("Notifying callback");
224 m_finishCallback(m_deviceName, m_name);
225 }
Alexander Afanasyev28ca3ed2013-01-24 23:17:15 -0800226
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800227 // tell FetchManager that we have finish our job
Lijing Wanga697cf22016-12-25 14:44:22 -0800228 // m_onFetchComplete(*this);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800229 // using executor, so we won't be deleted if there is scheduled FillPipeline call
Lijing Wanga697cf22016-12-25 14:44:22 -0800230 if (m_onFetchComplete != nullptr) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800231 m_timedwait = true;
Lijing Wanga697cf22016-12-25 14:44:22 -0800232 m_ioService.post(bind(m_onFetchComplete, std::ref(*this), m_deviceName, m_name));
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800233 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800234 }
235 else {
Lijing Wanga697cf22016-12-25 14:44:22 -0800236 m_ioService.post(bind(&Fetcher::FillPipeline, this));
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800237 }
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800238}
239
Zhenkai Zhuff4fa8a2013-01-28 22:02:40 -0800240void
Lijing Wanga697cf22016-12-25 14:44:22 -0800241Fetcher::OnTimeout(uint64_t seqno, const Interest& interest)
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800242{
Lijing Wanga697cf22016-12-25 14:44:22 -0800243 const Name name = interest.getName();
244 _LOG_DEBUG(" <<< :( timeout " << name.getSubName(0, name.size() - 1) << ", seq = " << seqno);
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800245
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800246 // cout << "Fetcher::OnTimeout: " << name << endl;
247 // cout << "Last: " << m_lastPositiveActivity << ", config: " << m_maximumNoActivityPeriod
248 // << ", now: " << date_time::second_clock<boost::posix_time::ptime>::universal_time()
Lijing Wanga697cf22016-12-25 14:44:22 -0800249 // << ", oldest: " <<(date_time::second_clock<boost::posix_time::ptime>::universal_time() -
250 // m_maximumNoActivityPeriod) << endl;
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800251
Lijing Wanga697cf22016-12-25 14:44:22 -0800252 if (m_lastPositiveActivity <
253 (time::steady_clock::now() - m_maximumNoActivityPeriod)) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800254 bool done = false;
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800255 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800256 std::unique_lock<std::mutex> lock(m_seqNoMutex);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800257 m_inActivePipeline.erase(seqno);
258 m_activePipeline--;
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -0800259
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800260 if (m_activePipeline == 0) {
261 done = true;
262 }
263 }
264
265 if (done) {
266 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800267 std::unique_lock<std::mutex> lock(m_seqNoMutex);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800268 _LOG_DEBUG("Telling that fetch failed");
269 _LOG_DEBUG("Active pipeline size should be zero: " << m_inActivePipeline.size());
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -0800270 }
271
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800272 m_active = false;
Lijing Wanga697cf22016-12-25 14:44:22 -0800273 if (m_onFetchFailed != nullptr) {
274 m_onFetchFailed(std::ref(*this));
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700275 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800276 // this is not valid anymore, but we still should be able finish work
Alexander Afanasyevff8d9dc2013-01-26 00:45:08 -0800277 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800278 }
279 else {
280 _LOG_DEBUG("Asking to reexpress seqno: " << seqno);
Lijing Wanga697cf22016-12-25 14:44:22 -0800281 m_face.expressInterest(interest,
282 bind(&Fetcher::OnData, this, seqno, _1, _2), // TODO: correct?
283 bind(&Fetcher::OnTimeout, this, seqno, _1)); // TODO: correct?
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800284 }
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800285}
Lijing Wanga697cf22016-12-25 14:44:22 -0800286
287} // namespace chronoshare
288} // namespace ndn