blob: 209e3d11b3152c80882b89d231fd001f24452536 [file] [log] [blame]
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -08001/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2013 University of California, Los Angeles
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License version 2 as
7 * published by the Free Software Foundation;
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 *
18 * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
19 * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
20 */
21
22#include "scheduler.h"
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -080023#include "one-time-task.h"
24#include "periodic-task.h"
Alexander Afanasyevfc720362013-01-24 21:49:48 -080025#include "logging.h"
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -080026
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -080027#include <utility>
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -080028#include <boost/make_shared.hpp>
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -080029
Alexander Afanasyevfc720362013-01-24 21:49:48 -080030INIT_LOGGER ("Scheduler");
31
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -080032using namespace std;
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -080033using namespace boost;
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -080034
35#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
36
Zhenkai Zhub16be8f2013-01-25 16:12:30 -080037static void
38dummyCallback(evutil_socket_t fd, short what, void *arg)
39{
40 // 1 year later, that was a long run for the app
Zhenkai Zhu3fe11722013-01-25 16:22:46 -080041 // let's wait for another year
42 timeval tv;
43 tv.tv_sec = 365 * 24 * 3600;
44 tv.tv_usec = 0;
45 event *ev = *(event **)arg;
46 int res = evtimer_add(ev, &tv);
Zhenkai Zhub16be8f2013-01-25 16:12:30 -080047}
48
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -080049// IntervalGeneratorPtr
50// IntervalGenerator:: Null;
51
52void errorCallback(int err)
53{
Alexander Afanasyevfc720362013-01-24 21:49:48 -080054 _LOG_ERROR ("Fatal error: " << err);
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -080055}
56
57Scheduler::Scheduler()
58 : m_running(false)
Zhenkai Zhu1888f742013-01-28 12:47:33 -080059 , m_executor(1)
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -080060{
61 event_set_fatal_callback(errorCallback);
62 evthread_use_pthreads();
63 m_base = event_base_new();
Zhenkai Zhub16be8f2013-01-25 16:12:30 -080064
65 // This is a hack to prevent event_base_loop from exiting;
66 // the flag EVLOOP_NO_EXIT_ON_EMPTY is somehow ignored, at least on Mac OS X
67 // it's going to be scheduled to 10 years later
68 timeval tv;
69 tv.tv_sec = 365 * 24 * 3600;
70 tv.tv_usec = 0;
Zhenkai Zhu3fe11722013-01-25 16:22:46 -080071 m_ev = evtimer_new(m_base, dummyCallback, &m_ev);
Zhenkai Zhub16be8f2013-01-25 16:12:30 -080072 int res = evtimer_add(m_ev, &tv);
73 if (res < 0)
74 {
75 _LOG_ERROR("heck");
76 }
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -080077}
78
79Scheduler::~Scheduler()
80{
Alexander Afanasyevfc720362013-01-24 21:49:48 -080081 shutdown ();
Zhenkai Zhub16be8f2013-01-25 16:12:30 -080082 evtimer_del(m_ev);
83 event_free(m_ev);
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -080084 event_base_free(m_base);
85}
86
87void
88Scheduler::eventLoop()
89{
90 while(true)
91 {
92 if (event_base_loop(m_base, EVLOOP_NO_EXIT_ON_EMPTY) < 0)
93 {
Alexander Afanasyevfc720362013-01-24 21:49:48 -080094 _LOG_DEBUG ("scheduler loop break error");
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -080095 }
Zhenkai Zhub16be8f2013-01-25 16:12:30 -080096 else
97 {
98 _LOG_DEBUG ("scheduler loop break normal");
99 }
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800100
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800101 {
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800102 ScopedLock lock(m_mutex);
Alexander Afanasyev34edd4d2013-01-17 17:55:45 -0800103 if (!m_running)
104 {
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800105 _LOG_DEBUG ("scheduler loop break normal");
Alexander Afanasyev34edd4d2013-01-17 17:55:45 -0800106 break;
107 }
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800108 }
Zhenkai Zhu238db852013-01-25 16:27:28 -0800109
110 // just to prevent craziness in CPU usage which supposedly should not happen
111 // after adding the dummy event
112 usleep(1000);
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800113 }
114}
115
116void
Zhenkai Zhu1888f742013-01-28 12:47:33 -0800117Scheduler::execute(Executor::Job job)
118{
119 m_executor.execute(job);
120}
121
122void
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800123Scheduler::start()
124{
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800125 ScopedLock lock(m_mutex);
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800126 if (!m_running)
127 {
128 m_thread = boost::thread(&Scheduler::eventLoop, this);
Zhenkai Zhu1888f742013-01-28 12:47:33 -0800129 m_executor.start();
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800130 m_running = true;
131 }
132}
133
134void
135Scheduler::shutdown()
136{
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800137 bool breakAndWait = false;
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800138 {
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800139 ScopedLock lock (m_mutex);
140 if (m_running)
141 {
142 m_running = false;
143 breakAndWait = true;
144 }
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800145 }
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800146
147 if (breakAndWait)
148 {
149 event_base_loopbreak(m_base);
Zhenkai Zhu1888f742013-01-28 12:47:33 -0800150 m_executor.shutdown();
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800151 m_thread.join();
152 }
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800153}
154
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800155TaskPtr
156Scheduler::scheduleOneTimeTask (SchedulerPtr scheduler, double delay,
157 const Task::Callback &callback, const Task::Tag &tag)
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800158{
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800159 TaskPtr task = make_shared<OneTimeTask> (callback, tag, scheduler, delay);
160 if (scheduler->addTask (task))
161 return task;
162 else
163 return TaskPtr ();
164}
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800165
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800166TaskPtr
167Scheduler::schedulePeriodicTask (SchedulerPtr scheduler, IntervalGeneratorPtr delayGenerator,
168 const Task::Callback &callback, const Task::Tag &tag)
169{
170 TaskPtr task = make_shared<PeriodicTask> (callback, tag, scheduler, delayGenerator);
171
172 if (scheduler->addTask (task))
173 return task;
174 else
175 return TaskPtr ();
176}
177
Yingdi Yu57f667b2013-07-11 10:37:59 -0700178TaskPtr
179Scheduler::scheduleDelayOneTimeTask (SchedulerPtr scheduler, double delay,
180 const Task::Callback &callback, const Task::Tag &tag)
181{
182 TaskPtr task = make_shared<OneTimeTask> (callback, tag, scheduler, delay);
183 if (scheduler->addTask (task))
184 return task;
185 else{
186 _LOG_ERROR ("reschedule task for " << tag);
187 scheduler->rescheduleTask(tag);
188 return TaskPtr ();
189 }
190}
191
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800192bool
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800193Scheduler::addTask(TaskPtr newTask, bool reset/* = true*/)
Alexander Afanasyeve41e7d22013-01-19 15:13:47 -0800194{
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800195 if (addToMap(newTask))
196 {
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800197 if (reset)
198 {
199 newTask->reset();
200 }
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800201 int res = evtimer_add(newTask->ev(), newTask->tv());
202 if (res < 0)
203 {
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800204 _LOG_ERROR ("evtimer_add failed for " << newTask->tag());
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800205 }
206 return true;
207 }
208 else
209 {
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800210 _LOG_ERROR ("fail to add task: " << newTask->tag());
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800211 }
212
213 return false;
214}
215
Alexander Afanasyevfb4c43f2013-01-18 17:43:25 -0800216void
Alexander Afanasyev997ba632013-01-18 17:40:23 -0800217Scheduler::deleteTask(TaskPtr task)
218{
219 deleteTask (task->tag ());
220}
221
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800222void
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800223Scheduler::rescheduleTask(TaskPtr task)
Zhenkai Zhu66ddb232013-01-18 17:53:52 -0800224{
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800225 ScopedLock lock(m_mutex);
Zhenkai Zhud2ca3922013-01-18 17:58:48 -0800226 TaskMapIt it = m_taskMap.find(task->tag());
Zhenkai Zhu66ddb232013-01-18 17:53:52 -0800227 if (it != m_taskMap.end())
228 {
229 TaskPtr task = it->second;
230 task->reset();
231 int res = evtimer_add(task->ev(), task->tv());
232 if (res < 0)
233 {
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800234 _LOG_ERROR ("evtimer_add failed for " << task->tag());
Zhenkai Zhu66ddb232013-01-18 17:53:52 -0800235 }
236 }
237 else
238 {
239 addTask(task);
240 }
241}
242
243void
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800244Scheduler::rescheduleTask(const Task::Tag &tag)
245{
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800246 ScopedLock lock(m_mutex);
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800247 TaskMapIt it = m_taskMap.find(tag);
248 if (it != m_taskMap.end())
249 {
250 TaskPtr task = it->second;
251 task->reset();
252 int res = evtimer_add(task->ev(), task->tv());
253 if (res < 0)
254 {
255 cout << "evtimer_add failed for " << task->tag() << endl;
256 }
257 }
258}
259
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800260void
261Scheduler::rescheduleTaskAt (const Task::Tag &tag, double time)
262{
263 ScopedLock lock(m_mutex);
264 TaskMapIt it = m_taskMap.find (tag);
265 if (it != m_taskMap.end())
266 {
267 TaskPtr task = it->second;
268 task->reset();
269 task->setTv (time);
270
271 int res = evtimer_add(task->ev(), task->tv());
272 if (res < 0)
273 {
274 _LOG_ERROR ("evtimer_add failed for " << task->tag());
275 }
276 }
277 else
278 {
279 _LOG_ERROR ("Task for tag " << tag << " not found");
280 }
281}
282
283void
284Scheduler::rescheduleTaskAt (TaskPtr task, double time)
285{
286 ScopedLock lock(m_mutex);
287 TaskMapIt it = m_taskMap.find(task->tag());
288 if (it != m_taskMap.end())
289 {
290 TaskPtr task = it->second;
291 task->reset();
292 task->setTv (time);
293
294 int res = evtimer_add(task->ev(), task->tv());
295 if (res < 0)
296 {
297 _LOG_ERROR ("evtimer_add failed for " << task->tag());
298 }
299 }
300 else
301 {
302 task->setTv (time); // force different time
303 addTask (task, false);
304 }
305}
306
307
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800308bool
Alexander Afanasyev548d38d2013-01-26 16:36:06 -0800309Scheduler::addToMap(TaskPtr task)
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800310{
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800311 ScopedLock lock(m_mutex);
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800312 if (m_taskMap.find(task->tag()) == m_taskMap.end())
313 {
314 m_taskMap.insert(make_pair(task->tag(), task));
315 return true;
316 }
317 return false;
318}
319
320void
321Scheduler::deleteTask(const Task::Tag &tag)
322{
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800323 ScopedLock lock(m_mutex);
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800324 TaskMapIt it = m_taskMap.find(tag);
325 if (it != m_taskMap.end())
326 {
327 TaskPtr task = it->second;
328 evtimer_del(task->ev());
329 m_taskMap.erase(it);
330 }
331}
332
333void
334Scheduler::deleteTask(const Task::TaskMatcher &matcher)
335{
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800336 ScopedLock lock(m_mutex);
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800337 TaskMapIt it = m_taskMap.begin();
338 while(it != m_taskMap.end())
339 {
340 TaskPtr task = it->second;
341 if (matcher(task))
342 {
343 evtimer_del(task->ev());
344 // Use post increment; map.erase invalidate the iterator that is beening erased,
345 // but does not invalidate other iterators. This seems to be the convention to
346 // erase something from C++ STL map while traversing.
347 m_taskMap.erase(it++);
348 }
349 else
350 {
351 ++it;
352 }
353 }
354}
355
356int
357Scheduler::size()
358{
Alexander Afanasyevfc720362013-01-24 21:49:48 -0800359 ScopedLock lock(m_mutex);
Alexander Afanasyev1b0e0082013-01-17 16:48:26 -0800360 return m_taskMap.size();
361}