Merge remote-tracking branch 'origin/master'
Conflicts:
wscript
diff --git a/include/event-scheduler.h b/include/event-scheduler.h
index 72169d8..b2aa929 100644
--- a/include/event-scheduler.h
+++ b/include/event-scheduler.h
@@ -1,7 +1,9 @@
#ifndef EVENT_SCHEDULER_H
#define EVENT_SCHEDULER_H
+
+// use pthread
+
#include <event2/event.h>
-#include <event2/event_struct.h>
#include <event2/thread.h>
#include <boost/function.hpp>
@@ -10,8 +12,11 @@
#include <boost/random/uniform_real.hpp>
#include <boost/random/variate_generator.hpp>
#include <boost/exception/all.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/thread/thread.hpp>
#include <math.h>
-#include <multimap>
+#include <map>
+#include <sys/time.h>
#define _OVERRIDE
#ifdef __GNUC__
@@ -23,36 +28,72 @@
using namespace std;
+static void
+eventCallback(evutil_socket_t fd, short what, void *arg);
+
+class Scheduler;
+typedef boost::shared_ptr<Scheduler> SchedulerPtr;
+class IntervalGenerator;
+typedef boost::shared_ptr<IntervalGenerator> IntervalGeneratorPtr;
+class Task;
+typedef boost::shared_ptr<Task> TaskPtr;
+
+class IntervalGenerator
+{
+public:
+ virtual double
+ nextInterval() = 0;
+ static IntervalGeneratorPtr Null;
+};
class Task
{
public:
- typedef boost::function<void (void *)> Callback;
+ typedef boost::function<void ()> Callback;
typedef string Tag;
- typedef boost::function<bool (const Task &task)> TaskMatcher;
+ typedef boost::function<bool (const TaskPtr &task)> TaskMatcher;
- Task(const Callback &callback, void *arg, const Tag &tag);
- Task(const Task &other);
- Task &
- operator=(const Task &other);
+ Task(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator = IntervalGenerator::Null);
+ ~Task();
virtual void
- run() { m_callback(m_arg); }
+ run();
Tag
tag() { return m_tag; }
+ event *
+ ev() { return m_event; }
+
+ timeval *
+ tv() { return m_tv; }
+
+ void
+ setTv(double delay);
+
+ bool
+ isPeriodic() { return m_generator != IntervalGenerator::Null; }
+
+ void
+ reset();
+
+protected:
+ void
+ selfClean();
protected:
Callback m_callback;
Tag m_tag;
- void *arg;
+ SchedulerPtr m_scheduler;
+ bool m_invoked;
+ event *m_event;
+ timeval *m_tv;
+ IntervalGeneratorPtr m_generator;
};
+
struct SchedulerException : virtual boost::exception, virtual exception { };
-class IntervalGenerator;
-typedef boost::shared_ptr<IntervalGenerator> IntervalGeneratorPtr;
class Scheduler
{
public:
@@ -65,34 +106,48 @@
virtual void
shutdown();
- virtual void
- addTask(const Task &task, double delay);
+ virtual bool
+ addTask(const TaskPtr &task, double delay);
+
+ virtual bool
+ addTask(const TaskPtr &task);
virtual void
- addTimeoutTask(const Task &task, double timeout);
+ deleteTask(const Task::Tag &tag);
virtual void
- addPeriodicTask(const Task &task, const IntervalGeneratorPtr &generator);
+ deleteTask(const Task::TaskMatcher &matcher);
virtual void
- deleteTask(const Tag &tag);
+ rescheduleTask(const Task::Tag &tag);
- virtual void
- deleteTask(const TaskMatcher &matcher);
+ void
+ eventLoop();
+
+ event_base *
+ base() { return m_base; }
+
+ // used in test
+ int
+ size();
protected:
- typedef multimap<Tag, Task> TaskMap;
+ bool
+ addToMap(const TaskPtr &task);
+
+protected:
+ typedef map<Task::Tag, TaskPtr> TaskMap;
+ typedef map<Task::Tag, TaskPtr>::iterator TaskMapIt;
+ typedef boost::shared_mutex Mutex;
+ typedef boost::unique_lock<Mutex> WriteLock;
+ typedef boost::shared_lock<Mutex> ReadLock;
TaskMap m_taskMap;
+ Mutex m_mutex;
+ event_base *m_base;
+ boost::thread m_thread;
};
-class IntervalGenerator
-{
-public:
- virtual double
- nextInterval() = 0;
-};
-
-class SimpleIntervalGenerator : IntervalGenerator
+class SimpleIntervalGenerator : public IntervalGenerator
{
public:
SimpleIntervalGenerator(double interval) : m_interval(interval) {}
@@ -100,18 +155,17 @@
virtual double
nextInterval() _OVERRIDE { return m_interval; }
private:
- SimpleIntervalGenerator(const SimpleIntervalGenerator &other){};
-private:
double m_interval;
};
-class RandomIntervalGenerator : IntervalGenerator
+class RandomIntervalGenerator : public IntervalGenerator
{
+public:
typedef enum
{
- UP,
- DOWN,
- EVEN
+ UP = 1,
+ DOWN = 2,
+ EVEN = 3
} Direction;
public:
@@ -121,14 +175,13 @@
nextInterval() _OVERRIDE;
private:
- RandomIntervalGenerator(const RandomIntervalGenerator &other){};
inline double fractional(double x) { double dummy; return abs(modf(x, &dummy)); }
private:
typedef boost::mt19937 RNG_TYPE;
RNG_TYPE m_rng;
boost::uniform_real<> m_dist;
- boost::rariate_generator<RNG_TYPE &, boost::uniform_real<> > m_random;
+ boost::variate_generator<RNG_TYPE &, boost::uniform_real<> > m_random;
Direction m_direction;
double m_interval;
double m_percent;
diff --git a/src/event-scheduler.cpp b/src/event-scheduler.cpp
index b823255..77b86d2 100644
--- a/src/event-scheduler.cpp
+++ b/src/event-scheduler.cpp
@@ -1,34 +1,99 @@
#include "event-scheduler.h"
+#include <utility>
-Task::Task(const Callback &callback, void *arg, const Tag &tag)
+#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
+
+IntervalGeneratorPtr
+IntervalGenerator:: Null;
+
+void
+eventCallback(evutil_socket_t fd, short what, void *arg)
+{
+ Task *task = static_cast<Task *>(arg);
+ task->run();
+ task = NULL;
+}
+
+Task::Task(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator)
: m_callback(callback)
- , m_arg(arg)
, m_tag(tag)
+ , m_scheduler(scheduler)
+ , m_invoked(false)
+ , m_event(NULL)
+ , m_tv(NULL)
+ , m_generator(generator)
{
+ m_event = evtimer_new(scheduler->base(), eventCallback, this);
+ m_tv = new timeval;
}
-Task::Task(const Task &other)
- : m_callback(other.m_callback)
- , m_arg(other.m_arg)
- , m_tag(other.m_tag)
+Task::~Task()
{
+ if (m_event != NULL)
+ {
+ event_free(m_event);
+ m_event = NULL;
+ }
+
+ if (m_tv != NULL)
+ {
+ delete m_tv;
+ m_tv = NULL;
+ }
}
-Task &
-Task::operator=(const Task &other)
+void
+Task::run()
{
- m_callback = other.m_callback;
- m_arg = other.m_arg;
- m_tag = other.m_tag;
- return (*this);
+ if (!m_invoked)
+ {
+ m_callback();
+ m_invoked = true;
+ }
+ if (isPeriodic())
+ {
+ reset();
+ m_scheduler->rescheduleTask(m_tag);
+ }
+ else
+ {
+ selfClean();
+ }
}
-RandomIntervalGenerator::RandomIntervalGenerator(double interval, double percent, Direction direction = UP)
+void
+Task::selfClean()
+{
+ m_scheduler->deleteTask(m_tag);
+}
+
+void
+Task::reset()
+{
+ m_invoked = false;
+ if (isPeriodic())
+ {
+ double interval = m_generator->nextInterval();
+ setTv(interval);
+ }
+}
+
+void
+Task::setTv(double delay)
+{
+ double intPart, fraction;
+ fraction = modf(abs(delay), &intPart);
+ m_tv->tv_sec = static_cast<int>(intPart);
+ m_tv->tv_usec = static_cast<int>((fraction * 1000000));
+}
+
+RandomIntervalGenerator::RandomIntervalGenerator(double interval, double percent, Direction direction)
: m_interval(interval)
, m_rng(time(NULL))
, m_percent(percent)
, m_dist(0.0, fractional(percent))
, m_random(m_rng, m_dist)
+ , m_direction(direction)
{
}
@@ -42,8 +107,140 @@
case UP: interval = m_interval * (1.0 + percent); break;
case DOWN: interval = m_interval * (1.0 - percent); break;
case EVEN: interval = m_interval * (1.0 - m_percent/2.0 + percent); break;
- default: break
+ default: break;
}
return interval;
}
+
+Scheduler::Scheduler()
+{
+ evthread_use_pthreads();
+ m_base = event_base_new();
+}
+
+Scheduler::~Scheduler()
+{
+ event_base_free(m_base);
+}
+
+void
+Scheduler::eventLoop()
+{
+ event_base_loop(m_base, EVLOOP_NO_EXIT_ON_EMPTY);
+}
+
+void
+Scheduler::start()
+{
+ m_thread = boost::thread(&Scheduler::eventLoop, this);
+}
+
+void
+Scheduler::shutdown()
+{
+ event_base_loopbreak(m_base);
+ m_thread.join();
+}
+
+bool
+Scheduler::addTask(const TaskPtr &task, double delay)
+{
+ TaskPtr newTask = task;
+ newTask->setTv(delay);
+ if (addToMap(newTask))
+ {
+ evtimer_add(newTask->ev(), newTask->tv());
+ return true;
+ }
+ return false;
+}
+
+bool
+Scheduler::addTask(const TaskPtr &task)
+{
+ TaskPtr newTask = task;
+ if (!newTask->isPeriodic())
+ {
+ return false;
+ }
+
+ if (addToMap(newTask))
+ {
+ newTask->reset();
+ evtimer_add(newTask->ev(), newTask->tv());
+ return true;
+ }
+
+ return false;
+}
+
+void
+Scheduler::rescheduleTask(const Task::Tag &tag)
+{
+ ReadLock(m_mutex);
+ TaskMapIt it = m_taskMap.find(tag);
+ if (it != m_taskMap.end())
+ {
+ TaskPtr task = it->second;
+ if (task->isPeriodic())
+ {
+ evtimer_add(task->ev(), task->tv());
+ }
+ }
+}
+
+bool
+Scheduler::addToMap(const TaskPtr &task)
+{
+ WriteLock(m_mutex);
+ if (m_taskMap.find(task->tag()) == m_taskMap.end())
+ {
+ m_taskMap.insert(make_pair(task->tag(), task));
+ return true;
+ }
+ return false;
+}
+
+void
+Scheduler::deleteTask(const Task::Tag &tag)
+{
+ WriteLock(m_mutex);
+ TaskMapIt it = m_taskMap.find(tag);
+ if (it != m_taskMap.end())
+ {
+ TaskPtr task = it->second;
+ evtimer_del(task->ev());
+ m_taskMap.erase(it);
+ }
+}
+
+void
+Scheduler::deleteTask(const Task::TaskMatcher &matcher)
+{
+ WriteLock(m_mutex);
+ TaskMapIt it = m_taskMap.begin();
+ while(it != m_taskMap.end())
+ {
+ TaskPtr task = it->second;
+ if (matcher(task))
+ {
+ evtimer_del(task->ev());
+ // Use post increment; map.erase invalidate the iterator that is beening erased,
+ // but does not invalidate other iterators. This seems to be the convention to
+ // erase something from C++ STL map while traversing.
+ m_taskMap.erase(it++);
+ }
+ else
+ {
+ ++it;
+ }
+ }
+}
+
+int
+Scheduler::size()
+{
+ ReadLock(m_mutex);
+ return m_taskMap.size();
+}
diff --git a/test/test-event-scheduler.cc b/test/test-event-scheduler.cc
new file mode 100644
index 0000000..950a69e
--- /dev/null
+++ b/test/test-event-scheduler.cc
@@ -0,0 +1,138 @@
+#include "event-scheduler.h"
+
+#include <boost/test/unit_test.hpp>
+#include <map>
+#include <unistd.h>
+
+using namespace boost;
+using namespace std;
+
+BOOST_AUTO_TEST_SUITE(SchedulerTests)
+
+map<string, int> table;
+
+void func(string str)
+{
+ map<string, int>::iterator it = table.find(str);
+ if (it == table.end())
+ {
+ table.insert(make_pair(str, 1));
+ }
+ else
+ {
+ int count = it->second;
+ count++;
+ table.erase(it);
+ table.insert(make_pair(str, count));
+ }
+}
+
+bool
+matcher(const TaskPtr &task)
+{
+ return task->tag() == "period" || task->tag() == "world";
+}
+
+BOOST_AUTO_TEST_CASE(SchedulerTest)
+{
+ SchedulerPtr scheduler(new Scheduler());
+ IntervalGeneratorPtr generator(new SimpleIntervalGenerator(0.2));
+
+ string tag1 = "hello";
+ string tag2 = "world";
+ string tag3 = "period";
+
+ TaskPtr task1(new Task(boost::bind(func, tag1), tag1, scheduler));
+ TaskPtr task2(new Task(boost::bind(func, tag2), tag2, scheduler));
+ TaskPtr task3(new Task(boost::bind(func, tag3), tag3, scheduler, generator));
+
+ scheduler->start();
+ scheduler->addTask(task1, 0.5);
+ scheduler->addTask(task2, 0.5);
+ scheduler->addTask(task3);
+ BOOST_CHECK_EQUAL(scheduler->size(), 3);
+ usleep(600000);
+ BOOST_CHECK_EQUAL(scheduler->size(), 1);
+ task1->reset();
+ scheduler->addTask(task1, 0.5);
+ BOOST_CHECK_EQUAL(scheduler->size(), 2);
+ usleep(600000);
+ task1->reset();
+ scheduler->addTask(task1, 0.5);
+ BOOST_CHECK_EQUAL(scheduler->size(), 2);
+ usleep(400000);
+ scheduler->deleteTask(task1->tag());
+ BOOST_CHECK_EQUAL(scheduler->size(), 1);
+ usleep(200000);
+
+ task1->reset();
+ task2->reset();
+ scheduler->addTask(task1, 0.5);
+ scheduler->addTask(task2, 0.5);
+ BOOST_CHECK_EQUAL(scheduler->size(), 3);
+ usleep(100000);
+ scheduler->deleteTask(bind(matcher, _1));
+ BOOST_CHECK_EQUAL(scheduler->size(), 1);
+ usleep(1000000);
+
+ scheduler->shutdown();
+
+ int hello = 0, world = 0, period = 0;
+
+ map<string, int>::iterator it;
+ it = table.find(tag1);
+ if (it != table.end())
+ {
+ hello = it->second;
+ }
+ it = table.find(tag2);
+ if (it != table.end())
+ {
+ world = it->second;
+ }
+ it = table.find(tag3);
+ if (it != table.end())
+ {
+ period = it->second;
+ }
+
+ // added four times, canceled once before invoking callback
+ BOOST_CHECK_EQUAL(hello, 3);
+ // added two times, canceled once by matcher before invoking callback
+ BOOST_CHECK_EQUAL(world, 1);
+ // invoked every 0.2 seconds before deleted by matcher
+ BOOST_CHECK_EQUAL(period, static_cast<int>((0.6 + 0.6 + 0.4 + 0.2 + 0.1) / 0.2));
+
+}
+
+BOOST_AUTO_TEST_CASE(GeneratorTest)
+{
+ double interval = 10;
+ double percent = 0.5;
+ int times = 10000;
+ IntervalGeneratorPtr generator(new RandomIntervalGenerator(interval, percent));
+ double sum = 0.0;
+ double min = 2 * interval;
+ double max = -1;
+ for (int i = 0; i < times; i++)
+ {
+ double next = generator->nextInterval();
+ sum += next;
+ if (next > max)
+ {
+ max = next;
+ }
+ if (next < min)
+ {
+ min = next;
+ }
+ }
+
+ BOOST_CHECK( abs(1.0 - (sum / static_cast<double>(times)) / interval) < 0.05);
+ BOOST_CHECK( min > interval * (1 - percent / 2.0));
+ BOOST_CHECK( max < interval * (1 + percent / 2.0));
+ BOOST_CHECK( abs(1.0 - ((max - min) / interval) / percent) < 0.05);
+
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/wscript b/wscript
index 43d8d8f..5d38675 100644
--- a/wscript
+++ b/wscript
@@ -19,6 +19,8 @@
conf.define ("CHRONOSHARE_VERSION", VERSION)
conf.check_cfg(package='sqlite3', args=['--cflags', '--libs'], uselib_store='SQLITE3', mandatory=True)
+ conf.check_cfg(package='libevent', args=['--cflags', '--libs'], uselib_store='LIBEVENT', mandatory=True)
+ conf.check_cfg(package='libevent_pthreads', args=['--cflags', '--libs'], uselib_store='LIBEVENT_PTHREADS', mandatory=True)
if not conf.check_cfg(package='openssl', args=['--cflags', '--libs'], uselib_store='SSL', mandatory=False):
libcrypto = conf.check_cc(lib='crypto',
@@ -79,8 +81,9 @@
'src/object-manager.cc',
'src/ccnx-name.cpp',
'src/ccnx-selectors.cpp',
+ 'src/event-scheduler.cpp',
],
- use = 'BOOST BOOST_THREAD BOOST_FILESYSTEM SSL SQLITE3 CCNX common',
+ use = 'BOOST BOOST_THREAD BOOST_FILESYSTEM SSL SQLITE3 CCNX common LIBEVENT LIBEVENT_PTHREADS',
includes = ['include', 'src'],
)