blob: 77b86d26f2df2d54940804c8e72f9032670c491d [file] [log] [blame]
#include "event-scheduler.h"

#include <cmath>
#include <utility>
Zhenkai Zhu97019eb2013-01-08 00:21:43 -08003
Zhenkai Zhubc2f6282013-01-08 16:40:58 -08004#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
5
6IntervalGeneratorPtr
7IntervalGenerator:: Null;
8
9void
10eventCallback(evutil_socket_t fd, short what, void *arg)
11{
12 Task *task = static_cast<Task *>(arg);
13 task->run();
14 task = NULL;
15}
16
17Task::Task(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080018 : m_callback(callback)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080019 , m_tag(tag)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080020 , m_scheduler(scheduler)
21 , m_invoked(false)
22 , m_event(NULL)
23 , m_tv(NULL)
24 , m_generator(generator)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080025{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080026 m_event = evtimer_new(scheduler->base(), eventCallback, this);
27 m_tv = new timeval;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080028}
29
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080030Task::~Task()
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080031{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080032 if (m_event != NULL)
33 {
34 event_free(m_event);
35 m_event = NULL;
36 }
37
38 if (m_tv != NULL)
39 {
40 delete m_tv;
41 m_tv = NULL;
42 }
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080043}
44
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080045void
46Task::run()
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080047{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080048 if (!m_invoked)
49 {
50 m_callback();
51 m_invoked = true;
52 }
53 if (isPeriodic())
54 {
55 reset();
56 m_scheduler->rescheduleTask(m_tag);
57 }
58 else
59 {
60 selfClean();
61 }
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080062}
63
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080064void
65Task::selfClean()
66{
67 m_scheduler->deleteTask(m_tag);
68}
69
70void
71Task::reset()
72{
73 m_invoked = false;
74 if (isPeriodic())
75 {
76 double interval = m_generator->nextInterval();
77 setTv(interval);
78 }
79}
80
81void
82Task::setTv(double delay)
83{
84 double intPart, fraction;
85 fraction = modf(abs(delay), &intPart);
86 m_tv->tv_sec = static_cast<int>(intPart);
87 m_tv->tv_usec = static_cast<int>((fraction * 1000000));
88}
89
90RandomIntervalGenerator::RandomIntervalGenerator(double interval, double percent, Direction direction)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080091 : m_interval(interval)
92 , m_rng(time(NULL))
93 , m_percent(percent)
94 , m_dist(0.0, fractional(percent))
95 , m_random(m_rng, m_dist)
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080096 , m_direction(direction)
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080097{
98}
99
100double
101RandomIntervalGenerator::nextInterval()
102{
103 double percent = m_random();
104 double interval = m_interval;
105 switch (m_direction)
106 {
107 case UP: interval = m_interval * (1.0 + percent); break;
108 case DOWN: interval = m_interval * (1.0 - percent); break;
109 case EVEN: interval = m_interval * (1.0 - m_percent/2.0 + percent); break;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800110 default: break;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800111 }
112
113 return interval;
114}
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800115
116Scheduler::Scheduler()
117{
118 evthread_use_pthreads();
119 m_base = event_base_new();
120}
121
122Scheduler::~Scheduler()
123{
124 event_base_free(m_base);
125}
126
127void
128Scheduler::eventLoop()
129{
130 event_base_loop(m_base, EVLOOP_NO_EXIT_ON_EMPTY);
131}
132
133void
134Scheduler::start()
135{
136 m_thread = boost::thread(&Scheduler::eventLoop, this);
137}
138
139void
140Scheduler::shutdown()
141{
142 event_base_loopbreak(m_base);
143 m_thread.join();
144}
145
146bool
147Scheduler::addTask(const TaskPtr &task, double delay)
148{
149 TaskPtr newTask = task;
150 newTask->setTv(delay);
151 if (addToMap(newTask))
152 {
153 evtimer_add(newTask->ev(), newTask->tv());
154 return true;
155 }
156 return false;
157}
158
159bool
160Scheduler::addTask(const TaskPtr &task)
161{
162 TaskPtr newTask = task;
163 if (!newTask->isPeriodic())
164 {
165 return false;
166 }
167
168 if (addToMap(newTask))
169 {
170 newTask->reset();
171 evtimer_add(newTask->ev(), newTask->tv());
172 return true;
173 }
174
175 return false;
176}
177
178void
179Scheduler::rescheduleTask(const Task::Tag &tag)
180{
181 ReadLock(m_mutex);
182 TaskMapIt it = m_taskMap.find(tag);
183 if (it != m_taskMap.end())
184 {
185 TaskPtr task = it->second;
186 if (task->isPeriodic())
187 {
188 evtimer_add(task->ev(), task->tv());
189 }
190 }
191}
192
193bool
194Scheduler::addToMap(const TaskPtr &task)
195{
196 WriteLock(m_mutex);
197 if (m_taskMap.find(task->tag()) == m_taskMap.end())
198 {
199 m_taskMap.insert(make_pair(task->tag(), task));
200 return true;
201 }
202 return false;
203}
204
205void
206Scheduler::deleteTask(const Task::Tag &tag)
207{
208 WriteLock(m_mutex);
209 TaskMapIt it = m_taskMap.find(tag);
210 if (it != m_taskMap.end())
211 {
212 TaskPtr task = it->second;
213 evtimer_del(task->ev());
214 m_taskMap.erase(it);
215 }
216}
217
218void
219Scheduler::deleteTask(const Task::TaskMatcher &matcher)
220{
221 WriteLock(m_mutex);
222 TaskMapIt it = m_taskMap.begin();
223 while(it != m_taskMap.end())
224 {
225 TaskPtr task = it->second;
226 if (matcher(task))
227 {
228 evtimer_del(task->ev());
229 // Use post increment; map.erase invalidate the iterator that is beening erased,
230 // but does not invalidate other iterators. This seems to be the convention to
231 // erase something from C++ STL map while traversing.
232 m_taskMap.erase(it++);
233 }
234 else
235 {
236 ++it;
237 }
238 }
239}
240
241int
242Scheduler::size()
243{
244 ReadLock(m_mutex);
245 return m_taskMap.size();
246}