Small corrections in executor
diff --git a/src/executor.cc b/src/executor.cc
index 15d54d1..67dd74a 100644
--- a/src/executor.cc
+++ b/src/executor.cc
@@ -20,34 +20,44 @@
*/
#include "executor.h"
+#include "logging.h"
+
+INIT_LOGGER ("Executor");
using namespace std;
using namespace boost;
-Executor::Executor(int poolSize)
+Executor::Executor (int poolSize)
+ : m_needStop (false)
{
for (int i = 0; i < poolSize; i++)
{
- m_group.create_thread(bind(&Executor::run, this));
+ m_group.create_thread (bind(&Executor::run, this));
}
}
Executor::~Executor()
{
- m_group.interrupt_all();
+ _LOG_DEBUG ("Enter destructor");
+ m_needStop = true;
+ m_group.interrupt_all ();
+ m_group.join_all ();
+ _LOG_DEBUG ("Exit destructor");
}
void
Executor::execute(const Job &job)
{
+ _LOG_DEBUG ("Add to job queue");
+
Lock lock(m_mutex);
- bool queueWasEmpty = m_queue.empty();
+ bool queueWasEmpty = m_queue.empty ();
m_queue.push_back(job);
// notify working threads if the queue was empty
if (queueWasEmpty)
{
- m_cond.notify_one();
+ m_cond.notify_one ();
}
}
@@ -65,14 +75,18 @@
}
void
-Executor::run()
+Executor::run ()
{
- while(true)
+ _LOG_DEBUG ("Start thread");
+
+ while(!m_needStop)
{
Job job = waitForJob();
- job();
+ job (); // even if job is "null", nothing bad will happen
}
+
+ _LOG_DEBUG ("Thread finished");
}
Executor::Job
@@ -86,7 +100,13 @@
m_cond.wait(lock);
}
- Job job = m_queue.front();
- m_queue.pop_front();
+ _LOG_DEBUG ("Got signal on condition");
+
+ Job job;
+ if (!m_queue.empty ()) // this is not always guaranteed, especially after interruption from destructor
+ {
+ job = m_queue.front();
+ m_queue.pop_front();
+ }
return job;
}
diff --git a/src/executor.h b/src/executor.h
index bda46b4..e15c174 100644
--- a/src/executor.h
+++ b/src/executor.h
@@ -29,7 +29,7 @@
#include <boost/thread/thread.hpp>
#include <deque>
-/* A very simple executor to execute submitted tasks immediately or
+/* A very simple executor to execute submitted tasks immediately or
* in the future (depending on whether there is idle thread)
* A fixed number of threads are created for executing tasks;
* The policy is FIFO
@@ -73,5 +73,7 @@
Mutex m_mutex;
Cond m_cond;
ThreadGroup m_group;
+
+ volatile bool m_needStop;
};
#endif // EXECUTOR_H
diff --git a/src/object-db.cc b/src/object-db.cc
index 6120dc0..fe00225 100644
--- a/src/object-db.cc
+++ b/src/object-db.cc
@@ -132,9 +132,9 @@
//_LOG_DEBUG ("Saving content object for [" << deviceName << ", seqno: " << segment << ", size: " << data.size () << "]");
CcnxCharbufPtr buf = deviceName.toCcnxCharbuf ();
- sqlite3_bind_blob (stmt, 1, buf->buf (), buf->length (), SQLITE_TRANSIENT);
+ sqlite3_bind_blob (stmt, 1, buf->buf (), buf->length (), SQLITE_STATIC);
sqlite3_bind_int64 (stmt, 2, segment);
- sqlite3_bind_blob (stmt, 3, &data[0], data.size (), SQLITE_TRANSIENT);
+ sqlite3_bind_blob (stmt, 3, &data[0], data.size (), SQLITE_STATIC);
sqlite3_step (stmt);
sqlite3_finalize (stmt);
diff --git a/test/test-executor.cc b/test/test-executor.cc
index 5236234..9bdd43a 100644
--- a/test/test-executor.cc
+++ b/test/test-executor.cc
@@ -23,49 +23,57 @@
#include <boost/test/unit_test.hpp>
#include "executor.h"
+#include "logging.h"
+
+INIT_LOGGER ("Test.Executor");
+
using namespace boost;
using namespace std;
-void timeConsumingJob()
+void timeConsumingJob ()
{
- cout << "Start sleep" << endl;
+ _LOG_DEBUG ("Start sleep");
sleep(1);
- cout << "finish sleep" << endl;
+ _LOG_DEBUG ("Finish sleep");
}
-BOOST_AUTO_TEST_CASE(ExecutorTest)
+BOOST_AUTO_TEST_CASE(TestExecutor)
{
- Executor executor(3);
- Executor::Job job = bind(timeConsumingJob);
+ INIT_LOGGERS ();
- executor.execute(job);
- executor.execute(job);
+ {
+ Executor executor (3);
+ Executor::Job job = bind(timeConsumingJob);
- usleep(100);
- // both jobs should have been taken care of
- BOOST_CHECK_EQUAL(executor.jobQueueSize(), 0);
+ executor.execute(job);
+ executor.execute(job);
- usleep(500000);
+ usleep(1000);
+ // both jobs should have been taken care of
+ BOOST_CHECK_EQUAL(executor.jobQueueSize(), 0);
- // add four jobs while only one thread is idle
- executor.execute(job);
- executor.execute(job);
- executor.execute(job);
- executor.execute(job);
+ usleep(500000);
- usleep(100);
- // three jobs should remain in queue
- BOOST_CHECK_EQUAL(executor.jobQueueSize(), 3);
+ // add four jobs while only one thread is idle
+ executor.execute(job);
+ executor.execute(job);
+ executor.execute(job);
+ executor.execute(job);
- usleep(500000);
- // two threads should have finished and
- // take care of two queued jobs
- BOOST_CHECK_EQUAL(executor.jobQueueSize(), 1);
+ usleep(1000);
+ // three jobs should remain in queue
+ BOOST_CHECK_EQUAL(executor.jobQueueSize(), 3);
- // all jobs should have been fetched
- usleep(500100);
- BOOST_CHECK_EQUAL(executor.jobQueueSize(), 0);
+ usleep(500000);
+ // two threads should have finished and
+ // take care of two queued jobs
+ BOOST_CHECK_EQUAL(executor.jobQueueSize(), 1);
+
+ // all jobs should have been fetched
+ usleep(501000);
+ BOOST_CHECK_EQUAL(executor.jobQueueSize(), 0);
+ } //separate scope to ensure that destructor is called
+
sleep(1);
-
}