blob: f7e428f4145abff1a5b5280d5946921a66532774 [file] [log] [blame]
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
Alexander Afanasyev1cf5c432017-01-13 23:22:15 -08003 * Copyright (c) 2013-2017, Regents of the University of California.
Alexander Afanasyev8811b352013-01-02 12:51:15 -08004 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08005 * This file is part of ChronoShare, a decentralized file sharing application over NDN.
Alexander Afanasyev8811b352013-01-02 12:51:15 -08006 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08007 * ChronoShare is free software: you can redistribute it and/or modify it under the terms
8 * of the GNU General Public License as published by the Free Software Foundation, either
9 * version 3 of the License, or (at your option) any later version.
Alexander Afanasyev8811b352013-01-02 12:51:15 -080010 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080011 * ChronoShare is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU General Public License for more details.
Alexander Afanasyev8811b352013-01-02 12:51:15 -080014 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080015 * You should have received copies of the GNU General Public License along with
16 * ChronoShare, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * See AUTHORS.md for complete list of ChronoShare authors and contributors.
Alexander Afanasyev8811b352013-01-02 12:51:15 -080019 */
20
Alexander Afanasyevf4cde4e2016-12-25 13:42:57 -080021#include "fetch-manager.hpp"
Lijing Wanga697cf22016-12-25 14:44:22 -080022#include "core/logging.hpp"
Alexander Afanasyev548d38d2013-01-26 16:36:06 -080023
Lijing Wanga697cf22016-12-25 14:44:22 -080024#include <ndn-cxx/face.hpp>
25
26namespace ndn {
27namespace chronoshare {
Alexander Afanasyevfc720362013-01-24 21:49:48 -080028
Alexander Afanasyev1cf5c432017-01-13 23:22:15 -080029_LOG_INIT(FetchManager);
Alexander Afanasyev49a18522013-01-18 17:49:04 -080030
Lijing Wanga697cf22016-12-25 14:44:22 -080031// The disposer object function
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080032struct fetcher_disposer
33{
34 void
35 operator()(Fetcher* delete_this)
36 {
37 delete delete_this;
38 }
39};
Alexander Afanasyev83531a42013-01-19 16:21:54 -080040
Lijing Wanga697cf22016-12-25 14:44:22 -080041FetchManager::FetchManager(Face& face, const Mapping& mapping, const Name& broadcastForwardingHint,
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080042 uint32_t parallelFetches, // = 3
Alexander Afanasyevc91fc332017-02-10 17:44:41 -080043 bool isSegment,
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080044 const SegmentCallback& defaultSegmentCallback,
Lijing Wanga697cf22016-12-25 14:44:22 -080045 const FinishCallback& defaultFinishCallback, const FetchTaskDbPtr& taskDb)
46 : m_face(face)
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080047 , m_mapping(mapping)
48 , m_maxParallelFetches(parallelFetches)
49 , m_currentParallelFetches(0)
Lijing Wanga697cf22016-12-25 14:44:22 -080050 , m_scheduler(m_face.getIoService())
51 , m_scheduledFetchesEvent(m_scheduler)
Zhenkai Zhua0147382013-01-29 15:57:27 -080052 , m_defaultSegmentCallback(defaultSegmentCallback)
53 , m_defaultFinishCallback(defaultFinishCallback)
Zhenkai Zhuda686882013-01-29 22:32:24 -080054 , m_taskDb(taskDb)
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080055 , m_broadcastHint(broadcastForwardingHint)
Lijing Wanga697cf22016-12-25 14:44:22 -080056 , m_ioService(m_face.getIoService())
Alexander Afanasyevc91fc332017-02-10 17:44:41 -080057 , m_isSegment(isSegment)
Alexander Afanasyev8811b352013-01-02 12:51:15 -080058{
Lijing Wanga697cf22016-12-25 14:44:22 -080059 // no need to check to often. if needed, will be rescheduled
60 m_scheduledFetchesEvent =
61 m_scheduler.scheduleEvent(time::seconds(300), bind(&FetchManager::ScheduleFetches, this));
Alexander Afanasyev548d38d2013-01-26 16:36:06 -080062
Zhenkai Zhuda686882013-01-29 22:32:24 -080063 // resume un-finished fetches if there is any
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080064 if (m_taskDb) {
Lijing Wanga697cf22016-12-25 14:44:22 -080065 m_taskDb->foreachTask(
66 [this](const Name& deviceName, const Name& baseName, uint64_t minSeqNo, uint64_t maxSeqNo,
67 int priority) { this->Enqueue(deviceName, baseName, minSeqNo, maxSeqNo, priority); });
Zhenkai Zhuda686882013-01-29 22:32:24 -080068 }
Alexander Afanasyev49a18522013-01-18 17:49:04 -080069}
Alexander Afanasyeva199f972013-01-02 19:37:26 -080070
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080071FetchManager::~FetchManager()
Alexander Afanasyev49a18522013-01-18 17:49:04 -080072{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080073 m_fetchList.clear_and_dispose(fetcher_disposer());
Alexander Afanasyev49a18522013-01-18 17:49:04 -080074}
Alexander Afanasyev8811b352013-01-02 12:51:15 -080075
Zhenkai Zhua0147382013-01-29 15:57:27 -080076// Enqueue using default callbacks
77void
Lijing Wanga697cf22016-12-25 14:44:22 -080078FetchManager::Enqueue(const Name& deviceName, const Name& baseName, uint64_t minSeqNo,
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080079 uint64_t maxSeqNo, int priority)
Zhenkai Zhua0147382013-01-29 15:57:27 -080080{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080081 Enqueue(deviceName, baseName, m_defaultSegmentCallback, m_defaultFinishCallback, minSeqNo,
82 maxSeqNo, priority);
Zhenkai Zhua0147382013-01-29 15:57:27 -080083}
84
Alexander Afanasyev49a18522013-01-18 17:49:04 -080085void
Lijing Wanga697cf22016-12-25 14:44:22 -080086FetchManager::Enqueue(const Name& deviceName, const Name& baseName,
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080087 const SegmentCallback& segmentCallback, const FinishCallback& finishCallback,
88 uint64_t minSeqNo, uint64_t maxSeqNo, int priority /*PRIORITY_NORMAL*/)
Alexander Afanasyev49a18522013-01-18 17:49:04 -080089{
Zhenkai Zhu454bae22013-01-24 19:27:54 -080090 // Assumption for the following code is minSeqNo <= maxSeqNo
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080091 if (minSeqNo > maxSeqNo) {
Zhenkai Zhu454bae22013-01-24 19:27:54 -080092 return;
93 }
94
Alexander Afanasyev83531a42013-01-19 16:21:54 -080095 // we may need to guarantee that LookupLocator will gives an answer and not throw exception...
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080096 Name forwardingHint;
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080097 forwardingHint = m_mapping(deviceName);
Alexander Afanasyev21a166e2013-01-20 16:04:41 -080098
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080099 if (m_taskDb) {
100 m_taskDb->addTask(deviceName, baseName, minSeqNo, maxSeqNo, priority);
101 }
Zhenkai Zhuda686882013-01-29 22:32:24 -0800102
Lijing Wanga697cf22016-12-25 14:44:22 -0800103 std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
Alexander Afanasyevbc8bf232013-01-29 11:19:17 -0800104
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800105 _LOG_TRACE("++++ Create fetcher: " << baseName);
106 Fetcher* fetcher =
Alexander Afanasyevc91fc332017-02-10 17:44:41 -0800107 new Fetcher(m_face, m_isSegment, segmentCallback, finishCallback,
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800108 bind(&FetchManager::DidFetchComplete, this, _1, _2, _3),
109 bind(&FetchManager::DidNoDataTimeout, this, _1), deviceName, baseName, minSeqNo,
Lijing Wanga697cf22016-12-25 14:44:22 -0800110 maxSeqNo, time::seconds(30), forwardingHint);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800111
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800112 switch (priority) {
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800113 case PRIORITY_HIGH:
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800114 _LOG_TRACE("++++ Push front fetcher: " << fetcher->GetName());
115 m_fetchList.push_front(*fetcher);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800116 break;
117
118 case PRIORITY_NORMAL:
119 default:
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800120 _LOG_TRACE("++++ Push back fetcher: " << fetcher->GetName());
121 m_fetchList.push_back(*fetcher);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800122 break;
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800123 }
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800124
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800125 _LOG_DEBUG("++++ Reschedule fetcher task");
Lijing Wanga697cf22016-12-25 14:44:22 -0800126 m_scheduledFetchesEvent =
127 m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
Alexander Afanasyev49a18522013-01-18 17:49:04 -0800128}
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800129
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800130void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800131FetchManager::ScheduleFetches()
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800132{
Lijing Wanga697cf22016-12-25 14:44:22 -0800133 std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800134
Lijing Wanga697cf22016-12-25 14:44:22 -0800135 auto currentTime = time::steady_clock::now();
136 auto nextSheduleCheck = currentTime + time::seconds(300); // no reason to have anything, but just in case
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800137
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800138 for (FetchList::iterator item = m_fetchList.begin();
139 m_currentParallelFetches < m_maxParallelFetches && item != m_fetchList.end();
140 item++) {
141 if (item->IsActive()) {
142 _LOG_DEBUG("Item is active");
143 continue;
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800144 }
Alexander Afanasyevf9756232013-01-28 16:42:20 -0800145
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800146 if (item->IsTimedWait()) {
147 _LOG_DEBUG("Item is in timed-wait");
148 continue;
149 }
150
151 if (currentTime < item->GetNextScheduledRetry()) {
Lijing Wanga697cf22016-12-25 14:44:22 -0800152 if (item->GetNextScheduledRetry() < nextSheduleCheck) {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800153 nextSheduleCheck = item->GetNextScheduledRetry();
Lijing Wanga697cf22016-12-25 14:44:22 -0800154 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800155
156 _LOG_DEBUG("Item is delayed");
157 continue;
158 }
159
160 _LOG_DEBUG("Start fetching of " << item->GetName());
161
162 m_currentParallelFetches++;
163 _LOG_TRACE("++++ RESTART PIPELINE: " << item->GetName());
164 item->RestartPipeline();
165 }
166
Lijing Wanga697cf22016-12-25 14:44:22 -0800167 m_scheduledFetchesEvent = m_scheduler.scheduleEvent(nextSheduleCheck - currentTime,
168 bind(&FetchManager::ScheduleFetches, this));
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800169}
170
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800171void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800172FetchManager::DidNoDataTimeout(Fetcher& fetcher)
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800173{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800174 _LOG_DEBUG("No data timeout for " << fetcher.GetName() << " with forwarding hint: "
175 << fetcher.GetForwardingHint());
Alexander Afanasyevff8d9dc2013-01-26 00:45:08 -0800176
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800177 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800178 std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800179 m_currentParallelFetches--;
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800180 // no need to do anything with the m_fetchList
181 }
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800182
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800183 if (fetcher.GetForwardingHint().size() == 0) {
184 // will be tried initially and again after empty forwarding hint
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800185
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800186 /// @todo Handle potential exception
187 Name forwardingHint;
188 forwardingHint = m_mapping(fetcher.GetDeviceName());
Alexander Afanasyev84894d32013-02-08 22:27:12 -0800189
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800190 if (forwardingHint.size() == 0) {
191 // make sure that we will try broadcast forwarding hint eventually
192 fetcher.SetForwardingHint(m_broadcastHint);
Alexander Afanasyev84894d32013-02-08 22:27:12 -0800193 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800194 else {
195 fetcher.SetForwardingHint(forwardingHint);
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800196 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800197 }
198 else if (fetcher.GetForwardingHint() == m_broadcastHint) {
199 // will be tried after broadcast forwarding hint
200 fetcher.SetForwardingHint(Name("/"));
201 }
202 else {
203 // will be tried after normal forwarding hint
204 fetcher.SetForwardingHint(m_broadcastHint);
205 }
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800206
Lijing Wanga697cf22016-12-25 14:44:22 -0800207 time::seconds delay = fetcher.GetRetryPause();
208 if (delay < time::seconds(1)) // first time
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800209 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800210 delay = time::seconds(1);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800211 }
212 else {
Lijing Wanga697cf22016-12-25 14:44:22 -0800213 delay = std::min(2 * delay, time::seconds(300)); // 5 minutes max
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800214 }
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800215
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800216 fetcher.SetRetryPause(delay);
Lijing Wanga697cf22016-12-25 14:44:22 -0800217 fetcher.SetNextScheduledRetry(time::steady_clock::now() + time::seconds(delay));
Alexander Afanasyevf9756232013-01-28 16:42:20 -0800218
Lijing Wanga697cf22016-12-25 14:44:22 -0800219 m_scheduledFetchesEvent = m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800220}
221
222void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800223FetchManager::DidFetchComplete(Fetcher& fetcher, const Name& deviceName, const Name& baseName)
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800224{
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800225 {
Lijing Wanga697cf22016-12-25 14:44:22 -0800226 std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800227 m_currentParallelFetches--;
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -0800228
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800229 if (m_taskDb) {
230 m_taskDb->deleteTask(deviceName, baseName);
231 }
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800232 }
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800233
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800234 // like TCP timed-wait
Lijing Wanga697cf22016-12-25 14:44:22 -0800235 m_scheduler.scheduleEvent(time::seconds(10), bind(&FetchManager::TimedWait, this, ref(fetcher)));
236 m_scheduledFetchesEvent = m_scheduler.scheduleEvent(time::seconds(0), bind(&FetchManager::ScheduleFetches, this));
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800237}
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800238
239void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800240FetchManager::TimedWait(Fetcher& fetcher)
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800241{
Lijing Wanga697cf22016-12-25 14:44:22 -0800242 std::unique_lock<std::mutex> lock(m_parellelFetchMutex);
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800243 _LOG_TRACE("+++++ removing fetcher: " << fetcher.GetName());
244 m_fetchList.erase_and_dispose(FetchList::s_iterator_to(fetcher), fetcher_disposer());
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800245}
Lijing Wanga697cf22016-12-25 14:44:22 -0800246
247} // namespace chronoshare
248} // namespace ndn