Run ccnx interest and data callbacks in the executor
Run scheduler tasks in the executor
diff --git a/ccnx/ccnx-closure.cpp b/ccnx/ccnx-closure.cpp
index 312af6d..5abcb3e 100644
--- a/ccnx/ccnx-closure.cpp
+++ b/ccnx/ccnx-closure.cpp
@@ -34,7 +34,7 @@
}
Closure::TimeoutCallbackReturnValue
-Closure::runTimeoutCallback(const Name &interest)
+Closure::runTimeoutCallback(Name interest)
{
if (m_timeoutCallback.empty ())
{
@@ -46,7 +46,7 @@
void
-Closure::runDataCallback(const Name &name, PcoPtr content)
+Closure::runDataCallback(Name name, PcoPtr content)
{
if (!m_dataCallback.empty ())
{
@@ -54,10 +54,47 @@
}
}
-Closure *
-Closure::dup () const
+ExecutorClosure::ExecutorClosure(const Closure &closure, ExecutorPtr executor) // builds an executor-backed closure from an existing Closure
+ : Closure(closure) // copy-constructs the Closure base from `closure`
+ , m_executor(executor) // executor that runDataCallback submits work to
{
- return new Closure (*this);
+}
+
+ExecutorClosure::~ExecutorClosure() // empty body: no explicit cleanup is performed here
+{
+}
+
+void
+ExecutorClosure::runDataCallback(Name name, PcoPtr content) // hands the data callback to m_executor instead of invoking it inline
+{
+ m_executor->execute(boost::bind(&ExecutorClosure::execute, this, name, content)); // name/content are bound by value; NOTE(review): raw `this` is bound too, so this assumes the closure outlives the submitted task — confirm against where the closure is deleted
+}
+
+void
+ExecutorClosure::execute(Name name, PcoPtr content) // task body that runDataCallback submits to m_executor
+{
+ Closure::runDataCallback(name, content); // qualified call: runs the base implementation, not this class's override (which would submit again)
+}
+
+ExecutorInterestClosure::ExecutorInterestClosure(const InterestCallback &callback, ExecutorPtr executor) // stores a copy of the interest callback and the executor used to dispatch it
+ : m_callback(callback)
+ , m_executor(executor)
+{
+}
+
+void
+ExecutorInterestClosure::runInterestCallback(Name interest) // submits the interest callback to m_executor; it is not invoked inline
+{
+ // Bind a copy of the callback rather than a pointer to this object:
+ // incomingInterest() deletes the closure on CCN_UPCALL_FINAL, and a task that
+ // is still queued at that moment must not dereference the freed closure.
+ if (!m_callback.empty()) // same "skip when unset" rule that execute() applies
+ {
+ m_executor->execute(boost::bind(m_callback, interest));
+ }
+}
+
+void
+ExecutorInterestClosure::execute(Name interest) // invokes the stored interest callback, if one is set
+{
+ if (!m_callback.empty()) // an unset callback is skipped; no call is made
+ {
+ m_callback(interest);
+ }
}
} // Ccnx
diff --git a/ccnx/ccnx-closure.h b/ccnx/ccnx-closure.h
index 343d8be..8b80aa8 100644
--- a/ccnx/ccnx-closure.h
+++ b/ccnx/ccnx-closure.h
@@ -24,6 +24,7 @@
#include "ccnx-common.h"
#include "ccnx-name.h"
+#include "executor.h"
namespace Ccnx {
@@ -33,7 +34,7 @@
class Closure
{
public:
- typedef boost::function<void (const Name &, PcoPtr pco)> DataCallback;
+ typedef boost::function<void (Name, PcoPtr pco)> DataCallback;
typedef enum
{
@@ -41,25 +42,57 @@
RESULT_REEXPRESS
} TimeoutCallbackReturnValue;
- typedef boost::function<TimeoutCallbackReturnValue (const Name &)> TimeoutCallback;
+ typedef boost::function<TimeoutCallbackReturnValue (Name )> TimeoutCallback;
Closure(const DataCallback &dataCallback, const TimeoutCallback &timeoutCallback = TimeoutCallback());
virtual ~Closure();
virtual void
- runDataCallback(const Name &name, Ccnx::PcoPtr pco);
+ runDataCallback(Name name, Ccnx::PcoPtr pco);
virtual TimeoutCallbackReturnValue
- runTimeoutCallback(const Name &interest);
-
- virtual Closure *
- dup() const;
+ runTimeoutCallback(Name interest);
protected:
TimeoutCallback m_timeoutCallback;
DataCallback m_dataCallback;
};
+class ExecutorClosure : public Closure // Closure whose data callback is dispatched through an executor
+{
+public:
+ ExecutorClosure(const Closure &closure, ExecutorPtr executor); // copies `closure` and remembers `executor`
+ virtual ~ExecutorClosure();
+
+ virtual void
+ runDataCallback(Name name, PcoPtr pco); // overrides Closure::runDataCallback; runTimeoutCallback is not overridden in this class
+
+private:
+ void
+ execute(Name name, PcoPtr content); // task body: calls Closure::runDataCallback(name, content)
+
+private:
+ ExecutorPtr m_executor; // executor that runDataCallback submits work to
+};
+
+class ExecutorInterestClosure // pairs an interest callback with the executor used to dispatch it
+{
+public:
+ typedef boost::function<void (Name)> InterestCallback; // callback receives the interest name by value
+ ExecutorInterestClosure(const InterestCallback &callback, ExecutorPtr executor);
+ ~ExecutorInterestClosure() {} // non-virtual and empty; NOTE(review): presumably not intended as a base class — confirm
+
+ void
+ runInterestCallback(Name interest); // submits the callback for `interest` to m_executor
+
+ void
+ execute(Name interest); // invokes m_callback(interest) unless the callback is empty
+
+private:
+ InterestCallback m_callback;
+ ExecutorPtr m_executor;
+};
+
} // Ccnx
#endif
diff --git a/ccnx/ccnx-wrapper.cpp b/ccnx/ccnx-wrapper.cpp
index d584be4..43c9780 100644
--- a/ccnx/ccnx-wrapper.cpp
+++ b/ccnx/ccnx-wrapper.cpp
@@ -47,9 +47,11 @@
: m_handle (0)
, m_running (true)
, m_connected (false)
+ , m_executor (new Executor(1))
{
connectCcnd();
m_thread = thread (&CcnxWrapper::ccnLoop, this);
+ m_executor->start();
}
void
@@ -85,6 +87,8 @@
m_running = false;
}
+ m_executor->shutdown();
+
m_thread.join ();
ccn_disconnect (m_handle);
//ccn_destroy (&m_handle);
@@ -246,13 +250,13 @@
ccn_upcall_kind kind,
ccn_upcall_info *info)
{
- CcnxWrapper::InterestCallback *f = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
+ ExecutorInterestClosure *closure = static_cast<ExecutorInterestClosure*> (selfp->data);
_LOG_TRACE (">> incomingInterest upcall");
switch (kind)
{
case CCN_UPCALL_FINAL: // effective in unit tests
- delete f;
+ delete closure;
delete selfp;
_LOG_TRACE ("<< incomingInterest with CCN_UPCALL_FINAL");
return CCN_UPCALL_RESULT_OK;
@@ -267,7 +271,8 @@
Name interest(info->interest_ccnb, info->interest_comps);
- (*f) (interest);
+ // this will be run in executor
+ closure->runInterestCallback(interest);
_LOG_TRACE ("<< incomingInterest");
return CCN_UPCALL_RESULT_OK;
@@ -298,6 +303,7 @@
{
_LOG_TRACE ("<< incomingData timeout");
Name interest(info->interest_ccnb, info->interest_comps);
+ // We cannot run the timeout callback in the executor because its return value is needed right here
Closure::TimeoutCallbackReturnValue rv = cp->runTimeoutCallback(interest);
switch(rv)
{
@@ -333,6 +339,7 @@
// // otherwise the pointed memory may have been changed during the processing
// readRaw(content, pcontent, len);
+ // this will be run in executor
cp->runDataCallback (pco->name (), pco);
_LOG_TRACE (">> incomingData");
@@ -356,7 +363,7 @@
ccn_charbuf *pname = namePtr->getBuf();
ccn_closure *dataClosure = new ccn_closure;
- Closure *myClosure = closure.dup();
+ Closure *myClosure = new ExecutorClosure(closure, m_executor);
dataClosure->data = (void *)myClosure;
dataClosure->p = &incomingData;
@@ -384,6 +391,7 @@
_LOG_TRACE (">> setInterestFilter");
UniqueRecLock lock(m_mutex);
if (!m_running || !m_connected)
+ {
return -1;
}
@@ -391,10 +399,9 @@
ccn_charbuf *pname = ptr->getBuf();
ccn_closure *interestClosure = new ccn_closure;
- interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
+ interestClosure->data = new ExecutorInterestClosure(interestCallback, m_executor);
interestClosure->p = &incomingInterest;
- UniqueRecLock lock(m_mutex);
int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
if (ret < 0)
{
diff --git a/ccnx/ccnx-wrapper.h b/ccnx/ccnx-wrapper.h
index bdfdd2c..91e28c5 100644
--- a/ccnx/ccnx-wrapper.h
+++ b/ccnx/ccnx-wrapper.h
@@ -31,6 +31,7 @@
#include "ccnx-selectors.h"
#include "ccnx-closure.h"
#include "ccnx-pco.h"
+#include "executor.h"
namespace Ccnx {
@@ -97,6 +98,7 @@
bool m_running;
bool m_connected;
std::map<Name, InterestCallback> m_registeredInterests;
+ ExecutorPtr m_executor;
};
typedef boost::shared_ptr<CcnxWrapper> CcnxWrapperPtr;