blob: 56899de0f46b9a5f517ae1d9f21e86ea030897d5 [file] [log] [blame]
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
 * Copyright (c) 2013-2017, Regents of the University of California.
 *
 * This file is part of ChronoShare, a decentralized file sharing application over NDN.
 *
 * ChronoShare is free software: you can redistribute it and/or modify it under the terms
 * of the GNU General Public License as published by the Free Software Foundation, either
 * version 3 of the License, or (at your option) any later version.
 *
 * ChronoShare is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
 * PARTICULAR PURPOSE. See the GNU General Public License for more details.
 *
 * You should have received copies of the GNU General Public License along with
 * ChronoShare, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
 *
 * See AUTHORS.md for complete list of ChronoShare authors and contributors.
 */
20
Lijing Wanga697cf22016-12-25 14:44:22 -080021#include "fetcher.hpp"
22#include "fetch-manager.hpp"
23#include "core/logging.hpp"
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -080024
Lijing Wanga697cf22016-12-25 14:44:22 -080025#include <boost/asio/io_service.hpp>
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080026#include <boost/date_time/posix_time/posix_time.hpp>
Lijing Wanga697cf22016-12-25 14:44:22 -080027
28namespace ndn {
29namespace chronoshare {
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080030
Alexander Afanasyev1cf5c432017-01-13 23:22:15 -080031_LOG_INIT(Fetcher);
Alexander Afanasyev678f3502013-01-23 17:09:37 -080032
Lijing Wanga697cf22016-12-25 14:44:22 -080033Fetcher::Fetcher(Face& face,
34 const SegmentCallback& segmentCallback,
35 const FinishCallback& finishCallback,
36 const OnFetchCompleteCallback& onFetchComplete,
37 const OnFetchFailedCallback& onFetchFailed,
38 const Name& deviceName, const Name& name, int64_t minSeqNo, int64_t maxSeqNo,
39 time::milliseconds timeout, const Name& forwardingHint)
40 : m_face(face)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080041
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080042 , m_segmentCallback(segmentCallback)
43 , m_onFetchComplete(onFetchComplete)
44 , m_onFetchFailed(onFetchFailed)
45 , m_finishCallback(finishCallback)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080046
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080047 , m_active(false)
48 , m_timedwait(false)
49 , m_name(name)
50 , m_deviceName(deviceName)
51 , m_forwardingHint(forwardingHint)
52 , m_maximumNoActivityPeriod(timeout)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080053
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080054 , m_minSendSeqNo(minSeqNo - 1)
55 , m_maxInOrderRecvSeqNo(minSeqNo - 1)
Lijing Wanga697cf22016-12-25 14:44:22 -080056 // , m_minSeqNo(minSeqNo)
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080057 , m_maxSeqNo(maxSeqNo)
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080058
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080059 , m_pipeline(6) // initial "congestion window"
60 , m_activePipeline(0)
Lijing Wanga697cf22016-12-25 14:44:22 -080061
62 , m_slowStart(false)
63 , m_threshold(32767) // TODO make these values dynamic
64 , m_roundCount(32767)
65
66 , m_retryPause(time::seconds::zero())
67 , m_nextScheduledRetry(time::steady_clock::now())
68
69 , m_ioService(m_face.getIoService())
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080070{
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080071}
72
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080073Fetcher::~Fetcher()
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080074{
Alexander Afanasyevdfe58192013-01-17 17:34:04 -080075}
Alexander Afanasyev83531a42013-01-19 16:21:54 -080076
77void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080078Fetcher::RestartPipeline()
Alexander Afanasyev83531a42013-01-19 16:21:54 -080079{
80 m_active = true;
Alexander Afanasyev50547892013-01-19 22:03:45 -080081 m_minSendSeqNo = m_maxInOrderRecvSeqNo;
Alexander Afanasyev650ba282013-01-20 20:14:06 -080082 // cout << "Restart: " << m_minSendSeqNo << endl;
Lijing Wanga697cf22016-12-25 14:44:22 -080083 m_lastPositiveActivity = time::steady_clock::now();
Alexander Afanasyev50547892013-01-19 22:03:45 -080084
Lijing Wanga697cf22016-12-25 14:44:22 -080085 m_ioService.post(bind(&Fetcher::FillPipeline, this));
Alexander Afanasyev50547892013-01-19 22:03:45 -080086}
87
88void
Lijing Wanga697cf22016-12-25 14:44:22 -080089Fetcher::SetForwardingHint(const Name& forwardingHint)
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080090{
91 m_forwardingHint = forwardingHint;
92}
93
94void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080095Fetcher::FillPipeline()
Alexander Afanasyev50547892013-01-19 22:03:45 -080096{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080097 for (; m_minSendSeqNo < m_maxSeqNo && m_activePipeline < m_pipeline; m_minSendSeqNo++) {
Lijing Wanga697cf22016-12-25 14:44:22 -080098 std::unique_lock<std::mutex> lock(m_seqNoMutex);
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -080099
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800100 if (m_outOfOrderRecvSeqNo.find(m_minSendSeqNo + 1) != m_outOfOrderRecvSeqNo.end())
101 continue;
Alexander Afanasyev650ba282013-01-20 20:14:06 -0800102
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800103 if (m_inActivePipeline.find(m_minSendSeqNo + 1) != m_inActivePipeline.end())
104 continue;
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800105
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800106 m_inActivePipeline.insert(m_minSendSeqNo + 1);
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800107
Lijing Wanga697cf22016-12-25 14:44:22 -0800108 _LOG_DEBUG(
109 " >>> i " << Name(m_forwardingHint).append(m_name) << ", seq = " << (m_minSendSeqNo + 1));
Alexander Afanasyev678f3502013-01-23 17:09:37 -0800110
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800111 // cout << ">>> " << m_minSendSeqNo+1 << endl;
Lijing Wanga697cf22016-12-25 14:44:22 -0800112
113 Interest interest(
114 Name(m_forwardingHint).append(m_name).appendNumber(m_minSendSeqNo + 1)); // Alex: this lifetime should be changed to RTO
115 _LOG_DEBUG("interest Name: " << interest);
116 interest.setInterestLifetime(time::seconds(1));
117 m_face.expressInterest(interest,
118 bind(&Fetcher::OnData, this, m_minSendSeqNo + 1, _1, _2),
119 bind(&Fetcher::OnTimeout, this, m_minSendSeqNo + 1, _1));
120
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800121 _LOG_DEBUG(" >>> i ok");
Alexander Afanasyev50547892013-01-19 22:03:45 -0800122
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800123 m_activePipeline++;
124 }
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800125}
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800126void
Lijing Wanga697cf22016-12-25 14:44:22 -0800127Fetcher::OnData(uint64_t seqno, const Interest& interest, Data& data)
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800128{
Lijing Wanga697cf22016-12-25 14:44:22 -0800129 const Name& name = data.getName();
130 _LOG_DEBUG(" <<< d " << name.getSubName(0, name.size() - 1) << ", seq = " << seqno);
Alexander Afanasyevddc283c2013-01-28 23:11:20 -0800131
Lijing Wanga697cf22016-12-25 14:44:22 -0800132 shared_ptr<Data> pco = make_shared<Data>(data.wireEncode());
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800133
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800134 if (m_forwardingHint == Name()) {
Zhenkai Zhud5d99be2013-03-13 19:15:56 -0700135 // TODO: check verified!!!!
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800136 if (true) {
Lijing Wanga697cf22016-12-25 14:44:22 -0800137 if (m_segmentCallback != nullptr) {
138 m_segmentCallback(m_deviceName, m_name, seqno, pco);
Alexander Afanasyev28ca3ed2013-01-24 23:17:15 -0800139 }
Zhenkai Zhu5957c0d2013-02-08 13:47:52 -0800140 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800141 else {
Lijing Wanga697cf22016-12-25 14:44:22 -0800142 _LOG_ERROR("Can not verify signature content. Name = " << data.getName());
Zhenkai Zhu5957c0d2013-02-08 13:47:52 -0800143 // probably needs to do more in the future
144 }
Zhenkai Zhu3d1beca2013-01-23 14:55:32 -0800145 // we don't have to tell FetchManager about this
146 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800147 else {
148 // in this case we don't care whether "data" is verified, in fact, we expect it is unverified
Zhenkai Zhu79264a42013-02-07 21:49:42 -0800149
Lijing Wanga697cf22016-12-25 14:44:22 -0800150 // we need to verify this pco and apply callback only when verified
151 // TODO: check verified !!!
152 if (true) {
153 if (m_segmentCallback != nullptr) {
154 m_segmentCallback(m_deviceName, m_name, seqno, pco);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800155 }
Alexander Afanasyev66f4c492013-01-20 23:32:50 -0800156 }
Lijing Wanga697cf22016-12-25 14:44:22 -0800157 else {
158 _LOG_ERROR("Can not verify signature content. Name = " << pco->getName());
159 // probably needs to do more in the future
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800160 }
161 }
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800162
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800163 m_activePipeline--;
Lijing Wanga697cf22016-12-25 14:44:22 -0800164 m_lastPositiveActivity = time::steady_clock::now();
Alexander Afanasyev50547892013-01-19 22:03:45 -0800165
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700166 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800167 std::unique_lock<std::mutex> lock(m_seqNoMutex);
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700168 if(m_slowStart){
169 m_pipeline++;
170 if(m_pipeline == m_threshold)
171 m_slowStart = false;
172 }
173 else{
174 m_roundCount++;
175 _LOG_DEBUG ("roundCount: " << m_roundCount);
176 if(m_roundCount == m_pipeline){
177 m_pipeline++;
178 m_roundCount = 0;
179 }
180 }
181 }
182
Lijing Wanga697cf22016-12-25 14:44:22 -0800183 _LOG_DEBUG ("slowStart: " << std::boolalpha << m_slowStart << " pipeline: " << m_pipeline << " threshold: " << m_threshold);
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700184
185
Alexander Afanasyev50547892013-01-19 22:03:45 -0800186 ////////////////////////////////////////////////////////////////////////////
Lijing Wanga697cf22016-12-25 14:44:22 -0800187 std::unique_lock<std::mutex> lock(m_seqNoMutex);
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -0800188
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800189 m_outOfOrderRecvSeqNo.insert(seqno);
190 m_inActivePipeline.erase(seqno);
191 _LOG_DEBUG("Total segments received: " << m_outOfOrderRecvSeqNo.size());
Lijing Wanga697cf22016-12-25 14:44:22 -0800192 std::set<int64_t>::iterator inOrderSeqNo = m_outOfOrderRecvSeqNo.begin();
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800193 for (; inOrderSeqNo != m_outOfOrderRecvSeqNo.end(); inOrderSeqNo++) {
194 _LOG_TRACE("Checking " << *inOrderSeqNo << " and " << m_maxInOrderRecvSeqNo + 1);
195 if (*inOrderSeqNo == m_maxInOrderRecvSeqNo + 1) {
196 m_maxInOrderRecvSeqNo = *inOrderSeqNo;
Alexander Afanasyev50547892013-01-19 22:03:45 -0800197 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800198 else if (*inOrderSeqNo < m_maxInOrderRecvSeqNo + 1) // not possible anymore, but just in case
199 {
200 continue;
201 }
202 else
203 break;
204 }
205 m_outOfOrderRecvSeqNo.erase(m_outOfOrderRecvSeqNo.begin(), inOrderSeqNo);
Alexander Afanasyev50547892013-01-19 22:03:45 -0800206 ////////////////////////////////////////////////////////////////////////////
207
Lijing Wanga697cf22016-12-25 14:44:22 -0800208 _LOG_TRACE("Max in order received: " << m_maxInOrderRecvSeqNo << ", max seqNo to request: " << m_maxSeqNo);
Alexander Afanasyevcfd5dfc2013-01-28 22:21:31 -0800209
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800210 if (m_maxInOrderRecvSeqNo == m_maxSeqNo) {
211 _LOG_TRACE("Fetch finished: " << m_name);
212 m_active = false;
213 // invoke callback
Lijing Wanga697cf22016-12-25 14:44:22 -0800214 if (m_finishCallback != nullptr) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800215 _LOG_TRACE("Notifying callback");
216 m_finishCallback(m_deviceName, m_name);
217 }
Alexander Afanasyev28ca3ed2013-01-24 23:17:15 -0800218
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800219 // tell FetchManager that we have finish our job
Lijing Wanga697cf22016-12-25 14:44:22 -0800220 // m_onFetchComplete(*this);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800221 // using executor, so we won't be deleted if there is scheduled FillPipeline call
Lijing Wanga697cf22016-12-25 14:44:22 -0800222 if (m_onFetchComplete != nullptr) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800223 m_timedwait = true;
Lijing Wanga697cf22016-12-25 14:44:22 -0800224 m_ioService.post(bind(m_onFetchComplete, std::ref(*this), m_deviceName, m_name));
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800225 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800226 }
227 else {
Lijing Wanga697cf22016-12-25 14:44:22 -0800228 m_ioService.post(bind(&Fetcher::FillPipeline, this));
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800229 }
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800230}
231
Zhenkai Zhuff4fa8a2013-01-28 22:02:40 -0800232void
Lijing Wanga697cf22016-12-25 14:44:22 -0800233Fetcher::OnTimeout(uint64_t seqno, const Interest& interest)
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800234{
Lijing Wanga697cf22016-12-25 14:44:22 -0800235 const Name name = interest.getName();
236 _LOG_DEBUG(" <<< :( timeout " << name.getSubName(0, name.size() - 1) << ", seq = " << seqno);
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800237
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800238 // cout << "Fetcher::OnTimeout: " << name << endl;
239 // cout << "Last: " << m_lastPositiveActivity << ", config: " << m_maximumNoActivityPeriod
240 // << ", now: " << date_time::second_clock<boost::posix_time::ptime>::universal_time()
Lijing Wanga697cf22016-12-25 14:44:22 -0800241 // << ", oldest: " <<(date_time::second_clock<boost::posix_time::ptime>::universal_time() -
242 // m_maximumNoActivityPeriod) << endl;
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800243
Lijing Wanga697cf22016-12-25 14:44:22 -0800244 if (m_lastPositiveActivity <
245 (time::steady_clock::now() - m_maximumNoActivityPeriod)) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800246 bool done = false;
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800247 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800248 std::unique_lock<std::mutex> lock(m_seqNoMutex);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800249 m_inActivePipeline.erase(seqno);
250 m_activePipeline--;
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -0800251
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800252 if (m_activePipeline == 0) {
253 done = true;
254 }
255 }
256
257 if (done) {
258 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800259 std::unique_lock<std::mutex> lock(m_seqNoMutex);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800260 _LOG_DEBUG("Telling that fetch failed");
261 _LOG_DEBUG("Active pipeline size should be zero: " << m_inActivePipeline.size());
Alexander Afanasyev2ec2c382013-01-29 20:09:00 -0800262 }
263
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800264 m_active = false;
Lijing Wanga697cf22016-12-25 14:44:22 -0800265 if (m_onFetchFailed != nullptr) {
266 m_onFetchFailed(std::ref(*this));
Yingdi Yuf0b3de32013-07-11 12:59:00 -0700267 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800268 // this is not valid anymore, but we still should be able finish work
Alexander Afanasyevff8d9dc2013-01-26 00:45:08 -0800269 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800270 }
271 else {
272 _LOG_DEBUG("Asking to reexpress seqno: " << seqno);
Lijing Wanga697cf22016-12-25 14:44:22 -0800273 m_face.expressInterest(interest,
274 bind(&Fetcher::OnData, this, seqno, _1, _2), // TODO: correct?
275 bind(&Fetcher::OnTimeout, this, seqno, _1)); // TODO: correct?
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800276 }
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800277}
Lijing Wanga697cf22016-12-25 14:44:22 -0800278
279} // namespace chronoshare
280} // namespace ndn