blob: 02ac0c08e8915e6038dabfc313f34f38290b059d [file] [log] [blame]
Jeff Thompsonfa306642013-06-17 15:06:57 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2013, Regents of the University of California
4 * Alexander Afanasyev
5 * Zhenkai Zhu
6 *
7 * BSD license, See the LICENSE file for more information
8 *
9 * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
10 * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
11 */
12
13#include "scheduler.h"
14#include "one-time-task.h"
15#include "periodic-task.h"
16#include "logging.h"
17
18#include <utility>
19#include <boost/make_shared.hpp>
20
21INIT_LOGGER ("Scheduler");
22
23using namespace std;
24using namespace boost;
25
26#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
27
28static void
29dummyCallback(evutil_socket_t fd, short what, void *arg)
30{
31 // 1 year later, that was a long run for the app
32 // let's wait for another year
33 timeval tv;
34 tv.tv_sec = 365 * 24 * 3600;
35 tv.tv_usec = 0;
36 event *ev = *(event **)arg;
37 int res = evtimer_add(ev, &tv);
38}
39
40// IntervalGeneratorPtr
41// IntervalGenerator:: Null;
42
43void errorCallback(int err)
44{
45 _LOG_ERROR ("Fatal error: " << err);
46}
47
48Scheduler::Scheduler()
49 : m_running(false)
50 , m_executor(1)
51{
52 event_set_fatal_callback(errorCallback);
53 evthread_use_pthreads();
54 m_base = event_base_new();
55
56 // This is a hack to prevent event_base_loop from exiting;
57 // the flag EVLOOP_NO_EXIT_ON_EMPTY is somehow ignored, at least on Mac OS X
58 // it's going to be scheduled to 10 years later
59 timeval tv;
60 tv.tv_sec = 365 * 24 * 3600;
61 tv.tv_usec = 0;
62 m_ev = evtimer_new(m_base, dummyCallback, &m_ev);
63 int res = evtimer_add(m_ev, &tv);
64 if (res < 0)
65 {
66 _LOG_ERROR("heck");
67 }
68}
69
70Scheduler::~Scheduler()
71{
72 shutdown ();
73 evtimer_del(m_ev);
74 event_free(m_ev);
75 event_base_free(m_base);
76}
77
78void
79Scheduler::eventLoop()
80{
81 while(true)
82 {
83 if (event_base_loop(m_base, EVLOOP_NO_EXIT_ON_EMPTY) < 0)
84 {
85 _LOG_DEBUG ("scheduler loop break error");
86 }
87 else
88 {
89 _LOG_DEBUG ("scheduler loop break normal");
90 }
91
92 {
93 ScopedLock lock(m_mutex);
94 if (!m_running)
95 {
96 _LOG_DEBUG ("scheduler loop break normal");
97 break;
98 }
99 }
100
101 // just to prevent craziness in CPU usage which supposedly should not happen
102 // after adding the dummy event
103 usleep(1000);
104 }
105}
106
107void
108Scheduler::execute(Executor::Job job)
109{
110 m_executor.execute(job);
111}
112
113void
114Scheduler::start()
115{
116 ScopedLock lock(m_mutex);
117 if (!m_running)
118 {
119 m_thread = boost::thread(&Scheduler::eventLoop, this);
120 m_executor.start();
121 m_running = true;
122 }
123}
124
125void
126Scheduler::shutdown()
127{
128 bool breakAndWait = false;
129 {
130 ScopedLock lock (m_mutex);
131 if (m_running)
132 {
133 m_running = false;
134 breakAndWait = true;
135 }
136 }
137
138 if (breakAndWait)
139 {
140 event_base_loopbreak(m_base);
141 m_executor.shutdown();
142 m_thread.join();
143 }
144}
145
146TaskPtr
147Scheduler::scheduleOneTimeTask (SchedulerPtr scheduler, double delay,
148 const Task::Callback &callback, const Task::Tag &tag)
149{
150 TaskPtr task = make_shared<OneTimeTask> (callback, tag, scheduler, delay);
151 if (scheduler->addTask (task))
152 return task;
153 else
154 return TaskPtr ();
155}
156
157TaskPtr
158Scheduler::schedulePeriodicTask (SchedulerPtr scheduler, IntervalGeneratorPtr delayGenerator,
159 const Task::Callback &callback, const Task::Tag &tag)
160{
161 TaskPtr task = make_shared<PeriodicTask> (callback, tag, scheduler, delayGenerator);
162
163 if (scheduler->addTask (task))
164 return task;
165 else
166 return TaskPtr ();
167}
168
169bool
170Scheduler::addTask(TaskPtr newTask, bool reset/* = true*/)
171{
172 if (addToMap(newTask))
173 {
174 if (reset)
175 {
176 newTask->reset();
177 }
178 int res = evtimer_add(newTask->ev(), newTask->tv());
179 if (res < 0)
180 {
181 _LOG_ERROR ("evtimer_add failed for " << newTask->tag());
182 }
183 return true;
184 }
185 else
186 {
187 _LOG_ERROR ("fail to add task: " << newTask->tag());
188 }
189
190 return false;
191}
192
193void
194Scheduler::deleteTask(TaskPtr task)
195{
196 deleteTask (task->tag ());
197}
198
199void
200Scheduler::rescheduleTask(TaskPtr task)
201{
202 ScopedLock lock(m_mutex);
203 TaskMapIt it = m_taskMap.find(task->tag());
204 if (it != m_taskMap.end())
205 {
206 TaskPtr task = it->second;
207 task->reset();
208 int res = evtimer_add(task->ev(), task->tv());
209 if (res < 0)
210 {
211 _LOG_ERROR ("evtimer_add failed for " << task->tag());
212 }
213 }
214 else
215 {
216 addTask(task);
217 }
218}
219
220void
221Scheduler::rescheduleTask(const Task::Tag &tag)
222{
223 ScopedLock lock(m_mutex);
224 TaskMapIt it = m_taskMap.find(tag);
225 if (it != m_taskMap.end())
226 {
227 TaskPtr task = it->second;
228 task->reset();
229 int res = evtimer_add(task->ev(), task->tv());
230 if (res < 0)
231 {
232 cout << "evtimer_add failed for " << task->tag() << endl;
233 }
234 }
235}
236
237void
238Scheduler::rescheduleTaskAt (const Task::Tag &tag, double time)
239{
240 ScopedLock lock(m_mutex);
241 TaskMapIt it = m_taskMap.find (tag);
242 if (it != m_taskMap.end())
243 {
244 TaskPtr task = it->second;
245 task->reset();
246 task->setTv (time);
247
248 int res = evtimer_add(task->ev(), task->tv());
249 if (res < 0)
250 {
251 _LOG_ERROR ("evtimer_add failed for " << task->tag());
252 }
253 }
254 else
255 {
256 _LOG_ERROR ("Task for tag " << tag << " not found");
257 }
258}
259
260void
261Scheduler::rescheduleTaskAt (TaskPtr task, double time)
262{
263 ScopedLock lock(m_mutex);
264 TaskMapIt it = m_taskMap.find(task->tag());
265 if (it != m_taskMap.end())
266 {
267 TaskPtr task = it->second;
268 task->reset();
269 task->setTv (time);
270
271 int res = evtimer_add(task->ev(), task->tv());
272 if (res < 0)
273 {
274 _LOG_ERROR ("evtimer_add failed for " << task->tag());
275 }
276 }
277 else
278 {
279 task->setTv (time); // force different time
280 addTask (task, false);
281 }
282}
283
284
285bool
286Scheduler::addToMap(TaskPtr task)
287{
288 ScopedLock lock(m_mutex);
289 if (m_taskMap.find(task->tag()) == m_taskMap.end())
290 {
291 m_taskMap.insert(make_pair(task->tag(), task));
292 return true;
293 }
294 return false;
295}
296
297void
298Scheduler::deleteTask(const Task::Tag &tag)
299{
300 ScopedLock lock(m_mutex);
301 TaskMapIt it = m_taskMap.find(tag);
302 if (it != m_taskMap.end())
303 {
304 TaskPtr task = it->second;
305 evtimer_del(task->ev());
306 m_taskMap.erase(it);
307 }
308}
309
310void
311Scheduler::deleteTask(const Task::TaskMatcher &matcher)
312{
313 ScopedLock lock(m_mutex);
314 TaskMapIt it = m_taskMap.begin();
315 while(it != m_taskMap.end())
316 {
317 TaskPtr task = it->second;
318 if (matcher(task))
319 {
320 evtimer_del(task->ev());
321 // Use post increment; map.erase invalidate the iterator that is beening erased,
322 // but does not invalidate other iterators. This seems to be the convention to
323 // erase something from C++ STL map while traversing.
324 m_taskMap.erase(it++);
325 }
326 else
327 {
328 ++it;
329 }
330 }
331}
332
333int
334Scheduler::size()
335{
336 ScopedLock lock(m_mutex);
337 return m_taskMap.size();
338}