/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2013, Regents of the University of California
 *                     Alexander Afanasyev
 *                     Zhenkai Zhu
 *
 * BSD license, See the LICENSE file for more information
 *
 * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
 *         Alexander Afanasyev <alexander.afanasyev@ucla.edu>
 */
12
13#include "executor.h"
14#include "logging.h"
15
16INIT_LOGGER("Executor")
17
18using namespace std;
19using namespace boost;
20
21Executor::Executor (int poolSize)
22 : m_needStop (true)
23 , m_poolSize (poolSize)
24{
25}
26
27Executor::~Executor()
28{
29 _LOG_DEBUG ("Enter destructor");
30 shutdown ();
31 _LOG_DEBUG ("Exit destructor");
32}
33
34void
35Executor::start ()
36{
37 if (m_needStop)
38 {
39 m_needStop = false;
40 for (int i = 0; i < m_poolSize; i++)
41 {
42 m_group.create_thread (bind(&Executor::run, this));
43 }
44 }
45}
46
47void
48Executor::shutdown ()
49{
50 if (!m_needStop)
51 {
52 m_needStop = true;
53 _LOG_DEBUG ("Iterrupting all");
54 m_group.interrupt_all ();
55 _LOG_DEBUG ("Join all");
56 m_group.join_all ();
57 }
58}
59
60
61void
62Executor::execute(const Job &job)
63{
64 _LOG_DEBUG ("Add to job queue");
65
66 Lock lock(m_mutex);
67 bool queueWasEmpty = m_queue.empty ();
68 m_queue.push_back(job);
69
70 // notify working threads if the queue was empty
71 if (queueWasEmpty)
72 {
73 m_cond.notify_one ();
74 }
75}
76
77int
78Executor::poolSize()
79{
80 return m_group.size();
81}
82
83int
84Executor::jobQueueSize()
85{
86 Lock lock(m_mutex);
87 return m_queue.size();
88}
89
90void
91Executor::run ()
92{
93 _LOG_DEBUG ("Start thread");
94
95 while(!m_needStop)
96 {
97 Job job = waitForJob();
98
99 _LOG_DEBUG (">>> enter job");
100 job (); // even if job is "null", nothing bad will happen
101 _LOG_DEBUG ("<<< exit job");
102 }
103
104 _LOG_DEBUG ("Executor thread finished");
105}
106
107Executor::Job
108Executor::waitForJob()
109{
110 Lock lock(m_mutex);
111
112 // wait until job queue is not empty
113 while (m_queue.empty())
114 {
115 _LOG_DEBUG ("Unlocking mutex for wait");
116 m_cond.wait(lock);
117 _LOG_DEBUG ("Re-locking mutex after wait");
118 }
119
120 _LOG_DEBUG ("Got signal on condition");
121
122 Job job;
123 if (!m_queue.empty ()) // this is not always guaranteed, especially after interruption from destructor
124 {
125 job = m_queue.front();
126 m_queue.pop_front();
127 }
128 return job;
129}