blob: 33697a5996ba8e05a918e7feb53267beb59f8dbc [file] [log] [blame]
Zhenkai Zhu97019eb2013-01-08 00:21:43 -08001#include "event-scheduler.h"
Zhenkai Zhubc2f6282013-01-08 16:40:58 -08002#include <utility>
Zhenkai Zhu97019eb2013-01-08 00:21:43 -08003
Zhenkai Zhubc2f6282013-01-08 16:40:58 -08004#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
5
6IntervalGeneratorPtr
7IntervalGenerator:: Null;
8
9void
10eventCallback(evutil_socket_t fd, short what, void *arg)
11{
12 Task *task = static_cast<Task *>(arg);
13 task->run();
14 task = NULL;
15}
16
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -080017void errorCallback(int err)
18{
19 cout << "Fatal error: " << err << endl;
20}
21
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080022Task::Task(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080023 : m_callback(callback)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080024 , m_tag(tag)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080025 , m_scheduler(scheduler)
26 , m_invoked(false)
27 , m_event(NULL)
28 , m_tv(NULL)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080029{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080030 m_event = evtimer_new(scheduler->base(), eventCallback, this);
31 m_tv = new timeval;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080032}
33
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080034Task::~Task()
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080035{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080036 if (m_event != NULL)
37 {
38 event_free(m_event);
39 m_event = NULL;
40 }
41
42 if (m_tv != NULL)
43 {
44 delete m_tv;
45 m_tv = NULL;
46 }
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080047}
48
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080049void
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080050Task::setTv(double delay)
51{
52 double intPart, fraction;
53 fraction = modf(abs(delay), &intPart);
54 m_tv->tv_sec = static_cast<int>(intPart);
55 m_tv->tv_usec = static_cast<int>((fraction * 1000000));
56}
57
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080058OneTimeTask::OneTimeTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, double delay)
59 : Task(callback, tag, scheduler)
60{
61 setTv(delay);
62}
63
64void
65OneTimeTask::run()
66{
67 if (!m_invoked)
68 {
69 m_callback();
70 m_invoked = true;
71 deregisterSelf();
72 }
73}
74
75void
76OneTimeTask::deregisterSelf()
77{
78 m_scheduler->deleteTask(m_tag);
79}
80
81void
82OneTimeTask::reset()
83{
84 m_invoked = false;
85}
86
Zhenkai Zhu06b8e952013-01-15 12:21:15 -080087PeriodicTask::PeriodicTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator)
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080088 : Task(callback, tag, scheduler)
89 , m_generator(generator)
90{
91}
92
93void
94PeriodicTask::run()
95{
96 if (!m_invoked)
97 {
98 m_callback();
99 m_invoked = true;
Zhenkai Zhu06b8e952013-01-15 12:21:15 -0800100 m_scheduler->rescheduleTask(m_tag);
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800101 }
102}
103
104void
105PeriodicTask::reset()
106{
107 m_invoked = false;
108 double interval = m_generator->nextInterval();
109 setTv(interval);
110}
111
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800112RandomIntervalGenerator::RandomIntervalGenerator(double interval, double percent, Direction direction)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800113 : m_interval(interval)
114 , m_rng(time(NULL))
115 , m_percent(percent)
116 , m_dist(0.0, fractional(percent))
117 , m_random(m_rng, m_dist)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800118 , m_direction(direction)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800119{
120}
121
122double
123RandomIntervalGenerator::nextInterval()
124{
125 double percent = m_random();
126 double interval = m_interval;
127 switch (m_direction)
128 {
129 case UP: interval = m_interval * (1.0 + percent); break;
130 case DOWN: interval = m_interval * (1.0 - percent); break;
131 case EVEN: interval = m_interval * (1.0 - m_percent/2.0 + percent); break;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800132 default: break;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800133 }
134
135 return interval;
136}
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800137
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800138Scheduler::Scheduler() : m_running(false)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800139{
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800140 event_set_fatal_callback(errorCallback);
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800141 evthread_use_pthreads();
142 m_base = event_base_new();
143}
144
145Scheduler::~Scheduler()
146{
147 event_base_free(m_base);
148}
149
150void
151Scheduler::eventLoop()
152{
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800153 while(true)
154 {
155 if (event_base_loop(m_base, EVLOOP_NO_EXIT_ON_EMPTY) < 0)
156 {
157 cout << "scheduler loop break error" << endl;
158 }
159 ReadLock(m_mutex);
160 if (!m_running)
161 {
162 cout << "scheduler loop break normal" << endl;
163 break;
164 }
165 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800166}
167
168void
169Scheduler::start()
170{
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800171 {
172 WriteLock(m_mutex);
173 m_running = true;
174 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800175 m_thread = boost::thread(&Scheduler::eventLoop, this);
176}
177
178void
179Scheduler::shutdown()
180{
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800181 {
182 WriteLock(m_mutex);
183 m_running = false;
184 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800185 event_base_loopbreak(m_base);
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800186 cout << "shutdown, calling loop break" << endl;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800187 m_thread.join();
188}
189
190bool
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800191Scheduler::addTask(const TaskPtr &task)
192{
193 TaskPtr newTask = task;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800194
195 if (addToMap(newTask))
196 {
197 newTask->reset();
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800198 int res = evtimer_add(newTask->ev(), newTask->tv());
199 if (res < 0)
200 {
201 cout << "evtimer_add failed for " << task->tag() << endl;
202 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800203 return true;
204 }
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800205 else
206 {
207 cout << "fail to add task: " << task->tag() << endl;
208 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800209
210 return false;
211}
212
213void
214Scheduler::rescheduleTask(const Task::Tag &tag)
215{
216 ReadLock(m_mutex);
217 TaskMapIt it = m_taskMap.find(tag);
218 if (it != m_taskMap.end())
219 {
220 TaskPtr task = it->second;
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800221 task->reset();
Zhenkai Zhuf8e81e02013-01-15 16:02:47 -0800222 int res = evtimer_add(task->ev(), task->tv());
223 if (res < 0)
224 {
225 cout << "evtimer_add failed for " << task->tag() << endl;
226 }
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800227 }
228}
229
230bool
231Scheduler::addToMap(const TaskPtr &task)
232{
233 WriteLock(m_mutex);
234 if (m_taskMap.find(task->tag()) == m_taskMap.end())
235 {
236 m_taskMap.insert(make_pair(task->tag(), task));
237 return true;
238 }
239 return false;
240}
241
242void
243Scheduler::deleteTask(const Task::Tag &tag)
244{
245 WriteLock(m_mutex);
246 TaskMapIt it = m_taskMap.find(tag);
247 if (it != m_taskMap.end())
248 {
249 TaskPtr task = it->second;
250 evtimer_del(task->ev());
251 m_taskMap.erase(it);
252 }
253}
254
255void
256Scheduler::deleteTask(const Task::TaskMatcher &matcher)
257{
258 WriteLock(m_mutex);
259 TaskMapIt it = m_taskMap.begin();
260 while(it != m_taskMap.end())
261 {
262 TaskPtr task = it->second;
263 if (matcher(task))
264 {
265 evtimer_del(task->ev());
266 // Use post increment; map.erase invalidate the iterator that is beening erased,
267 // but does not invalidate other iterators. This seems to be the convention to
268 // erase something from C++ STL map while traversing.
269 m_taskMap.erase(it++);
270 }
271 else
272 {
273 ++it;
274 }
275 }
276}
277
278int
279Scheduler::size()
280{
281 ReadLock(m_mutex);
282 return m_taskMap.size();
283}