Run ccnx interest and data callback in executor
Run scheduler task in executor
diff --git a/ccnx/ccnx-closure.cpp b/ccnx/ccnx-closure.cpp
index 312af6d..5abcb3e 100644
--- a/ccnx/ccnx-closure.cpp
+++ b/ccnx/ccnx-closure.cpp
@@ -34,7 +34,7 @@
}
Closure::TimeoutCallbackReturnValue
-Closure::runTimeoutCallback(const Name &interest)
+Closure::runTimeoutCallback(Name interest)
{
if (m_timeoutCallback.empty ())
{
@@ -46,7 +46,7 @@
void
-Closure::runDataCallback(const Name &name, PcoPtr content)
+Closure::runDataCallback(Name name, PcoPtr content)
{
if (!m_dataCallback.empty ())
{
@@ -54,10 +54,47 @@
}
}
-Closure *
-Closure::dup () const
+ExecutorClosure::ExecutorClosure(const Closure &closure, ExecutorPtr executor)
+ : Closure(closure)
+ , m_executor(executor)
{
- return new Closure (*this);
+}
+
+ExecutorClosure::~ExecutorClosure()
+{
+}
+
+void
+ExecutorClosure::runDataCallback(Name name, PcoPtr content)
+{
+ m_executor->execute(boost::bind(&ExecutorClosure::execute, this, name, content));
+}
+
+void
+ExecutorClosure::execute(Name name, PcoPtr content)
+{
+ Closure::runDataCallback(name, content);
+}
+
+ExecutorInterestClosure::ExecutorInterestClosure(const InterestCallback &callback, ExecutorPtr executor)
+ : m_callback(callback)
+ , m_executor(executor)
+{
+}
+
+void
+ExecutorInterestClosure::runInterestCallback(Name interest)
+{
+ m_executor->execute(boost::bind(&ExecutorInterestClosure::execute, this, interest));
+}
+
+void
+ExecutorInterestClosure::execute(Name interest)
+{
+ if (!m_callback.empty())
+ {
+ m_callback(interest);
+ }
}
} // Ccnx
diff --git a/ccnx/ccnx-closure.h b/ccnx/ccnx-closure.h
index 343d8be..8b80aa8 100644
--- a/ccnx/ccnx-closure.h
+++ b/ccnx/ccnx-closure.h
@@ -24,6 +24,7 @@
#include "ccnx-common.h"
#include "ccnx-name.h"
+#include "executor.h"
namespace Ccnx {
@@ -33,7 +34,7 @@
class Closure
{
public:
- typedef boost::function<void (const Name &, PcoPtr pco)> DataCallback;
+ typedef boost::function<void (Name, PcoPtr pco)> DataCallback;
typedef enum
{
@@ -41,25 +42,57 @@
RESULT_REEXPRESS
} TimeoutCallbackReturnValue;
- typedef boost::function<TimeoutCallbackReturnValue (const Name &)> TimeoutCallback;
+ typedef boost::function<TimeoutCallbackReturnValue (Name )> TimeoutCallback;
Closure(const DataCallback &dataCallback, const TimeoutCallback &timeoutCallback = TimeoutCallback());
virtual ~Closure();
virtual void
- runDataCallback(const Name &name, Ccnx::PcoPtr pco);
+ runDataCallback(Name name, Ccnx::PcoPtr pco);
virtual TimeoutCallbackReturnValue
- runTimeoutCallback(const Name &interest);
-
- virtual Closure *
- dup() const;
+ runTimeoutCallback(Name interest);
protected:
TimeoutCallback m_timeoutCallback;
DataCallback m_dataCallback;
};
+class ExecutorClosure : public Closure
+{
+public:
+ ExecutorClosure(const Closure &closure, ExecutorPtr executor);
+ virtual ~ExecutorClosure();
+
+ virtual void
+ runDataCallback(Name name, PcoPtr pco);
+
+private:
+ void
+ execute(Name nae, PcoPtr content);
+
+private:
+ ExecutorPtr m_executor;
+};
+
+class ExecutorInterestClosure
+{
+public:
+ typedef boost::function<void (Name)> InterestCallback;
+ ExecutorInterestClosure(const InterestCallback &callback, ExecutorPtr executor);
+ ~ExecutorInterestClosure() {}
+
+ void
+ runInterestCallback(Name interest);
+
+ void
+ execute(Name interest);
+
+private:
+ InterestCallback m_callback;
+ ExecutorPtr m_executor;
+};
+
} // Ccnx
#endif
diff --git a/ccnx/ccnx-wrapper.cpp b/ccnx/ccnx-wrapper.cpp
index d584be4..43c9780 100644
--- a/ccnx/ccnx-wrapper.cpp
+++ b/ccnx/ccnx-wrapper.cpp
@@ -47,9 +47,11 @@
: m_handle (0)
, m_running (true)
, m_connected (false)
+ , m_executor (new Executor(1))
{
connectCcnd();
m_thread = thread (&CcnxWrapper::ccnLoop, this);
+ m_executor->start();
}
void
@@ -85,6 +87,8 @@
m_running = false;
}
+ m_executor->shutdown();
+
m_thread.join ();
ccn_disconnect (m_handle);
//ccn_destroy (&m_handle);
@@ -246,13 +250,13 @@
ccn_upcall_kind kind,
ccn_upcall_info *info)
{
- CcnxWrapper::InterestCallback *f = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
+ ExecutorInterestClosure *closure = static_cast<ExecutorInterestClosure*> (selfp->data);
_LOG_TRACE (">> incomingInterest upcall");
switch (kind)
{
case CCN_UPCALL_FINAL: // effective in unit tests
- delete f;
+ delete closure;
delete selfp;
_LOG_TRACE ("<< incomingInterest with CCN_UPCALL_FINAL");
return CCN_UPCALL_RESULT_OK;
@@ -267,7 +271,8 @@
Name interest(info->interest_ccnb, info->interest_comps);
- (*f) (interest);
+ // this will be run in executor
+ closure->runInterestCallback(interest);
_LOG_TRACE ("<< incomingInterest");
return CCN_UPCALL_RESULT_OK;
@@ -298,6 +303,7 @@
{
_LOG_TRACE ("<< incomingData timeout");
Name interest(info->interest_ccnb, info->interest_comps);
+ // We can not run timeout callback in executor, because we need the return value
Closure::TimeoutCallbackReturnValue rv = cp->runTimeoutCallback(interest);
switch(rv)
{
@@ -333,6 +339,7 @@
// // otherwise the pointed memory may have been changed during the processing
// readRaw(content, pcontent, len);
+ // this will be run in executor
cp->runDataCallback (pco->name (), pco);
_LOG_TRACE (">> incomingData");
@@ -356,7 +363,7 @@
ccn_charbuf *pname = namePtr->getBuf();
ccn_closure *dataClosure = new ccn_closure;
- Closure *myClosure = closure.dup();
+ Closure *myClosure = new ExecutorClosure(closure, m_executor);
dataClosure->data = (void *)myClosure;
dataClosure->p = &incomingData;
@@ -384,6 +391,7 @@
_LOG_TRACE (">> setInterestFilter");
UniqueRecLock lock(m_mutex);
if (!m_running || !m_connected)
+ {
return -1;
}
@@ -391,10 +399,9 @@
ccn_charbuf *pname = ptr->getBuf();
ccn_closure *interestClosure = new ccn_closure;
- interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
+ interestClosure->data = new ExecutorInterestClosure(interestCallback, m_executor);
interestClosure->p = &incomingInterest;
- UniqueRecLock lock(m_mutex);
int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
if (ret < 0)
{
diff --git a/ccnx/ccnx-wrapper.h b/ccnx/ccnx-wrapper.h
index bdfdd2c..91e28c5 100644
--- a/ccnx/ccnx-wrapper.h
+++ b/ccnx/ccnx-wrapper.h
@@ -31,6 +31,7 @@
#include "ccnx-selectors.h"
#include "ccnx-closure.h"
#include "ccnx-pco.h"
+#include "executor.h"
namespace Ccnx {
@@ -97,6 +98,7 @@
bool m_running;
bool m_connected;
std::map<Name, InterestCallback> m_registeredInterests;
+ ExecutorPtr m_executor;
};
typedef boost::shared_ptr<CcnxWrapper> CcnxWrapperPtr;
diff --git a/scheduler/scheduler.cc b/scheduler/scheduler.cc
index 02b6431..1e9aee7 100644
--- a/scheduler/scheduler.cc
+++ b/scheduler/scheduler.cc
@@ -56,6 +56,7 @@
Scheduler::Scheduler()
: m_running(false)
+ , m_executor(1)
{
event_set_fatal_callback(errorCallback);
evthread_use_pthreads();
@@ -113,12 +114,19 @@
}
void
+Scheduler::execute(Executor::Job job)
+{
+ m_executor.execute(job);
+}
+
+void
Scheduler::start()
{
ScopedLock lock(m_mutex);
if (!m_running)
{
m_thread = boost::thread(&Scheduler::eventLoop, this);
+ m_executor.start();
m_running = true;
}
}
@@ -139,6 +147,7 @@
if (breakAndWait)
{
event_base_loopbreak(m_base);
+ m_executor.shutdown();
m_thread.join();
}
}
diff --git a/scheduler/scheduler.h b/scheduler/scheduler.h
index 18bd9bd..096be98 100644
--- a/scheduler/scheduler.h
+++ b/scheduler/scheduler.h
@@ -39,6 +39,7 @@
#include "task.h"
#include "interval-generator.h"
+#include "executor.h"
class Scheduler;
typedef boost::shared_ptr<Scheduler> SchedulerPtr;
@@ -110,6 +111,9 @@
rescheduleTaskAt (TaskPtr task, double time);
void
+ execute(Executor::Job);
+
+ void
eventLoop();
event_base *
@@ -135,6 +139,7 @@
event_base *m_base;
event *m_ev;
boost::thread m_thread;
+ Executor m_executor;
};
struct SchedulerException : virtual boost::exception, virtual std::exception { };
diff --git a/scheduler/task.cc b/scheduler/task.cc
index 0d09199..4702906 100644
--- a/scheduler/task.cc
+++ b/scheduler/task.cc
@@ -26,7 +26,7 @@
eventCallback(evutil_socket_t fd, short what, void *arg)
{
Task *task = static_cast<Task *>(arg);
- task->run();
+ task->execute();
task = NULL;
}
@@ -68,3 +68,9 @@
m_tv->tv_sec = static_cast<int>(intPart);
m_tv->tv_usec = static_cast<int>((fraction * 1000000));
}
+
+void
+Task::execute()
+{
+ m_scheduler->execute(boost::bind(&Task::run, this));
+}
diff --git a/scheduler/task.h b/scheduler/task.h
index 8289c57..41e4438 100644
--- a/scheduler/task.h
+++ b/scheduler/task.h
@@ -86,6 +86,9 @@
void
setTv(double delay);
+ void
+ execute();
+
protected:
Callback m_callback;
Tag m_tag;
diff --git a/src/executor.h b/src/executor.h
index 54a0108..c8e6cb0 100644
--- a/src/executor.h
+++ b/src/executor.h
@@ -23,6 +23,7 @@
#define EXECUTOR_H
#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/locks.hpp>
@@ -83,4 +84,6 @@
volatile bool m_needStop;
int m_poolSize;
};
+
+typedef boost::shared_ptr<Executor> ExecutorPtr;
#endif // EXECUTOR_H
diff --git a/wscript b/wscript
index aff8069..e7033f1 100644
--- a/wscript
+++ b/wscript
@@ -72,7 +72,7 @@
features = ["cxx"],
source = bld.path.ant_glob(['scheduler/**/*.cc']),
use = 'BOOST BOOST_THREAD LIBEVENT LIBEVENT_PTHREADS LOG4CXX',
- includes = "scheduler src",
+ includes = "scheduler executor src",
)
libccnx = bld (
@@ -80,7 +80,7 @@
features=['cxx'],
source = bld.path.ant_glob(['ccnx/**/*.cc', 'ccnx/**/*.cpp']),
use = 'BOOST BOOST_THREAD SSL CCNX LOG4CXX scheduler',
- includes = "ccnx src scheduler",
+ includes = "ccnx src scheduler executor",
)
chornoshare = bld (
@@ -88,7 +88,7 @@
features=['cxx'],
source = bld.path.ant_glob(['src/**/*.cc', 'src/**/*.cpp', 'src/**/*.proto']),
use = "BOOST BOOST_FILESYSTEM SQLITE3 LOG4CXX scheduler ccnx",
- includes = "ccnx scheduler src",
+ includes = "ccnx scheduler src executor",
)
# Unit tests
@@ -98,7 +98,7 @@
source = bld.path.ant_glob(['test/*.cc']),
features=['cxx', 'cxxprogram'],
use = 'BOOST_TEST BOOST_FILESYSTEM LOG4CXX ccnx database chronoshare',
- includes = "ccnx scheduler src",
+ includes = "ccnx scheduler src executor",
)
app_plist = '''<?xml version="1.0" encoding="UTF-8"?>