Run CCNx interest and data callbacks in the executor
Run scheduler tasks in the executor
diff --git a/ccnx/ccnx-wrapper.cpp b/ccnx/ccnx-wrapper.cpp
index d584be4..43c9780 100644
--- a/ccnx/ccnx-wrapper.cpp
+++ b/ccnx/ccnx-wrapper.cpp
@@ -47,9 +47,11 @@
: m_handle (0)
, m_running (true)
, m_connected (false)
+ , m_executor (new Executor(1))
{
connectCcnd();
m_thread = thread (&CcnxWrapper::ccnLoop, this);
+ m_executor->start();
}
void
@@ -85,6 +87,8 @@
m_running = false;
}
+ m_executor->shutdown();
+
m_thread.join ();
ccn_disconnect (m_handle);
//ccn_destroy (&m_handle);
@@ -246,13 +250,13 @@
ccn_upcall_kind kind,
ccn_upcall_info *info)
{
- CcnxWrapper::InterestCallback *f = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
+ ExecutorInterestClosure *closure = static_cast<ExecutorInterestClosure*> (selfp->data);
_LOG_TRACE (">> incomingInterest upcall");
switch (kind)
{
case CCN_UPCALL_FINAL: // effective in unit tests
- delete f;
+ delete closure;
delete selfp;
_LOG_TRACE ("<< incomingInterest with CCN_UPCALL_FINAL");
return CCN_UPCALL_RESULT_OK;
@@ -267,7 +271,8 @@
Name interest(info->interest_ccnb, info->interest_comps);
- (*f) (interest);
+ // this will be run in executor
+ closure->runInterestCallback(interest);
_LOG_TRACE ("<< incomingInterest");
return CCN_UPCALL_RESULT_OK;
@@ -298,6 +303,7 @@
{
_LOG_TRACE ("<< incomingData timeout");
Name interest(info->interest_ccnb, info->interest_comps);
+ // The timeout callback cannot be deferred to the executor: its return value is needed immediately below
Closure::TimeoutCallbackReturnValue rv = cp->runTimeoutCallback(interest);
switch(rv)
{
@@ -333,6 +339,7 @@
// // otherwise the pointed memory may have been changed during the processing
// readRaw(content, pcontent, len);
+ // this will be run in executor
cp->runDataCallback (pco->name (), pco);
_LOG_TRACE (">> incomingData");
@@ -356,7 +363,7 @@
ccn_charbuf *pname = namePtr->getBuf();
ccn_closure *dataClosure = new ccn_closure;
- Closure *myClosure = closure.dup();
+ Closure *myClosure = new ExecutorClosure(closure, m_executor);
dataClosure->data = (void *)myClosure;
dataClosure->p = &incomingData;
@@ -384,6 +391,7 @@
_LOG_TRACE (">> setInterestFilter");
UniqueRecLock lock(m_mutex);
if (!m_running || !m_connected)
+ {
return -1;
}
@@ -391,10 +399,9 @@
ccn_charbuf *pname = ptr->getBuf();
ccn_closure *interestClosure = new ccn_closure;
- interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
+ interestClosure->data = new ExecutorInterestClosure(interestCallback, m_executor);
interestClosure->p = &incomingInterest;
- UniqueRecLock lock(m_mutex);
int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
if (ret < 0)
{