/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2013 University of California, Los Angeles
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 as
 * published by the Free Software Foundation;
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 *
 * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
 *         Alexander Afanasyev <alexander.afanasyev@ucla.edu>
 */

#include "scheduler.h"
#include <iostream> // cout/endl are used for logging below
#include <utility>

using namespace std;

// EVLOOP_NO_EXIT_ON_EMPTY is only provided by libevent >= 2.1; define it for
// older headers so event_base_loop keeps running while no events are pending.
#ifndef EVLOOP_NO_EXIT_ON_EMPTY
#define EVLOOP_NO_EXIT_ON_EMPTY 0x04
#endif

// IntervalGeneratorPtr
// IntervalGenerator:: Null;

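// Called by libevent when it hits an unrecoverable internal error
// (registered below via event_set_fatal_callback).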
void errorCallback(int err)
{
  cout << "Fatal error: " << err << endl;
}

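// Enable pthread-based locking in libevent before creating the event base,
// so timers can be added and deleted from threads other than the one
// running the event loop.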
Scheduler::Scheduler()
  : m_running(false)
{
  event_set_fatal_callback(errorCallback);
  evthread_use_pthreads();
  m_base = event_base_new();
}

Scheduler::~Scheduler()
{
  event_base_free(m_base);
}

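// Body of the scheduler thread: run the libevent loop until shutdown().
// EVLOOP_NO_EXIT_ON_EMPTY keeps event_base_loop from returning merely
// because no timers are pending; the loop ends only after shutdown() clears
// m_running and calls event_base_loopbreak.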
void
Scheduler::eventLoop()
{
  while (true)
  {
    if (event_base_loop(m_base, EVLOOP_NO_EXIT_ON_EMPTY) < 0)
    {
      cout << "scheduler loop break error" << endl;
    }

    ReadLock lock(m_mutex);
    if (!m_running)
    {
      cout << "scheduler loop break normal" << endl;
      break;
    }
  }
}

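// Spawn the event-loop thread. Guarded by the write lock so that calling
// start() twice does not create a second thread.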
void
Scheduler::start()
{
  WriteLock lock(m_mutex);
  if (!m_running)
  {
    m_thread = boost::thread(&Scheduler::eventLoop, this);
    m_running = true;
  }
}

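// Stop the event loop and wait for the scheduler thread to exit.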
void
Scheduler::shutdown()
{
  {
    // Clear m_running before breaking the loop, and release the write lock
    // before join(): eventLoop() needs a read lock to observe
    // m_running == false, so joining while holding the write lock would
    // deadlock.
    WriteLock lock(m_mutex);
    if (!m_running)
    {
      return;
    }
    m_running = false;
  }
  event_base_loopbreak(m_base);
  m_thread.join();
}

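// Register a task under its tag and arm its timer. Returns false if a task
// with the same tag is already scheduled.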
bool
Scheduler::addTask(const TaskPtr &task)
{
  if (addToMap(task))
  {
    task->reset();
    int res = evtimer_add(task->ev(), task->tv());
    if (res < 0)
    {
      // The task stays in the map, so it can still be rescheduled or deleted.
      cout << "evtimer_add failed for " << task->tag() << endl;
    }
    return true;
  }
  else
  {
    cout << "failed to add task: " << task->tag() << endl;
  }

  return false;
}

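// Re-arm the timer of an already-registered task, restarting its delay from
// now. Unknown tags are silently ignored.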
void
Scheduler::rescheduleTask(const Task::Tag &tag)
{
  ReadLock lock(m_mutex);
  TaskMapIt it = m_taskMap.find(tag);
  if (it != m_taskMap.end())
  {
    TaskPtr task = it->second;
    task->reset();
    int res = evtimer_add(task->ev(), task->tv());
    if (res < 0)
    {
      cout << "evtimer_add failed for " << task->tag() << endl;
    }
  }
}

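// Insert the task into the tag-to-task map unless its tag is already taken.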
bool
Scheduler::addToMap(const TaskPtr &task)
{
  WriteLock lock(m_mutex);
  if (m_taskMap.find(task->tag()) == m_taskMap.end())
  {
    m_taskMap.insert(make_pair(task->tag(), task));
    return true;
  }
  return false;
}

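// Cancel and forget the task with the given tag, if any.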
void
Scheduler::deleteTask(const Task::Tag &tag)
{
  WriteLock lock(m_mutex);
  TaskMapIt it = m_taskMap.find(tag);
  if (it != m_taskMap.end())
  {
    TaskPtr task = it->second;
    evtimer_del(task->ev());
    m_taskMap.erase(it);
  }
}

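// Cancel and forget every task accepted by the matcher predicate.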
void
Scheduler::deleteTask(const Task::TaskMatcher &matcher)
{
  WriteLock lock(m_mutex);
  TaskMapIt it = m_taskMap.begin();
  while (it != m_taskMap.end())
  {
    TaskPtr task = it->second;
    if (matcher(task))
    {
      evtimer_del(task->ev());
      // Use post-increment: map::erase invalidates the iterator being erased
      // but leaves all other iterators valid, so advancing before erasing is
      // the standard idiom for erasing from an STL map while traversing it.
      m_taskMap.erase(it++);
    }
    else
    {
      ++it;
    }
  }
}

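// Number of currently scheduled tasks.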
int
Scheduler::size()
{
  ReadLock lock(m_mutex);
  return m_taskMap.size();
}
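
// Usage sketch (illustrative only; OneTimeTask, doWork, and the 0.5-second
// delay are assumptions, not part of this file): a caller would typically
// start the scheduler once, hand it reference-counted tasks, and shut it
// down before destroying it.
//
//   Scheduler scheduler;
//   scheduler.start();
//   TaskPtr task(new OneTimeTask(bind(doWork), "work-1", 0.5));
//   scheduler.addTask(task);   // doWork fires ~0.5s later on the loop thread
//   ...
//   scheduler.shutdown();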