blob: ec4b104e4bc9bfa12387b621f30ca745d1145018 [file] [log] [blame]
#include "event-scheduler.h"
#include <cmath>
#include <utility>
Zhenkai Zhu97019eb2013-01-08 00:21:43 -08003
// Flag value passed to event_base_loop() in Scheduler::eventLoop().
// Newer libevent headers declare EVLOOP_NO_EXIT_ON_EMPTY themselves, so it is
// only defined here when the included headers do not already provide it.
#ifndef EVLOOP_NO_EXIT_ON_EMPTY
#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
#endif
5
6IntervalGeneratorPtr
7IntervalGenerator:: Null;
8
9void
10eventCallback(evutil_socket_t fd, short what, void *arg)
11{
12 Task *task = static_cast<Task *>(arg);
13 task->run();
14 task = NULL;
15}
16
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -080017void errorCallback(int err)
18{
19 cout << "Fatal error: " << err << endl;
20}
21
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080022Task::Task(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080023 : m_callback(callback)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080024 , m_tag(tag)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080025 , m_scheduler(scheduler)
26 , m_invoked(false)
27 , m_event(NULL)
28 , m_tv(NULL)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080029{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080030 m_event = evtimer_new(scheduler->base(), eventCallback, this);
31 m_tv = new timeval;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080032}
33
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080034Task::~Task()
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080035{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080036 if (m_event != NULL)
37 {
38 event_free(m_event);
39 m_event = NULL;
40 }
41
42 if (m_tv != NULL)
43 {
44 delete m_tv;
45 m_tv = NULL;
46 }
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080047}
48
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080049void
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080050Task::setTv(double delay)
51{
52 double intPart, fraction;
53 fraction = modf(abs(delay), &intPart);
54 m_tv->tv_sec = static_cast<int>(intPart);
55 m_tv->tv_usec = static_cast<int>((fraction * 1000000));
56}
57
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080058OneTimeTask::OneTimeTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, double delay)
59 : Task(callback, tag, scheduler)
60{
61 setTv(delay);
62}
63
64void
65OneTimeTask::run()
66{
67 if (!m_invoked)
68 {
69 m_callback();
70 m_invoked = true;
71 deregisterSelf();
72 }
73}
74
75void
76OneTimeTask::deregisterSelf()
77{
78 m_scheduler->deleteTask(m_tag);
79}
80
81void
82OneTimeTask::reset()
83{
84 m_invoked = false;
85}
86
Zhenkai Zhu06b8e952013-01-15 12:21:15 -080087PeriodicTask::PeriodicTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator)
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080088 : Task(callback, tag, scheduler)
89 , m_generator(generator)
90{
91}
92
93void
94PeriodicTask::run()
95{
96 if (!m_invoked)
97 {
98 m_callback();
99 m_invoked = true;
Zhenkai Zhu06b8e952013-01-15 12:21:15 -0800100 m_scheduler->rescheduleTask(m_tag);
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800101 }
102}
103
104void
105PeriodicTask::reset()
106{
107 m_invoked = false;
108 double interval = m_generator->nextInterval();
109 setTv(interval);
110}
111
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800112RandomIntervalGenerator::RandomIntervalGenerator(double interval, double percent, Direction direction)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800113 : m_interval(interval)
114 , m_rng(time(NULL))
115 , m_percent(percent)
116 , m_dist(0.0, fractional(percent))
117 , m_random(m_rng, m_dist)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800118 , m_direction(direction)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800119{
120}
121
122double
123RandomIntervalGenerator::nextInterval()
124{
125 double percent = m_random();
126 double interval = m_interval;
127 switch (m_direction)
128 {
129 case UP: interval = m_interval * (1.0 + percent); break;
130 case DOWN: interval = m_interval * (1.0 - percent); break;
131 case EVEN: interval = m_interval * (1.0 - m_percent/2.0 + percent); break;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800132 default: break;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800133 }
134
135 return interval;
136}
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800137
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800138Scheduler::Scheduler() : m_running(false)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800139{
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800140 event_set_fatal_callback(errorCallback);
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800141 evthread_use_pthreads();
142 m_base = event_base_new();
143}
144
145Scheduler::~Scheduler()
146{
147 event_base_free(m_base);
148}
149
150void
151Scheduler::eventLoop()
152{
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800153 while(true)
154 {
155 if (event_base_loop(m_base, EVLOOP_NO_EXIT_ON_EMPTY) < 0)
156 {
157 cout << "scheduler loop break error" << endl;
158 }
Zhenkai Zhu4865bd62013-01-17 12:08:29 -0800159 ReadLock lock(m_mutex);
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800160 if (!m_running)
161 {
162 cout << "scheduler loop break normal" << endl;
163 break;
164 }
165 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800166}
167
168void
169Scheduler::start()
170{
Zhenkai Zhub330aed2013-01-17 13:29:37 -0800171 WriteLock lock(m_mutex);
172 if (!m_running)
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800173 {
Zhenkai Zhub330aed2013-01-17 13:29:37 -0800174 m_thread = boost::thread(&Scheduler::eventLoop, this);
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800175 m_running = true;
176 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800177}
178
179void
180Scheduler::shutdown()
181{
Zhenkai Zhub330aed2013-01-17 13:29:37 -0800182 WriteLock lock(m_mutex);
183 if (m_running)
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800184 {
Zhenkai Zhub330aed2013-01-17 13:29:37 -0800185 event_base_loopbreak(m_base);
186 m_thread.join();
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800187 m_running = false;
188 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800189}
190
191bool
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800192Scheduler::addTask(const TaskPtr &task)
193{
194 TaskPtr newTask = task;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800195
196 if (addToMap(newTask))
197 {
198 newTask->reset();
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800199 int res = evtimer_add(newTask->ev(), newTask->tv());
200 if (res < 0)
201 {
202 cout << "evtimer_add failed for " << task->tag() << endl;
203 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800204 return true;
205 }
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800206 else
207 {
208 cout << "fail to add task: " << task->tag() << endl;
209 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800210
211 return false;
212}
213
214void
215Scheduler::rescheduleTask(const Task::Tag &tag)
216{
Zhenkai Zhu4865bd62013-01-17 12:08:29 -0800217 ReadLock lock(m_mutex);
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800218 TaskMapIt it = m_taskMap.find(tag);
219 if (it != m_taskMap.end())
220 {
221 TaskPtr task = it->second;
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800222 task->reset();
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800223 int res = evtimer_add(task->ev(), task->tv());
224 if (res < 0)
225 {
226 cout << "evtimer_add failed for " << task->tag() << endl;
227 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800228 }
229}
230
231bool
232Scheduler::addToMap(const TaskPtr &task)
233{
Zhenkai Zhu4865bd62013-01-17 12:08:29 -0800234 WriteLock lock(m_mutex);
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800235 if (m_taskMap.find(task->tag()) == m_taskMap.end())
236 {
237 m_taskMap.insert(make_pair(task->tag(), task));
238 return true;
239 }
240 return false;
241}
242
243void
244Scheduler::deleteTask(const Task::Tag &tag)
245{
Zhenkai Zhu4865bd62013-01-17 12:08:29 -0800246 WriteLock lock(m_mutex);
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800247 TaskMapIt it = m_taskMap.find(tag);
248 if (it != m_taskMap.end())
249 {
250 TaskPtr task = it->second;
251 evtimer_del(task->ev());
252 m_taskMap.erase(it);
253 }
254}
255
256void
257Scheduler::deleteTask(const Task::TaskMatcher &matcher)
258{
Zhenkai Zhu4865bd62013-01-17 12:08:29 -0800259 WriteLock lock(m_mutex);
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800260 TaskMapIt it = m_taskMap.begin();
261 while(it != m_taskMap.end())
262 {
263 TaskPtr task = it->second;
264 if (matcher(task))
265 {
266 evtimer_del(task->ev());
267 // Use post increment; map.erase invalidate the iterator that is beening erased,
268 // but does not invalidate other iterators. This seems to be the convention to
269 // erase something from C++ STL map while traversing.
270 m_taskMap.erase(it++);
271 }
272 else
273 {
274 ++it;
275 }
276 }
277}
278
279int
280Scheduler::size()
281{
Zhenkai Zhu4865bd62013-01-17 12:08:29 -0800282 ReadLock lock(m_mutex);
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800283 return m_taskMap.size();
284}