blob: be67aad34505652eca4df64fbace1f6d16256346 [file] [log] [blame]
Zhenkai Zhu97019eb2013-01-08 00:21:43 -08001#include "event-scheduler.h"
Zhenkai Zhubc2f6282013-01-08 16:40:58 -08002#include <utility>
Zhenkai Zhu97019eb2013-01-08 00:21:43 -08003
Zhenkai Zhubc2f6282013-01-08 16:40:58 -08004#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
5
6IntervalGeneratorPtr
7IntervalGenerator:: Null;
8
9void
10eventCallback(evutil_socket_t fd, short what, void *arg)
11{
12 Task *task = static_cast<Task *>(arg);
13 task->run();
14 task = NULL;
15}
16
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080017Task::Task(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080018 : m_callback(callback)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080019 , m_tag(tag)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080020 , m_scheduler(scheduler)
21 , m_invoked(false)
22 , m_event(NULL)
23 , m_tv(NULL)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080024{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080025 m_event = evtimer_new(scheduler->base(), eventCallback, this);
26 m_tv = new timeval;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080027}
28
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080029Task::~Task()
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080030{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080031 if (m_event != NULL)
32 {
33 event_free(m_event);
34 m_event = NULL;
35 }
36
37 if (m_tv != NULL)
38 {
39 delete m_tv;
40 m_tv = NULL;
41 }
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080042}
43
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080044void
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080045Task::setTv(double delay)
46{
47 double intPart, fraction;
48 fraction = modf(abs(delay), &intPart);
49 m_tv->tv_sec = static_cast<int>(intPart);
50 m_tv->tv_usec = static_cast<int>((fraction * 1000000));
51}
52
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080053OneTimeTask::OneTimeTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, double delay)
54 : Task(callback, tag, scheduler)
55{
56 setTv(delay);
57}
58
59void
60OneTimeTask::run()
61{
62 if (!m_invoked)
63 {
64 m_callback();
65 m_invoked = true;
66 deregisterSelf();
67 }
68}
69
70void
71OneTimeTask::deregisterSelf()
72{
73 m_scheduler->deleteTask(m_tag);
74}
75
76void
77OneTimeTask::reset()
78{
79 m_invoked = false;
80}
81
Zhenkai Zhu06b8e952013-01-15 12:21:15 -080082PeriodicTask::PeriodicTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator)
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080083 : Task(callback, tag, scheduler)
84 , m_generator(generator)
85{
86}
87
88void
89PeriodicTask::run()
90{
91 if (!m_invoked)
92 {
93 m_callback();
94 m_invoked = true;
Zhenkai Zhu06b8e952013-01-15 12:21:15 -080095 m_scheduler->rescheduleTask(m_tag);
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080096 }
97}
98
99void
100PeriodicTask::reset()
101{
102 m_invoked = false;
103 double interval = m_generator->nextInterval();
104 setTv(interval);
105}
106
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800107RandomIntervalGenerator::RandomIntervalGenerator(double interval, double percent, Direction direction)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800108 : m_interval(interval)
109 , m_rng(time(NULL))
110 , m_percent(percent)
111 , m_dist(0.0, fractional(percent))
112 , m_random(m_rng, m_dist)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800113 , m_direction(direction)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800114{
115}
116
117double
118RandomIntervalGenerator::nextInterval()
119{
120 double percent = m_random();
121 double interval = m_interval;
122 switch (m_direction)
123 {
124 case UP: interval = m_interval * (1.0 + percent); break;
125 case DOWN: interval = m_interval * (1.0 - percent); break;
126 case EVEN: interval = m_interval * (1.0 - m_percent/2.0 + percent); break;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800127 default: break;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800128 }
129
130 return interval;
131}
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800132
133Scheduler::Scheduler()
134{
135 evthread_use_pthreads();
136 m_base = event_base_new();
137}
138
139Scheduler::~Scheduler()
140{
141 event_base_free(m_base);
142}
143
144void
145Scheduler::eventLoop()
146{
147 event_base_loop(m_base, EVLOOP_NO_EXIT_ON_EMPTY);
148}
149
150void
151Scheduler::start()
152{
153 m_thread = boost::thread(&Scheduler::eventLoop, this);
154}
155
156void
157Scheduler::shutdown()
158{
159 event_base_loopbreak(m_base);
160 m_thread.join();
161}
162
163bool
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800164Scheduler::addTask(const TaskPtr &task)
165{
166 TaskPtr newTask = task;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800167
168 if (addToMap(newTask))
169 {
170 newTask->reset();
171 evtimer_add(newTask->ev(), newTask->tv());
172 return true;
173 }
174
175 return false;
176}
177
178void
179Scheduler::rescheduleTask(const Task::Tag &tag)
180{
181 ReadLock(m_mutex);
182 TaskMapIt it = m_taskMap.find(tag);
183 if (it != m_taskMap.end())
184 {
185 TaskPtr task = it->second;
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800186 task->reset();
187 evtimer_add(task->ev(), task->tv());
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800188 }
189}
190
191bool
192Scheduler::addToMap(const TaskPtr &task)
193{
194 WriteLock(m_mutex);
195 if (m_taskMap.find(task->tag()) == m_taskMap.end())
196 {
197 m_taskMap.insert(make_pair(task->tag(), task));
198 return true;
199 }
200 return false;
201}
202
203void
204Scheduler::deleteTask(const Task::Tag &tag)
205{
206 WriteLock(m_mutex);
207 TaskMapIt it = m_taskMap.find(tag);
208 if (it != m_taskMap.end())
209 {
210 TaskPtr task = it->second;
211 evtimer_del(task->ev());
212 m_taskMap.erase(it);
213 }
214}
215
216void
217Scheduler::deleteTask(const Task::TaskMatcher &matcher)
218{
219 WriteLock(m_mutex);
220 TaskMapIt it = m_taskMap.begin();
221 while(it != m_taskMap.end())
222 {
223 TaskPtr task = it->second;
224 if (matcher(task))
225 {
226 evtimer_del(task->ev());
227 // Use post increment; map.erase invalidate the iterator that is beening erased,
228 // but does not invalidate other iterators. This seems to be the convention to
229 // erase something from C++ STL map while traversing.
230 m_taskMap.erase(it++);
231 }
232 else
233 {
234 ++it;
235 }
236 }
237}
238
239int
240Scheduler::size()
241{
242 ReadLock(m_mutex);
243 return m_taskMap.size();
244}