Merge remote-tracking branch 'origin/master'

Conflicts:
	wscript
diff --git a/include/event-scheduler.h b/include/event-scheduler.h
index 72169d8..b2aa929 100644
--- a/include/event-scheduler.h
+++ b/include/event-scheduler.h
@@ -1,7 +1,9 @@
 #ifndef EVENT_SCHEDULER_H
 #define EVENT_SCHEDULER_H
+
+// libevent is used with its pthreads threading support (evthread_use_pthreads)
+
 #include <event2/event.h>
-#include <event2/event_struct.h>
 #include <event2/thread.h>
 
 #include <boost/function.hpp>
@@ -10,8 +12,11 @@
 #include <boost/random/uniform_real.hpp>
 #include <boost/random/variate_generator.hpp>
 #include <boost/exception/all.hpp>
+#include <boost/thread/shared_mutex.hpp>
+#include <boost/thread/thread.hpp>
 #include <math.h>
-#include <multimap>
+#include <map>
+#include <sys/time.h>
 
 #define _OVERRIDE
 #ifdef __GNUC__
@@ -23,36 +28,72 @@
 
 using namespace std;
 
+static void
+eventCallback(evutil_socket_t fd, short what, void *arg);
+
+class Scheduler;
+typedef boost::shared_ptr<Scheduler> SchedulerPtr;
+class IntervalGenerator;
+typedef boost::shared_ptr<IntervalGenerator> IntervalGeneratorPtr;
+class Task;
+typedef boost::shared_ptr<Task> TaskPtr;
+
+class IntervalGenerator  // abstract source of delays for periodic tasks
+{
+public:  // NOTE(review): no virtual destructor is declared - confirm instances are never deleted through an IntervalGenerator*
+  virtual double
+  nextInterval() = 0;  // next delay; Task::reset() hands the value to setTv(), which stores it as seconds + microseconds
+  static IntervalGeneratorPtr Null;  // empty pointer; Task::isPeriodic() compares the task's generator against it
+};
 
 class Task
 {
 public:
-  typedef boost::function<void (void *)> Callback;
+  typedef boost::function<void ()> Callback;  // user work item; run() invokes it with no arguments
   typedef string Tag;
-  typedef boost::function<bool (const Task &task)> TaskMatcher;
+  typedef boost::function<bool (const TaskPtr &task)> TaskMatcher;  // predicate consumed by Scheduler::deleteTask(matcher)
 
-  Task(const Callback &callback, void *arg, const Tag &tag);
-  Task(const Task &other);
-  Task &
-  operator=(const Task &other);
+  Task(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator = IntervalGenerator::Null);  // a non-Null generator makes the task periodic
+  ~Task();  // NOTE(review): frees m_event/m_tv, yet copy ctor/assignment are no longer declared - confirm Task is only handled through TaskPtr
 
   virtual void
-  run() { m_callback(m_arg); }
+  run();  // called from eventCallback when the timer fires
 
   Tag
   tag() { return m_tag; }
 
+  event *  // raw libevent timer handle; released in ~Task
+  ev() { return m_event; }
+
+  timeval *  // delay passed to evtimer_add when the task is (re)armed
+  tv() { return m_tv; }
+
+  void  // stores the magnitude of delay (seconds) into the timeval
+  setTv(double delay);
+
+  bool  // true when a generator other than IntervalGenerator::Null was supplied
+  isPeriodic() { return m_generator != IntervalGenerator::Null; }
+
+  void  // clears m_invoked and, for periodic tasks, loads the next interval
+  reset();
+
+protected:
+  void  // removes this task from its scheduler by tag
+  selfClean();
 
 protected:
   Callback m_callback;
   Tag m_tag;
-  void *arg;
+  SchedulerPtr m_scheduler;  // scheduler whose event base hosts m_event
+  bool m_invoked;  // set once the callback has run; cleared by reset()
+  event *m_event;  // created with evtimer_new in the constructor
+  timeval *m_tv;  // allocated in the constructor, deleted in the destructor
+  IntervalGeneratorPtr m_generator;  // supplies intervals for periodic tasks
 };
 
+
 struct SchedulerException : virtual boost::exception, virtual exception { };
 
-class IntervalGenerator;
-typedef boost::shared_ptr<IntervalGenerator> IntervalGeneratorPtr;
 class Scheduler
 {
 public:
@@ -65,34 +106,48 @@
   virtual void
   shutdown();
 
-  virtual void
-  addTask(const Task &task, double delay);
+  virtual bool
+  addTask(const TaskPtr &task, double delay);
+
+  virtual bool
+  addTask(const TaskPtr &task);
 
   virtual void
-  addTimeoutTask(const Task &task, double timeout);
+  deleteTask(const Task::Tag &tag);
 
   virtual void
-  addPeriodicTask(const Task &task, const IntervalGeneratorPtr &generator);
+  deleteTask(const Task::TaskMatcher &matcher);
 
   virtual void
-  deleteTask(const Tag &tag);
+  rescheduleTask(const Task::Tag &tag);
 
-  virtual void
-  deleteTask(const TaskMatcher &matcher);
+  void
+  eventLoop();
+
+  event_base *
+  base() { return m_base; }
+
+  // used in test
+  int
+  size();
 
 protected:
-  typedef multimap<Tag, Task> TaskMap;
+  bool
+  addToMap(const TaskPtr &task);
+
+protected:
+  typedef map<Task::Tag, TaskPtr> TaskMap;
+  typedef map<Task::Tag, TaskPtr>::iterator TaskMapIt;
+  typedef boost::shared_mutex Mutex;
+  typedef boost::unique_lock<Mutex> WriteLock;
+  typedef boost::shared_lock<Mutex> ReadLock;
   TaskMap m_taskMap;
+  Mutex m_mutex;
+  event_base *m_base;
+  boost::thread m_thread;
 };
 
-class IntervalGenerator
-{
-public:
-  virtual double
-  nextInterval() = 0;
-};
-
-class SimpleIntervalGenerator : IntervalGenerator
+class SimpleIntervalGenerator : public IntervalGenerator
 {
 public:
   SimpleIntervalGenerator(double interval) : m_interval(interval) {}
@@ -100,18 +155,17 @@
   virtual double
   nextInterval() _OVERRIDE { return m_interval; }
 private:
-  SimpleIntervalGenerator(const SimpleIntervalGenerator &other){};
-private:
   double m_interval;
 };
 
-class RandomIntervalGenerator : IntervalGenerator
+class RandomIntervalGenerator : public IntervalGenerator
 {
+public:
   typedef enum
   {
-    UP,
-    DOWN,
-    EVEN
+    UP = 1,
+    DOWN = 2,
+    EVEN = 3
   } Direction;
 
 public:
@@ -121,14 +175,13 @@
   nextInterval() _OVERRIDE;
 
 private:
-  RandomIntervalGenerator(const RandomIntervalGenerator &other){};
   inline double fractional(double x) { double dummy; return abs(modf(x, &dummy)); }
 
 private:
   typedef boost::mt19937 RNG_TYPE;
   RNG_TYPE m_rng;
   boost::uniform_real<> m_dist;
-  boost::rariate_generator<RNG_TYPE &, boost::uniform_real<> > m_random;
+  boost::variate_generator<RNG_TYPE &, boost::uniform_real<> > m_random;
   Direction m_direction;
   double m_interval;
   double m_percent;
diff --git a/src/event-scheduler.cpp b/src/event-scheduler.cpp
index b823255..77b86d2 100644
--- a/src/event-scheduler.cpp
+++ b/src/event-scheduler.cpp
@@ -1,34 +1,99 @@
 #include "event-scheduler.h"
+#include <utility>
 
-Task::Task(const Callback &callback, void *arg, const Tag &tag)
+#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
+
+IntervalGeneratorPtr
+IntervalGenerator:: Null;
+
+void  // libevent timer callback registered by Task's constructor through evtimer_new
+eventCallback(evutil_socket_t fd, short what, void *arg)  // fd and what are not used
+{
+  Task *task = static_cast<Task *>(arg);  // arg is the Task that created the event (it passed `this`)
+  task->run();
+  task = NULL;  // clears only the local pointer
+}
+
+Task::Task(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator)
      : m_callback(callback)
-     , m_arg(arg)
      , m_tag(tag)
+     , m_scheduler(scheduler)
+     , m_invoked(false)
+     , m_event(NULL)
+     , m_tv(NULL)
+     , m_generator(generator)
 {
+  m_event = evtimer_new(scheduler->base(), eventCallback, this);  // timer bound to the scheduler's base; `this` is handed to eventCallback
+  m_tv = new timeval();  // value-initialized so tv() never exposes indeterminate fields before setTv()/reset()
 }
 
-Task::Task(const Task &other)
-     : m_callback(other.m_callback)
-     , m_arg(other.m_arg)
-     , m_tag(other.m_tag)
+Task::~Task()  // releases the libevent timer and the timeval allocated by the constructor
 {
+  if (m_event != NULL)
+  {
+    event_free(m_event);  // counterpart of evtimer_new in the constructor
+    m_event = NULL;
+  }
+
+  if (m_tv != NULL)
+  {
+    delete m_tv;  // counterpart of the `new` in the constructor
+    m_tv = NULL;
+  }
 }
 
-Task &
-Task::operator=(const Task &other)
+void  // runs the callback at most once per arming, then re-arms (periodic) or removes itself (one-shot)
+Task::run()
 {
-  m_callback = other.m_callback;
-  m_arg = other.m_arg;
-  m_tag = other.m_tag;
-  return (*this);
+  if (!m_invoked)  // skip the callback if it already ran since the last reset()
+  {
+    m_callback();
+    m_invoked = true;
+  }
+  if (isPeriodic())
+  {
+    reset();  // clears m_invoked and loads the next interval into the timeval
+    m_scheduler->rescheduleTask(m_tag);  // re-arms the timer with the refreshed timeval
+  }
+  else
+  {
+    selfClean();  // NOTE(review): erases this task from the scheduler's map; if that drops the last TaskPtr, `this` is destroyed - keep as final statement
+  }
 }
 
-RandomIntervalGenerator::RandomIntervalGenerator(double interval, double percent, Direction direction = UP)
+void  // asks the owning scheduler to delete the task registered under this task's tag
+Task::selfClean()
+{
+  m_scheduler->deleteTask(m_tag);  // resolves to the Tag overload, which also cancels the timer
+}
+
+void  // makes the callback runnable again and, for periodic tasks, loads the next delay
+Task::reset()
+{
+  m_invoked = false;  // run() only calls the callback while this flag is false
+  if (isPeriodic())
+  {
+    double interval = m_generator->nextInterval();
+    setTv(interval);  // store the generated interval into the timeval
+  }
+}
+
+void  // stores the magnitude of `delay` (seconds) into the timeval as whole seconds plus microseconds
+Task::setTv(double delay)
+{
+  double intPart, fraction;
+  fraction = modf(fabs(delay), &intPart);  // fabs, not abs: if only int abs(int) is visible, a sub-second delay would truncate to 0
+  m_tv->tv_sec = static_cast<int>(intPart);
+  m_tv->tv_usec = static_cast<int>((fraction * 1000000));  // fractional part converted to microseconds
+}
+
+RandomIntervalGenerator::RandomIntervalGenerator(double interval, double percent, Direction direction)  // no default argument here; presumably supplied by the header declaration - confirm
                          : m_interval(interval)
                          , m_rng(time(NULL))
                          , m_percent(percent)
                          , m_dist(0.0, fractional(percent))
                          , m_random(m_rng, m_dist)
+                        , m_direction(direction)  // members are initialized in class declaration order, not in the order listed here
 {
 }
 
@@ -42,8 +107,140 @@
   case UP: interval = m_interval * (1.0 + percent); break;
   case DOWN: interval = m_interval * (1.0 - percent); break;
   case EVEN: interval = m_interval * (1.0 - m_percent/2.0 + percent); break;
-  default: break
+  default: break;
   }
 
   return interval;
 }
+
+Scheduler::Scheduler()  // enables libevent's pthreads support, then creates the event base
+{
+  evthread_use_pthreads();  // return value is not checked
+  m_base = event_base_new();  // result is not checked for NULL
+}
+
+Scheduler::~Scheduler()  // frees the event base; m_thread is not joined here (shutdown() does that)
+{
+  event_base_free(m_base);  // NOTE(review): m_taskMap is destroyed after this body, so remaining Tasks call event_free once the base is gone - confirm safe
+}
+
+void  // thread body launched by start(); runs the libevent loop on m_base
+Scheduler::eventLoop()
+{
+  event_base_loop(m_base, EVLOOP_NO_EXIT_ON_EMPTY);  // the flag is #defined as 0x04 near the top of this file
+}
+
+void  // starts the thread that runs eventLoop()
+Scheduler::start()
+{
+  m_thread = boost::thread(&Scheduler::eventLoop, this);  // thread handle is kept so shutdown() can join it
+}
+
+void  // asks the event loop to stop and waits for the loop thread to finish
+Scheduler::shutdown()
+{
+  event_base_loopbreak(m_base);  // return value is not checked
+  m_thread.join();  // blocks until eventLoop() returns
+}
+
+bool  // registers `task` and arms its timer for `delay` seconds; returns false if the tag is already registered
+Scheduler::addTask(const TaskPtr &task, double delay)
+{
+  TaskPtr newTask = task;
+  newTask->setTv(delay);  // note: the task's timeval is updated even when addToMap rejects the task below
+  if (addToMap(newTask))
+  {
+    evtimer_add(newTask->ev(), newTask->tv());  // arm the timer; return value is not checked
+    return true;
+  }
+  return false;
+}
+
+bool  // registers a periodic task; returns false for non-periodic tasks or an already-registered tag
+Scheduler::addTask(const TaskPtr &task)
+{
+  TaskPtr newTask = task;
+  if (!newTask->isPeriodic())  // this overload has no delay argument, so only tasks with a generator are accepted
+  {
+    return false;
+  }
+
+  if (addToMap(newTask))
+  {
+    newTask->reset();  // loads the first interval from the task's generator
+    evtimer_add(newTask->ev(), newTask->tv());  // arm the timer; return value is not checked
+    return true;
+  }
+
+  return false;
+}
+
+void  // re-arms the timer of the periodic task registered under `tag`; does nothing if absent or not periodic
+Scheduler::rescheduleTask(const Task::Tag &tag)
+{
+  ReadLock lock(m_mutex);  // must be a named object: `ReadLock(m_mutex);` declares a shadowing local and locks nothing
+  TaskMapIt it = m_taskMap.find(tag);
+  if (it != m_taskMap.end())
+  {
+    TaskPtr task = it->second;
+    if (task->isPeriodic())
+    {
+      evtimer_add(task->ev(), task->tv());  // uses the timeval most recently set on the task
+    }
+  }
+}
+
+bool  // inserts `task` under its tag; returns false (map unchanged) if the tag is already present
+Scheduler::addToMap(const TaskPtr &task)
+{
+  WriteLock lock(m_mutex);  // must be a named object: `WriteLock(m_mutex);` declares a shadowing local and locks nothing
+  if (m_taskMap.find(task->tag()) == m_taskMap.end())
+  {
+    m_taskMap.insert(make_pair(task->tag(), task));
+    return true;
+  }
+  return false;
+}
+
+void  // cancels and unregisters the task stored under `tag`; does nothing if the tag is unknown
+Scheduler::deleteTask(const Task::Tag &tag)
+{
+  WriteLock lock(m_mutex);  // must be a named object: `WriteLock(m_mutex);` declares a shadowing local and locks nothing
+  TaskMapIt it = m_taskMap.find(tag);
+  if (it != m_taskMap.end())
+  {
+    TaskPtr task = it->second;  // keeps the Task alive until the end of this scope, after the erase below
+    evtimer_del(task->ev());  // NOTE(review): called with the write lock held - confirm it cannot wait on a callback blocked on m_mutex
+    m_taskMap.erase(it);
+  }
+}
+
+void  // cancels and unregisters every task for which `matcher` returns true
+Scheduler::deleteTask(const Task::TaskMatcher &matcher)
+{
+  WriteLock lock(m_mutex);  // must be a named object: `WriteLock(m_mutex);` declares a shadowing local and locks nothing
+  TaskMapIt it = m_taskMap.begin();
+  while(it != m_taskMap.end())
+  {
+    TaskPtr task = it->second;
+    if (matcher(task))  // matcher runs with the write lock held
+    {
+      evtimer_del(task->ev());  // NOTE(review): called with the write lock held - confirm it cannot wait on a callback blocked on m_mutex
+      // Post-increment hands erase() a copy of the current iterator while `it` already
+      // points at the next element. std::map::erase invalidates only the erased
+      // iterator, so the traversal can continue safely.
+      m_taskMap.erase(it++);
+    }
+    else
+    {
+      ++it;
+    }
+  }
+}
+
+int  // number of tasks currently registered (the header marks this as used in tests)
+Scheduler::size()
+{
+  ReadLock lock(m_mutex);  // must be a named object: `ReadLock(m_mutex);` declares a shadowing local and locks nothing
+  return m_taskMap.size();
+}
diff --git a/test/test-event-scheduler.cc b/test/test-event-scheduler.cc
new file mode 100644
index 0000000..950a69e
--- /dev/null
+++ b/test/test-event-scheduler.cc
@@ -0,0 +1,138 @@
+#include "event-scheduler.h"
+
+#include <boost/test/unit_test.hpp>
+#include <map>
+#include <unistd.h>
+
+using namespace boost;
+using namespace std;
+
+BOOST_AUTO_TEST_SUITE(SchedulerTests)
+
+map<string, int> table;
+
+void func(string str)  // task callback used by the tests: counts invocations per string in `table`
+{
+  map<string, int>::iterator it = table.find(str);
+  if (it == table.end())
+  {
+    table.insert(make_pair(str, 1));  // first invocation for this string
+  }
+  else
+  {
+    int count = it->second;
+    count++;
+    table.erase(it);  // replace the entry with the incremented count
+    table.insert(make_pair(str, count));
+  }
+}
+
+bool  // selects the tasks tagged "period" or "world" for Scheduler::deleteTask(matcher)
+matcher(const TaskPtr &task)
+{
+  return task->tag() == "period" || task->tag() == "world";
+}
+
+BOOST_AUTO_TEST_CASE(SchedulerTest)  // timing-based test: relies on real sleeps relative to the 0.5 s and 0.2 s task delays
+{
+  SchedulerPtr scheduler(new Scheduler());
+  IntervalGeneratorPtr generator(new SimpleIntervalGenerator(0.2));  // fixed 0.2 s period for task3
+
+  string tag1 = "hello";
+  string tag2 = "world";
+  string tag3 = "period";
+
+  TaskPtr task1(new Task(boost::bind(func, tag1), tag1, scheduler));  // one-shot
+  TaskPtr task2(new Task(boost::bind(func, tag2), tag2, scheduler));  // one-shot
+  TaskPtr task3(new Task(boost::bind(func, tag3), tag3, scheduler, generator));  // periodic
+
+  scheduler->start();
+  scheduler->addTask(task1, 0.5);
+  scheduler->addTask(task2, 0.5);
+  scheduler->addTask(task3);
+  BOOST_CHECK_EQUAL(scheduler->size(), 3);
+  usleep(600000);  // long enough for both 0.5 s one-shot tasks to fire and remove themselves
+  BOOST_CHECK_EQUAL(scheduler->size(), 1);
+  task1->reset();  // clear m_invoked so the callback runs again after re-adding
+  scheduler->addTask(task1, 0.5);
+  BOOST_CHECK_EQUAL(scheduler->size(), 2);
+  usleep(600000);
+  task1->reset();
+  scheduler->addTask(task1, 0.5);
+  BOOST_CHECK_EQUAL(scheduler->size(), 2);
+  usleep(400000);  // shorter than the 0.5 s delay, so task1 has not fired yet
+  scheduler->deleteTask(task1->tag());  // cancel by tag before the callback runs
+  BOOST_CHECK_EQUAL(scheduler->size(), 1);
+  usleep(200000);
+
+  task1->reset();
+  task2->reset();
+  scheduler->addTask(task1, 0.5);
+  scheduler->addTask(task2, 0.5);
+  BOOST_CHECK_EQUAL(scheduler->size(), 3);
+  usleep(100000);
+  scheduler->deleteTask(bind(matcher, _1));  // removes "period" and "world"; "hello" stays scheduled
+  BOOST_CHECK_EQUAL(scheduler->size(), 1);
+  usleep(1000000);  // lets the remaining "hello" task fire
+
+  scheduler->shutdown();
+
+  int hello = 0, world = 0, period = 0;
+
+  map<string, int>::iterator it;
+  it = table.find(tag1);
+  if (it != table.end())
+  {
+    hello = it->second;
+  }
+  it = table.find(tag2);
+  if (it != table.end())
+  {
+    world = it->second;
+  }
+  it = table.find(tag3);
+  if (it != table.end())
+  {
+    period = it->second;
+  }
+
+  // added four times, canceled once (by tag) before the callback was invoked
+  BOOST_CHECK_EQUAL(hello, 3);
+  // added two times, canceled once by matcher before the callback was invoked
+  BOOST_CHECK_EQUAL(world, 1);
+  // invoked every 0.2 seconds until deleted by matcher
+  BOOST_CHECK_EQUAL(period, static_cast<int>((0.6 + 0.6 + 0.4 + 0.2 + 0.1) / 0.2));
+
+}
+
+BOOST_AUTO_TEST_CASE(GeneratorTest)  // statistical check of RandomIntervalGenerator: mean, bounds and spread over 10000 samples
+{
+  double interval = 10;
+  double percent = 0.5;
+  int times = 10000;
+  IntervalGeneratorPtr generator(new RandomIntervalGenerator(interval, percent));  // NOTE(review): bounds below match the EVEN formula - confirm the default direction
+  double sum = 0.0;
+  double min = 2 * interval;
+  double max = -1;
+  for (int i = 0; i < times; i++)
+  {
+    double next = generator->nextInterval();
+    sum += next;
+    if (next > max)
+    {
+      max = next;
+    }
+    if (next < min)
+    {
+      min = next;
+    }
+  }
+
+  BOOST_CHECK( fabs(1.0 - (sum / static_cast<double>(times)) / interval) < 0.05);  // fabs: an int abs() would truncate to 0 and make the check vacuous
+  BOOST_CHECK( min > interval * (1 - percent / 2.0));
+  BOOST_CHECK( max < interval * (1 + percent / 2.0));
+  BOOST_CHECK( fabs(1.0 - ((max - min) / interval) / percent) < 0.05);  // observed spread should be close to `percent` of the interval
+
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/wscript b/wscript
index 43d8d8f..5d38675 100644
--- a/wscript
+++ b/wscript
@@ -19,6 +19,8 @@
     conf.define ("CHRONOSHARE_VERSION", VERSION)
 
     conf.check_cfg(package='sqlite3', args=['--cflags', '--libs'], uselib_store='SQLITE3', mandatory=True)
+    conf.check_cfg(package='libevent', args=['--cflags', '--libs'], uselib_store='LIBEVENT', mandatory=True)
+    conf.check_cfg(package='libevent_pthreads', args=['--cflags', '--libs'], uselib_store='LIBEVENT_PTHREADS', mandatory=True)
 
     if not conf.check_cfg(package='openssl', args=['--cflags', '--libs'], uselib_store='SSL', mandatory=False):
         libcrypto = conf.check_cc(lib='crypto',
@@ -79,8 +81,9 @@
             'src/object-manager.cc',
             'src/ccnx-name.cpp',
             'src/ccnx-selectors.cpp',
+            'src/event-scheduler.cpp',
             ],
-        use = 'BOOST BOOST_THREAD BOOST_FILESYSTEM SSL SQLITE3 CCNX common',
+        use = 'BOOST BOOST_THREAD BOOST_FILESYSTEM SSL SQLITE3 CCNX common LIBEVENT LIBEVENT_PTHREADS',
         includes = ['include', 'src'],
         )