blob: 1c892d64e65dd9aae1ac37f25ac8d76ef140b903 [file] [log] [blame]
Zhenkai Zhu97019eb2013-01-08 00:21:43 -08001#include "event-scheduler.h"
Zhenkai Zhubc2f6282013-01-08 16:40:58 -08002#include <utility>
Zhenkai Zhu97019eb2013-01-08 00:21:43 -08003
Zhenkai Zhubc2f6282013-01-08 16:40:58 -08004#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
5
6IntervalGeneratorPtr
7IntervalGenerator:: Null;
8
9void
10eventCallback(evutil_socket_t fd, short what, void *arg)
11{
12 Task *task = static_cast<Task *>(arg);
13 task->run();
14 task = NULL;
15}
16
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080017Task::Task(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080018 : m_callback(callback)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080019 , m_tag(tag)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080020 , m_scheduler(scheduler)
21 , m_invoked(false)
22 , m_event(NULL)
23 , m_tv(NULL)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080024{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080025 m_event = evtimer_new(scheduler->base(), eventCallback, this);
26 m_tv = new timeval;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080027}
28
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080029Task::~Task()
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080030{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080031 if (m_event != NULL)
32 {
33 event_free(m_event);
34 m_event = NULL;
35 }
36
37 if (m_tv != NULL)
38 {
39 delete m_tv;
40 m_tv = NULL;
41 }
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080042}
43
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080044void
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080045Task::setTv(double delay)
46{
47 double intPart, fraction;
48 fraction = modf(abs(delay), &intPart);
49 m_tv->tv_sec = static_cast<int>(intPart);
50 m_tv->tv_usec = static_cast<int>((fraction * 1000000));
51}
52
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080053OneTimeTask::OneTimeTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, double delay)
54 : Task(callback, tag, scheduler)
55{
56 setTv(delay);
57}
58
59void
60OneTimeTask::run()
61{
62 if (!m_invoked)
63 {
64 m_callback();
65 m_invoked = true;
66 deregisterSelf();
67 }
68}
69
70void
71OneTimeTask::deregisterSelf()
72{
73 m_scheduler->deleteTask(m_tag);
74}
75
76void
77OneTimeTask::reset()
78{
79 m_invoked = false;
80}
81
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080082PeriodicTask::PeriodicTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator, int repeat)
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080083 : Task(callback, tag, scheduler)
84 , m_generator(generator)
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080085 , m_repeat(repeat)
86 , m_indefinite(m_repeat > 0)
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080087{
88}
89
90void
91PeriodicTask::run()
92{
93 if (!m_invoked)
94 {
95 m_callback();
96 m_invoked = true;
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -080097 if (m_indefinite)
98 {
99 m_scheduler->rescheduleTask(m_tag);
100 }
101 else
102 {
103 m_repeat--;
104 if (m_repeat > 0)
105 {
106 m_scheduler->rescheduleTask(m_tag);
107 }
108 else
109 {
110 deregisterSelf();
111 }
112 }
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800113 }
114}
115
116void
117PeriodicTask::reset()
118{
119 m_invoked = false;
120 double interval = m_generator->nextInterval();
121 setTv(interval);
122}
123
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800124void
125PeriodicTask::deregisterSelf()
126{
127 m_scheduler->deleteTask(m_tag);
128}
129
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800130RandomIntervalGenerator::RandomIntervalGenerator(double interval, double percent, Direction direction)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800131 : m_interval(interval)
132 , m_rng(time(NULL))
133 , m_percent(percent)
134 , m_dist(0.0, fractional(percent))
135 , m_random(m_rng, m_dist)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800136 , m_direction(direction)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800137{
138}
139
140double
141RandomIntervalGenerator::nextInterval()
142{
143 double percent = m_random();
144 double interval = m_interval;
145 switch (m_direction)
146 {
147 case UP: interval = m_interval * (1.0 + percent); break;
148 case DOWN: interval = m_interval * (1.0 - percent); break;
149 case EVEN: interval = m_interval * (1.0 - m_percent/2.0 + percent); break;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800150 default: break;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800151 }
152
153 return interval;
154}
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800155
156Scheduler::Scheduler()
157{
158 evthread_use_pthreads();
159 m_base = event_base_new();
160}
161
162Scheduler::~Scheduler()
163{
164 event_base_free(m_base);
165}
166
167void
168Scheduler::eventLoop()
169{
170 event_base_loop(m_base, EVLOOP_NO_EXIT_ON_EMPTY);
171}
172
173void
174Scheduler::start()
175{
176 m_thread = boost::thread(&Scheduler::eventLoop, this);
177}
178
179void
180Scheduler::shutdown()
181{
182 event_base_loopbreak(m_base);
183 m_thread.join();
184}
185
186bool
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800187Scheduler::addTask(const TaskPtr &task)
188{
189 TaskPtr newTask = task;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800190
191 if (addToMap(newTask))
192 {
193 newTask->reset();
194 evtimer_add(newTask->ev(), newTask->tv());
195 return true;
196 }
197
198 return false;
199}
200
201void
202Scheduler::rescheduleTask(const Task::Tag &tag)
203{
204 ReadLock(m_mutex);
205 TaskMapIt it = m_taskMap.find(tag);
206 if (it != m_taskMap.end())
207 {
208 TaskPtr task = it->second;
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800209 task->reset();
210 evtimer_add(task->ev(), task->tv());
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800211 }
212}
213
214bool
215Scheduler::addToMap(const TaskPtr &task)
216{
217 WriteLock(m_mutex);
218 if (m_taskMap.find(task->tag()) == m_taskMap.end())
219 {
220 m_taskMap.insert(make_pair(task->tag(), task));
221 return true;
222 }
223 return false;
224}
225
226void
227Scheduler::deleteTask(const Task::Tag &tag)
228{
229 WriteLock(m_mutex);
230 TaskMapIt it = m_taskMap.find(tag);
231 if (it != m_taskMap.end())
232 {
233 TaskPtr task = it->second;
234 evtimer_del(task->ev());
235 m_taskMap.erase(it);
236 }
237}
238
239void
240Scheduler::deleteTask(const Task::TaskMatcher &matcher)
241{
242 WriteLock(m_mutex);
243 TaskMapIt it = m_taskMap.begin();
244 while(it != m_taskMap.end())
245 {
246 TaskPtr task = it->second;
247 if (matcher(task))
248 {
249 evtimer_del(task->ev());
250 // Use post increment; map.erase invalidate the iterator that is beening erased,
251 // but does not invalidate other iterators. This seems to be the convention to
252 // erase something from C++ STL map while traversing.
253 m_taskMap.erase(it++);
254 }
255 else
256 {
257 ++it;
258 }
259 }
260}
261
262int
263Scheduler::size()
264{
265 ReadLock(m_mutex);
266 return m_taskMap.size();
267}