blob: d8dd6a6a4dea7d53d2104412db6938cd924130b9 [file] [log] [blame]
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyev1cf5c432017-01-13 23:22:15 -08003 * Copyright (c) 2013-2017, Regents of the University of California.
Alexander Afanasyev8811b352013-01-02 12:51:15 -08004 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08005 * This file is part of ChronoShare, a decentralized file sharing application over NDN.
Alexander Afanasyev8811b352013-01-02 12:51:15 -08006 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08007 * ChronoShare is free software: you can redistribute it and/or modify it under the terms
8 * of the GNU General Public License as published by the Free Software Foundation, either
9 * version 3 of the License, or (at your option) any later version.
Alexander Afanasyev8811b352013-01-02 12:51:15 -080010 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080011 * ChronoShare is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU General Public License for more details.
Alexander Afanasyev8811b352013-01-02 12:51:15 -080014 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080015 * You should have received copies of the GNU General Public License along with
16 * ChronoShare, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * See AUTHORS.md for complete list of ChronoShare authors and contributors.
Alexander Afanasyev8811b352013-01-02 12:51:15 -080019 */
20
Alexander Afanasyevf4cde4e2016-12-25 13:42:57 -080021#include "fetch-manager.hpp"
Lijing Wanga697cf22016-12-25 14:44:22 -080022#include "core/logging.hpp"
Alexander Afanasyev548d38d2013-01-26 16:36:06 -080023
Lijing Wanga697cf22016-12-25 14:44:22 -080024#include <ndn-cxx/face.hpp>
25
26namespace ndn {
27namespace chronoshare {
Alexander Afanasyevfc720362013-01-24 21:49:48 -080028
Alexander Afanasyev1cf5c432017-01-13 23:22:15 -080029_LOG_INIT(FetchManager);
Alexander Afanasyev49a18522013-01-18 17:49:04 -080030
Lijing Wanga697cf22016-12-25 14:44:22 -080031// The disposer object function
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080032struct fetcher_disposer
33{
34 void
35 operator()(Fetcher* delete_this)
36 {
37 delete delete_this;
38 }
39};
Alexander Afanasyev83531a42013-01-19 16:21:54 -080040
Lijing Wanga697cf22016-12-25 14:44:22 -080041FetchManager::FetchManager(Face& face, const Mapping& mapping, const Name& broadcastForwardingHint,
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080042 uint32_t parallelFetches, // = 3
43 const SegmentCallback& defaultSegmentCallback,
Lijing Wanga697cf22016-12-25 14:44:22 -080044 const FinishCallback& defaultFinishCallback, const FetchTaskDbPtr& taskDb)
45 : m_face(face)
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080046 , m_mapping(mapping)
47 , m_maxParallelFetches(parallelFetches)
48 , m_currentParallelFetches(0)
Lijing Wanga697cf22016-12-25 14:44:22 -080049 , m_scheduler(m_face.getIoService())
50 , m_scheduledFetchesEvent(m_scheduler)
Zhenkai Zhua0147382013-01-29 15:57:27 -080051 , m_defaultSegmentCallback(defaultSegmentCallback)
52 , m_defaultFinishCallback(defaultFinishCallback)
Zhenkai Zhuda686882013-01-29 22:32:24 -080053 , m_taskDb(taskDb)
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080054 , m_broadcastHint(broadcastForwardingHint)
Lijing Wanga697cf22016-12-25 14:44:22 -080055 , m_ioService(m_face.getIoService())
Alexander Afanasyev8811b352013-01-02 12:51:15 -080056{
Lijing Wanga697cf22016-12-25 14:44:22 -080057 // no need to check to often. if needed, will be rescheduled
58 m_scheduledFetchesEvent =
59 m_scheduler.scheduleEvent(time::seconds(300), bind(&FetchManager::ScheduleFetches, this));
Alexander Afanasyev548d38d2013-01-26 16:36:06 -080060
Zhenkai Zhuda686882013-01-29 22:32:24 -080061 // resume un-finished fetches if there is any
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080062 if (m_taskDb) {
Lijing Wanga697cf22016-12-25 14:44:22 -080063 m_taskDb->foreachTask(
64 [this](const Name& deviceName, const Name& baseName, uint64_t minSeqNo, uint64_t maxSeqNo,
65 int priority) { this->Enqueue(deviceName, baseName, minSeqNo, maxSeqNo, priority); });
Zhenkai Zhuda686882013-01-29 22:32:24 -080066 }
Alexander Afanasyev49a18522013-01-18 17:49:04 -080067}
Alexander Afanasyeva199f972013-01-02 19:37:26 -080068
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080069FetchManager::~FetchManager()
Alexander Afanasyev49a18522013-01-18 17:49:04 -080070{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080071 m_fetchList.clear_and_dispose(fetcher_disposer());
Alexander Afanasyev49a18522013-01-18 17:49:04 -080072}
Alexander Afanasyev8811b352013-01-02 12:51:15 -080073
Zhenkai Zhua0147382013-01-29 15:57:27 -080074// Enqueue using default callbacks
75void
Lijing Wanga697cf22016-12-25 14:44:22 -080076FetchManager::Enqueue(const Name& deviceName, const Name& baseName, uint64_t minSeqNo,
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080077 uint64_t maxSeqNo, int priority)
Zhenkai Zhua0147382013-01-29 15:57:27 -080078{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080079 Enqueue(deviceName, baseName, m_defaultSegmentCallback, m_defaultFinishCallback, minSeqNo,
80 maxSeqNo, priority);
Zhenkai Zhua0147382013-01-29 15:57:27 -080081}
82
Alexander Afanasyev49a18522013-01-18 17:49:04 -080083void
Lijing Wanga697cf22016-12-25 14:44:22 -080084FetchManager::Enqueue(const Name& deviceName, const Name& baseName,
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080085 const SegmentCallback& segmentCallback, const FinishCallback& finishCallback,
86 uint64_t minSeqNo, uint64_t maxSeqNo, int priority /*PRIORITY_NORMAL*/)
Alexander Afanasyev49a18522013-01-18 17:49:04 -080087{
Zhenkai Zhu454bae22013-01-24 19:27:54 -080088 // Assumption for the following code is minSeqNo <= maxSeqNo
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080089 if (minSeqNo > maxSeqNo) {
Zhenkai Zhu454bae22013-01-24 19:27:54 -080090 return;
91 }
92
Alexander Afanasyev83531a42013-01-19 16:21:54 -080093 // we may need to guarantee that LookupLocator will gives an answer and not throw exception...
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080094 Name forwardingHint;
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080095 forwardingHint = m_mapping(deviceName);
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080096
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080097 if (m_taskDb) {
98 m_taskDb->addTask(deviceName, baseName, minSeqNo, maxSeqNo, priority);
99 }
Zhenkai Zhuda686882013-01-29 22:32:24 -0800100
Lijing Wanga697cf22016-12-25 14:44:22 -0800101 std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
Alexander Afanasyevbc8bf232013-01-29 11:19:17 -0800102
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800103 _LOG_TRACE("++++ Create fetcher: " << baseName);
104 Fetcher* fetcher =
Lijing Wanga697cf22016-12-25 14:44:22 -0800105 new Fetcher(m_face, segmentCallback, finishCallback,
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800106 bind(&FetchManager::DidFetchComplete, this, _1, _2, _3),
107 bind(&FetchManager::DidNoDataTimeout, this, _1), deviceName, baseName, minSeqNo,
Lijing Wanga697cf22016-12-25 14:44:22 -0800108 maxSeqNo, time::seconds(30), forwardingHint);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800109
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800110 switch (priority) {
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800111 case PRIORITY_HIGH:
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800112 _LOG_TRACE("++++ Push front fetcher: " << fetcher->GetName());
113 m_fetchList.push_front(*fetcher);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800114 break;
115
116 case PRIORITY_NORMAL:
117 default:
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800118 _LOG_TRACE("++++ Push back fetcher: " << fetcher->GetName());
119 m_fetchList.push_back(*fetcher);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800120 break;
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800121 }
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800122
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800123 _LOG_DEBUG("++++ Reschedule fetcher task");
Lijing Wanga697cf22016-12-25 14:44:22 -0800124 m_scheduledFetchesEvent =
125 m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
Alexander Afanasyev49a18522013-01-18 17:49:04 -0800126}
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800127
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800128void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800129FetchManager::ScheduleFetches()
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800130{
Lijing Wanga697cf22016-12-25 14:44:22 -0800131 std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800132
Lijing Wanga697cf22016-12-25 14:44:22 -0800133 auto currentTime = time::steady_clock::now();
134 auto nextSheduleCheck = currentTime + time::seconds(300); // no reason to have anything, but just in case
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800135
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800136 for (FetchList::iterator item = m_fetchList.begin();
137 m_currentParallelFetches < m_maxParallelFetches && item != m_fetchList.end();
138 item++) {
139 if (item->IsActive()) {
140 _LOG_DEBUG("Item is active");
141 continue;
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800142 }
Alexander Afanasyevf9756232013-01-28 16:42:20 -0800143
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800144 if (item->IsTimedWait()) {
145 _LOG_DEBUG("Item is in timed-wait");
146 continue;
147 }
148
149 if (currentTime < item->GetNextScheduledRetry()) {
Lijing Wanga697cf22016-12-25 14:44:22 -0800150 if (item->GetNextScheduledRetry() < nextSheduleCheck) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800151 nextSheduleCheck = item->GetNextScheduledRetry();
Lijing Wanga697cf22016-12-25 14:44:22 -0800152 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800153
154 _LOG_DEBUG("Item is delayed");
155 continue;
156 }
157
158 _LOG_DEBUG("Start fetching of " << item->GetName());
159
160 m_currentParallelFetches++;
161 _LOG_TRACE("++++ RESTART PIPELINE: " << item->GetName());
162 item->RestartPipeline();
163 }
164
Lijing Wanga697cf22016-12-25 14:44:22 -0800165 m_scheduledFetchesEvent = m_scheduler.scheduleEvent(nextSheduleCheck - currentTime,
166 bind(&FetchManager::ScheduleFetches, this));
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800167}
168
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800169void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800170FetchManager::DidNoDataTimeout(Fetcher& fetcher)
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800171{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800172 _LOG_DEBUG("No data timeout for " << fetcher.GetName() << " with forwarding hint: "
173 << fetcher.GetForwardingHint());
Alexander Afanasyevff8d9dc2013-01-26 00:45:08 -0800174
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800175 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800176 std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800177 m_currentParallelFetches--;
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800178 // no need to do anything with the m_fetchList
179 }
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800180
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800181 if (fetcher.GetForwardingHint().size() == 0) {
182 // will be tried initially and again after empty forwarding hint
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800183
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800184 /// @todo Handle potential exception
185 Name forwardingHint;
186 forwardingHint = m_mapping(fetcher.GetDeviceName());
Alexander Afanasyev84894d32013-02-08 22:27:12 -0800187
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800188 if (forwardingHint.size() == 0) {
189 // make sure that we will try broadcast forwarding hint eventually
190 fetcher.SetForwardingHint(m_broadcastHint);
Alexander Afanasyev84894d32013-02-08 22:27:12 -0800191 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800192 else {
193 fetcher.SetForwardingHint(forwardingHint);
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800194 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800195 }
196 else if (fetcher.GetForwardingHint() == m_broadcastHint) {
197 // will be tried after broadcast forwarding hint
198 fetcher.SetForwardingHint(Name("/"));
199 }
200 else {
201 // will be tried after normal forwarding hint
202 fetcher.SetForwardingHint(m_broadcastHint);
203 }
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800204
Lijing Wanga697cf22016-12-25 14:44:22 -0800205 time::seconds delay = fetcher.GetRetryPause();
206 if (delay < time::seconds(1)) // first time
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800207 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800208 delay = time::seconds(1);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800209 }
210 else {
Lijing Wanga697cf22016-12-25 14:44:22 -0800211 delay = std::min(2 * delay, time::seconds(300)); // 5 minutes max
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800212 }
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800213
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800214 fetcher.SetRetryPause(delay);
Lijing Wanga697cf22016-12-25 14:44:22 -0800215 fetcher.SetNextScheduledRetry(time::steady_clock::now() + time::seconds(delay));
Alexander Afanasyevf9756232013-01-28 16:42:20 -0800216
Lijing Wanga697cf22016-12-25 14:44:22 -0800217 m_scheduledFetchesEvent = m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800218}
219
220void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800221FetchManager::DidFetchComplete(Fetcher& fetcher, const Name& deviceName, const Name& baseName)
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800222{
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800223 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800224 std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800225 m_currentParallelFetches--;
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -0800226
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800227 if (m_taskDb) {
228 m_taskDb->deleteTask(deviceName, baseName);
229 }
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800230 }
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800231
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800232 // like TCP timed-wait
Lijing Wanga697cf22016-12-25 14:44:22 -0800233 m_scheduler.scheduleEvent(time::seconds(10), bind(&FetchManager::TimedWait, this, ref(fetcher)));
234 m_scheduledFetchesEvent = m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800235}
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800236
237void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800238FetchManager::TimedWait(Fetcher& fetcher)
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800239{
Lijing Wanga697cf22016-12-25 14:44:22 -0800240 std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800241 _LOG_TRACE("+++++ removing fetcher: " << fetcher.GetName());
242 m_fetchList.erase_and_dispose(FetchList::s_iterator_to(fetcher), fetcher_disposer());
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800243}
Lijing Wanga697cf22016-12-25 14:44:22 -0800244
245} // namespace chronoshare
246} // namespace ndn