blob: a99dbae7389aba987ec1dab67e62a2a208dc57f8 [file] [log] [blame]
Zhenkai Zhu97019eb2013-01-08 00:21:43 -08001#ifndef EVENT_SCHEDULER_H
2#define EVENT_SCHEDULER_H
Zhenkai Zhubc2f6282013-01-08 16:40:58 -08003
4// use pthread
5
Zhenkai Zhu97019eb2013-01-08 00:21:43 -08006#include <event2/event.h>
Zhenkai Zhu97019eb2013-01-08 00:21:43 -08007#include <event2/thread.h>
8
9#include <boost/function.hpp>
10#include <boost/shared_ptr.hpp>
11#include <boost/random/mersenne_twister.hpp>
12#include <boost/random/uniform_real.hpp>
13#include <boost/random/variate_generator.hpp>
14#include <boost/exception/all.hpp>
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080015#include <boost/thread/shared_mutex.hpp>
16#include <boost/thread/thread.hpp>
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080017#include <math.h>
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080018#include <map>
19#include <sys/time.h>
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080020
21#define _OVERRIDE
22#ifdef __GNUC__
23#if __GNUC_MAJOR >= 4 && __GNUC_MINOR__ >= 7
24 #undef _OVERRIDE
25 #define _OVERRIDE override
26#endif // __GNUC__ version
27#endif // __GNUC__
28
29using namespace std;
30
Zhenkai Zhu4eabef12013-01-08 20:29:52 -080031// callback used by libevent
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080032static void
33eventCallback(evutil_socket_t fd, short what, void *arg);
34
35class Scheduler;
36typedef boost::shared_ptr<Scheduler> SchedulerPtr;
37class IntervalGenerator;
38typedef boost::shared_ptr<IntervalGenerator> IntervalGeneratorPtr;
39class Task;
40typedef boost::shared_ptr<Task> TaskPtr;
41
42class IntervalGenerator
43{
44public:
45 virtual double
46 nextInterval() = 0;
47 static IntervalGeneratorPtr Null;
48};
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080049
Zhenkai Zhu4eabef12013-01-08 20:29:52 -080050
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080051class Task
52{
53public:
Zhenkai Zhu4eabef12013-01-08 20:29:52 -080054 // callback of this task
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080055 typedef boost::function<void ()> Callback;
Zhenkai Zhu4eabef12013-01-08 20:29:52 -080056 // tag identifies this task, should be unique
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080057 typedef string Tag;
Zhenkai Zhu4eabef12013-01-08 20:29:52 -080058 // used to match tasks
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080059 typedef boost::function<bool (const TaskPtr &task)> TaskMatcher;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080060
Zhenkai Zhu4eabef12013-01-08 20:29:52 -080061
62 // Task is associated with Schedulers due to the requirement that libevent event is associated with an libevent event_base
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080063 Task(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler);
64 virtual ~Task();
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080065
66 virtual void
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080067 run() = 0;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080068
69 Tag
70 tag() { return m_tag; }
71
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080072 event *
73 ev() { return m_event; }
74
75 timeval *
76 tv() { return m_tv; }
77
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080078 // Task needs to be resetted after the callback is invoked if it is to be schedule again; just for safety
79 // it's called by scheduler automatically when addTask or rescheduleTask is called;
80 // Tasks should do preparation work here (e.g. set up new delay, etc. )
81 virtual void
82 reset() = 0;
83
84 // set delay
85 // This overrides whatever delay kept in m_tv
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080086 void
87 setTv(double delay);
88
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080089protected:
90 Callback m_callback;
91 Tag m_tag;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -080092 SchedulerPtr m_scheduler;
93 bool m_invoked;
94 event *m_event;
95 timeval *m_tv;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -080096};
97
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -080098class OneTimeTask : public Task
99{
100public:
101 OneTimeTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, double delay);
102 virtual ~OneTimeTask(){}
103
104 // invoke callback and mark self as invoked and deregister self from scheduler
105 virtual void
106 run() _OVERRIDE;
107
108 // after reset, the task is marked as un-invoked and can be add to scheduler again, with same delay
109 // if not invoked yet, no effect
110 virtual void
111 reset() _OVERRIDE;
112
113private:
114 // this is to deregister itself from scheduler automatically after invoke
115 void
116 deregisterSelf();
117};
118
119class PeriodicTask : public Task
120{
121public:
122 // generator is needed only when this is a periodic task
123 // two simple generators implementation (SimpleIntervalGenerator and RandomIntervalGenerator) are provided;
124 // if user needs more complex pattern in the intervals between calls, extend class IntervalGenerator
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800125 PeriodicTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator, int repeat = -1);
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800126 virtual ~PeriodicTask(){}
127
128 // invoke callback, reset self and ask scheduler to schedule self with the next delay interval
129 virtual void
130 run() _OVERRIDE;
131
132 // set the next delay and mark as un-invoke
133 virtual void
134 reset() _OVERRIDE;
135
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800136 void
137 setRepeatTimes(int repeat) { m_repeat = repeat; m_indefinite = (m_repeat > 0); }
138
139private:
140 // this is to deregister itself from scheduler automatically if not indefinite
141 void
142 deregisterSelf();
143
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800144private:
145 IntervalGeneratorPtr m_generator;
Zhenkai Zhu74dd53c2013-01-10 23:39:57 -0800146 int m_repeat;
147 bool m_indefinite;
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800148};
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800149
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800150struct SchedulerException : virtual boost::exception, virtual exception { };
151
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800152class Scheduler
153{
154public:
155 Scheduler();
156 virtual ~Scheduler();
157
Zhenkai Zhu4eabef12013-01-08 20:29:52 -0800158 // start event scheduling
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800159 virtual void
160 start();
161
Zhenkai Zhu4eabef12013-01-08 20:29:52 -0800162 // stop event scheduling
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800163 virtual void
164 shutdown();
165
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800166 // if task with the same tag exists, the task is not added and return false
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800167 virtual bool
168 addTask(const TaskPtr &task);
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800169
Zhenkai Zhu4eabef12013-01-08 20:29:52 -0800170 // delete task by tag, regardless of whether it's invoked or not
171 // if no task is found, no effect
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800172 virtual void
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800173 deleteTask(const Task::Tag &tag);
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800174
Zhenkai Zhu4eabef12013-01-08 20:29:52 -0800175 // delete tasks by matcher, regardless of whether it's invoked or not
176 // this is flexiable in that you can use any form of criteria in finding tasks to delete
177 // but keep in mind this is a linear scan
178
179 // if no task is found, no effect
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800180 virtual void
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800181 deleteTask(const Task::TaskMatcher &matcher);
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800182
Zhenkai Zhu4eabef12013-01-08 20:29:52 -0800183 // task must already have been added to the scheduler, otherwise this is no effect
Zhenkai Zhu66dc5a92013-01-08 21:41:15 -0800184 // this is usually used by PeriodicTask
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800185 virtual void
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800186 rescheduleTask(const Task::Tag &tag);
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800187
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800188 void
189 eventLoop();
190
191 event_base *
192 base() { return m_base; }
193
194 // used in test
195 int
196 size();
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800197
198protected:
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800199 bool
200 addToMap(const TaskPtr &task);
201
202protected:
203 typedef map<Task::Tag, TaskPtr> TaskMap;
204 typedef map<Task::Tag, TaskPtr>::iterator TaskMapIt;
205 typedef boost::shared_mutex Mutex;
206 typedef boost::unique_lock<Mutex> WriteLock;
207 typedef boost::shared_lock<Mutex> ReadLock;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800208 TaskMap m_taskMap;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800209 Mutex m_mutex;
210 event_base *m_base;
211 boost::thread m_thread;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800212};
213
Zhenkai Zhu4eabef12013-01-08 20:29:52 -0800214
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800215class SimpleIntervalGenerator : public IntervalGenerator
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800216{
217public:
218 SimpleIntervalGenerator(double interval) : m_interval(interval) {}
219 ~SimpleIntervalGenerator(){}
220 virtual double
221 nextInterval() _OVERRIDE { return m_interval; }
222private:
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800223 double m_interval;
224};
225
Zhenkai Zhu4eabef12013-01-08 20:29:52 -0800226// generates intervals with uniform distribution
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800227class RandomIntervalGenerator : public IntervalGenerator
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800228{
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800229public:
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800230 typedef enum
231 {
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800232 UP = 1,
233 DOWN = 2,
234 EVEN = 3
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800235 } Direction;
236
237public:
Zhenkai Zhu4eabef12013-01-08 20:29:52 -0800238 // percent is random-range/interval; e.g. if interval is 10 and you wish the random-range to be 2
239 // e.g. 9 ~ 11, percent = 0.2
240 // direction shifts the random range; e.g. in the above example, UP would produce a range of
241 // 10 ~ 12, DOWN of 8 ~ 10, and EVEN of 9 ~ 11
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800242 RandomIntervalGenerator(double interval, double percent, Direction direction = EVEN);
243 ~RandomIntervalGenerator(){}
244 virtual double
245 nextInterval() _OVERRIDE;
246
247private:
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800248 inline double fractional(double x) { double dummy; return abs(modf(x, &dummy)); }
249
250private:
251 typedef boost::mt19937 RNG_TYPE;
252 RNG_TYPE m_rng;
253 boost::uniform_real<> m_dist;
Zhenkai Zhubc2f6282013-01-08 16:40:58 -0800254 boost::variate_generator<RNG_TYPE &, boost::uniform_real<> > m_random;
Zhenkai Zhu97019eb2013-01-08 00:21:43 -0800255 Direction m_direction;
256 double m_interval;
257 double m_percent;
258
259};
260#endif // EVENT_SCHEDULER_H