blob: 27f8d6df37387eaf155845ce36926c6b502ba8ac [file] [log] [blame]
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2/**
3 * Copyright (c) 2013-2016, Regents of the University of California.
Alexander Afanasyev8811b352013-01-02 12:51:15 -08004 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08005 * This file is part of ChronoShare, a decentralized file sharing application over NDN.
Alexander Afanasyev8811b352013-01-02 12:51:15 -08006 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -08007 * ChronoShare is free software: you can redistribute it and/or modify it under the terms
8 * of the GNU General Public License as published by the Free Software Foundation, either
9 * version 3 of the License, or (at your option) any later version.
Alexander Afanasyev8811b352013-01-02 12:51:15 -080010 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080011 * ChronoShare is distributed in the hope that it will be useful, but WITHOUT ANY
12 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
13 * PARTICULAR PURPOSE. See the GNU General Public License for more details.
Alexander Afanasyev8811b352013-01-02 12:51:15 -080014 *
Alexander Afanasyevfa2f6622016-12-25 12:28:00 -080015 * You should have received copies of the GNU General Public License along with
16 * ChronoShare, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
17 *
18 * See AUTHORS.md for complete list of ChronoShare authors and contributors.
Alexander Afanasyev8811b352013-01-02 12:51:15 -080019 */
20
Alexander Afanasyevf4cde4e2016-12-25 13:42:57 -080021#include "fetch-manager.hpp"
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080022#include <boost/lexical_cast.hpp>
Alexander Afanasyev49a18522013-01-18 17:49:04 -080023#include <boost/make_shared.hpp>
24#include <boost/ref.hpp>
25#include <boost/throw_exception.hpp>
Alexander Afanasyev548d38d2013-01-26 16:36:06 -080026
Alexander Afanasyevf4cde4e2016-12-25 13:42:57 -080027#include "logging.hpp"
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080028#include "simple-interval-generator.hpp"
Alexander Afanasyevfc720362013-01-24 21:49:48 -080029
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080030INIT_LOGGER("FetchManager");
Alexander Afanasyev49a18522013-01-18 17:49:04 -080031
32using namespace boost;
33using namespace std;
Alexander Afanasyev1dd37ed2013-08-14 18:08:09 -070034using namespace Ndnx;
Alexander Afanasyev49a18522013-01-18 17:49:04 -080035
Alexander Afanasyev83531a42013-01-19 16:21:54 -080036//The disposer object function
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080037struct fetcher_disposer
38{
39 void
40 operator()(Fetcher* delete_this)
41 {
42 delete delete_this;
43 }
44};
Alexander Afanasyev83531a42013-01-19 16:21:54 -080045
Alexander Afanasyev548d38d2013-01-26 16:36:06 -080046static const string SCHEDULE_FETCHES_TAG = "ScheduleFetches";
47
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080048FetchManager::FetchManager(Ccnx::CcnxWrapperPtr ccnx,
49 const Mapping& mapping,
50 const Name& broadcastForwardingHint,
51 uint32_t parallelFetches, // = 3
52 const SegmentCallback& defaultSegmentCallback,
53 const FinishCallback& defaultFinishCallback,
54 const FetchTaskDbPtr& taskDb)
55 : m_ccnx(ccnx)
56 , m_mapping(mapping)
57 , m_maxParallelFetches(parallelFetches)
58 , m_currentParallelFetches(0)
59 , m_scheduler(new Scheduler)
60 , m_executor(new Executor(1))
Zhenkai Zhua0147382013-01-29 15:57:27 -080061 , m_defaultSegmentCallback(defaultSegmentCallback)
62 , m_defaultFinishCallback(defaultFinishCallback)
Zhenkai Zhuda686882013-01-29 22:32:24 -080063 , m_taskDb(taskDb)
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080064 , m_broadcastHint(broadcastForwardingHint)
Alexander Afanasyev8811b352013-01-02 12:51:15 -080065{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080066 m_scheduler->start();
Zhenkai Zhuab9215c2013-01-28 23:42:28 -080067 m_executor->start();
Alexander Afanasyev548d38d2013-01-26 16:36:06 -080068
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080069 m_scheduleFetchesTask =
70 Scheduler::schedulePeriodicTask(m_scheduler,
71 make_shared<SimpleIntervalGenerator>(
72 300), // no need to check to often. if needed, will be rescheduled
73 bind(&FetchManager::ScheduleFetches, this),
74 SCHEDULE_FETCHES_TAG);
Zhenkai Zhuda686882013-01-29 22:32:24 -080075 // resume un-finished fetches if there is any
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080076 if (m_taskDb) {
Zhenkai Zhuda686882013-01-29 22:32:24 -080077 m_taskDb->foreachTask(bind(&FetchManager::Enqueue, this, _1, _2, _3, _4, _5));
78 }
Alexander Afanasyev49a18522013-01-18 17:49:04 -080079}
Alexander Afanasyeva199f972013-01-02 19:37:26 -080080
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080081FetchManager::~FetchManager()
Alexander Afanasyev49a18522013-01-18 17:49:04 -080082{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080083 m_scheduler->shutdown();
Zhenkai Zhuab9215c2013-01-28 23:42:28 -080084 m_executor->shutdown();
85
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080086 m_ccnx.reset();
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -080087
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080088 m_fetchList.clear_and_dispose(fetcher_disposer());
Alexander Afanasyev49a18522013-01-18 17:49:04 -080089}
Alexander Afanasyev8811b352013-01-02 12:51:15 -080090
Zhenkai Zhua0147382013-01-29 15:57:27 -080091// Enqueue using default callbacks
92void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080093FetchManager::Enqueue(const Ccnx::Name& deviceName, const Ccnx::Name& baseName, uint64_t minSeqNo,
94 uint64_t maxSeqNo, int priority)
Zhenkai Zhua0147382013-01-29 15:57:27 -080095{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -080096 Enqueue(deviceName, baseName, m_defaultSegmentCallback, m_defaultFinishCallback, minSeqNo,
97 maxSeqNo, priority);
Zhenkai Zhua0147382013-01-29 15:57:27 -080098}
99
Alexander Afanasyev49a18522013-01-18 17:49:04 -0800100void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800101FetchManager::Enqueue(const Ccnx::Name& deviceName, const Ccnx::Name& baseName,
102 const SegmentCallback& segmentCallback, const FinishCallback& finishCallback,
103 uint64_t minSeqNo, uint64_t maxSeqNo, int priority /*PRIORITY_NORMAL*/)
Alexander Afanasyev49a18522013-01-18 17:49:04 -0800104{
Zhenkai Zhu454bae22013-01-24 19:27:54 -0800105 // Assumption for the following code is minSeqNo <= maxSeqNo
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800106 if (minSeqNo > maxSeqNo) {
Zhenkai Zhu454bae22013-01-24 19:27:54 -0800107 return;
108 }
109
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800110 // we may need to guarantee that LookupLocator will gives an answer and not throw exception...
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800111 Name forwardingHint;
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800112 forwardingHint = m_mapping(deviceName);
Alexander Afanasyev21a166e2013-01-20 16:04:41 -0800113
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800114 if (m_taskDb) {
115 m_taskDb->addTask(deviceName, baseName, minSeqNo, maxSeqNo, priority);
116 }
Zhenkai Zhuda686882013-01-29 22:32:24 -0800117
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800118 unique_lock<mutex> lock(m_parellelFetchMutex);
Alexander Afanasyevbc8bf232013-01-29 11:19:17 -0800119
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800120 _LOG_TRACE("++++ Create fetcher: " << baseName);
121 Fetcher* fetcher =
122 new Fetcher(m_ccnx, m_executor, segmentCallback, finishCallback,
123 bind(&FetchManager::DidFetchComplete, this, _1, _2, _3),
124 bind(&FetchManager::DidNoDataTimeout, this, _1), deviceName, baseName, minSeqNo,
125 maxSeqNo, boost::posix_time::seconds(30), forwardingHint);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800126
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800127 switch (priority) {
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800128 case PRIORITY_HIGH:
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800129 _LOG_TRACE("++++ Push front fetcher: " << fetcher->GetName());
130 m_fetchList.push_front(*fetcher);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800131 break;
132
133 case PRIORITY_NORMAL:
134 default:
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800135 _LOG_TRACE("++++ Push back fetcher: " << fetcher->GetName());
136 m_fetchList.push_back(*fetcher);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800137 break;
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800138 }
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800139
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800140 _LOG_DEBUG("++++ Reschedule fetcher task");
141 m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800142 // ScheduleFetches (); // will start a fetch if m_currentParallelFetches is less than max, otherwise does nothing
Alexander Afanasyev49a18522013-01-18 17:49:04 -0800143}
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800144
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800145void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800146FetchManager::ScheduleFetches()
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800147{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800148 unique_lock<mutex> lock(m_parellelFetchMutex);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800149
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800150 boost::posix_time::ptime currentTime =
151 date_time::second_clock<boost::posix_time::ptime>::universal_time();
152 boost::posix_time::ptime nextSheduleCheck =
153 currentTime + posix_time::seconds(300); // no reason to have anything, but just in case
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800154
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800155 for (FetchList::iterator item = m_fetchList.begin();
156 m_currentParallelFetches < m_maxParallelFetches && item != m_fetchList.end();
157 item++) {
158 if (item->IsActive()) {
159 _LOG_DEBUG("Item is active");
160 continue;
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800161 }
Alexander Afanasyevf9756232013-01-28 16:42:20 -0800162
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800163 if (item->IsTimedWait()) {
164 _LOG_DEBUG("Item is in timed-wait");
165 continue;
166 }
167
168 if (currentTime < item->GetNextScheduledRetry()) {
169 if (item->GetNextScheduledRetry() < nextSheduleCheck)
170 nextSheduleCheck = item->GetNextScheduledRetry();
171
172 _LOG_DEBUG("Item is delayed");
173 continue;
174 }
175
176 _LOG_DEBUG("Start fetching of " << item->GetName());
177
178 m_currentParallelFetches++;
179 _LOG_TRACE("++++ RESTART PIPELINE: " << item->GetName());
180 item->RestartPipeline();
181 }
182
183 m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask,
184 (nextSheduleCheck - currentTime).total_seconds());
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800185}
186
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800187void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800188FetchManager::DidNoDataTimeout(Fetcher& fetcher)
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800189{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800190 _LOG_DEBUG("No data timeout for " << fetcher.GetName() << " with forwarding hint: "
191 << fetcher.GetForwardingHint());
Alexander Afanasyevff8d9dc2013-01-26 00:45:08 -0800192
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800193 {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800194 unique_lock<mutex> lock(m_parellelFetchMutex);
195 m_currentParallelFetches--;
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800196 // no need to do anything with the m_fetchList
197 }
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800198
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800199 if (fetcher.GetForwardingHint().size() == 0) {
200 // will be tried initially and again after empty forwarding hint
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800201
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800202 /// @todo Handle potential exception
203 Name forwardingHint;
204 forwardingHint = m_mapping(fetcher.GetDeviceName());
Alexander Afanasyev84894d32013-02-08 22:27:12 -0800205
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800206 if (forwardingHint.size() == 0) {
207 // make sure that we will try broadcast forwarding hint eventually
208 fetcher.SetForwardingHint(m_broadcastHint);
Alexander Afanasyev84894d32013-02-08 22:27:12 -0800209 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800210 else {
211 fetcher.SetForwardingHint(forwardingHint);
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800212 }
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800213 }
214 else if (fetcher.GetForwardingHint() == m_broadcastHint) {
215 // will be tried after broadcast forwarding hint
216 fetcher.SetForwardingHint(Name("/"));
217 }
218 else {
219 // will be tried after normal forwarding hint
220 fetcher.SetForwardingHint(m_broadcastHint);
221 }
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800222
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800223 double delay = fetcher.GetRetryPause();
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800224 if (delay < 1) // first time
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800225 {
226 delay = 1;
227 }
228 else {
229 delay = std::min(2 * delay, 300.0); // 5 minutes max
230 }
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800231
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800232 fetcher.SetRetryPause(delay);
233 fetcher.SetNextScheduledRetry(date_time::second_clock<boost::posix_time::ptime>::universal_time() +
234 posix_time::seconds(delay));
Alexander Afanasyevf9756232013-01-28 16:42:20 -0800235
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800236 m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800237}
238
239void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800240FetchManager::DidFetchComplete(Fetcher& fetcher, const Name& deviceName, const Name& baseName)
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800241{
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800242 {
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800243 unique_lock<mutex> lock(m_parellelFetchMutex);
244 m_currentParallelFetches--;
Alexander Afanasyev1d1cc832013-02-05 20:03:36 -0800245
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800246 if (m_taskDb) {
247 m_taskDb->deleteTask(deviceName, baseName);
248 }
Alexander Afanasyev83531a42013-01-19 16:21:54 -0800249 }
Alexander Afanasyevd6c2a902013-01-19 21:24:30 -0800250
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800251 // like TCP timed-wait
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800252 m_scheduler->scheduleOneTimeTask(m_scheduler, 10,
253 boost::bind(&FetchManager::TimedWait, this, ref(fetcher)),
254 boost::lexical_cast<string>(baseName));
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800255
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800256 m_scheduler->rescheduleTaskAt(m_scheduleFetchesTask, 0);
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800257}
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800258
259void
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800260FetchManager::TimedWait(Fetcher& fetcher)
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800261{
Alexander Afanasyeveda3b7a2016-12-25 11:26:40 -0800262 unique_lock<mutex> lock(m_parellelFetchMutex);
263 _LOG_TRACE("+++++ removing fetcher: " << fetcher.GetName());
264 m_fetchList.erase_and_dispose(FetchList::s_iterator_to(fetcher), fetcher_disposer());
Zhenkai Zhu354d46d2013-02-06 13:49:48 -0800265}