Run ccnx interest and data callback in executor
Run scheduler task in executor
diff --git a/ccnx/ccnx-closure.cpp b/ccnx/ccnx-closure.cpp
index 312af6d..5abcb3e 100644
--- a/ccnx/ccnx-closure.cpp
+++ b/ccnx/ccnx-closure.cpp
@@ -34,7 +34,7 @@
 }
 
 Closure::TimeoutCallbackReturnValue
-Closure::runTimeoutCallback(const Name &interest)
+Closure::runTimeoutCallback(Name interest)
 {
   if (m_timeoutCallback.empty ())
     {
@@ -46,7 +46,7 @@
 
 
 void
-Closure::runDataCallback(const Name &name, PcoPtr content)
+Closure::runDataCallback(Name name, PcoPtr content)
 {
   if (!m_dataCallback.empty ())
     {
@@ -54,10 +54,47 @@
     }
 }
 
-Closure *
-Closure::dup () const
+ExecutorClosure::ExecutorClosure(const Closure &closure, ExecutorPtr executor)
+                : Closure(closure)
+                , m_executor(executor)
 {
-  return new Closure (*this);
+}
+
+ExecutorClosure::~ExecutorClosure()
+{
+}
+
+void
+ExecutorClosure::runDataCallback(Name name, PcoPtr content)
+{
+  m_executor->execute(boost::bind(&ExecutorClosure::execute, this, name, content));
+}
+
+void
+ExecutorClosure::execute(Name name, PcoPtr content)
+{
+  Closure::runDataCallback(name, content);
+}
+
+ExecutorInterestClosure::ExecutorInterestClosure(const InterestCallback &callback, ExecutorPtr executor)
+                        : m_callback(callback)
+                        , m_executor(executor)
+{
+}
+
+void
+ExecutorInterestClosure::runInterestCallback(Name interest)
+{
+  m_executor->execute(boost::bind(&ExecutorInterestClosure::execute, this, interest));
+}
+
+void
+ExecutorInterestClosure::execute(Name interest)
+{
+  if (!m_callback.empty())
+  {
+    m_callback(interest);
+  }
 }
 
 } // Ccnx
diff --git a/ccnx/ccnx-closure.h b/ccnx/ccnx-closure.h
index 343d8be..8b80aa8 100644
--- a/ccnx/ccnx-closure.h
+++ b/ccnx/ccnx-closure.h
@@ -24,6 +24,7 @@
 
 #include "ccnx-common.h"
 #include "ccnx-name.h"
+#include "executor.h"
 
 namespace Ccnx {
 
@@ -33,7 +34,7 @@
 class Closure
 {
 public:
-  typedef boost::function<void (const Name &, PcoPtr pco)> DataCallback;
+  typedef boost::function<void (Name, PcoPtr pco)> DataCallback;
 
   typedef enum
   {
@@ -41,25 +42,57 @@
     RESULT_REEXPRESS
   } TimeoutCallbackReturnValue;
 
-  typedef boost::function<TimeoutCallbackReturnValue (const Name &)> TimeoutCallback;
+  typedef boost::function<TimeoutCallbackReturnValue (Name )> TimeoutCallback;
 
   Closure(const DataCallback &dataCallback, const TimeoutCallback &timeoutCallback = TimeoutCallback());
   virtual ~Closure();
 
   virtual void
-  runDataCallback(const Name &name, Ccnx::PcoPtr pco);
+  runDataCallback(Name name, Ccnx::PcoPtr pco);
 
   virtual TimeoutCallbackReturnValue
-  runTimeoutCallback(const Name &interest);
-
-  virtual Closure *
-  dup() const;
+  runTimeoutCallback(Name interest);
 
 protected:
   TimeoutCallback m_timeoutCallback;
   DataCallback m_dataCallback;
 };
 
+class ExecutorClosure : public Closure
+{
+public:
+  ExecutorClosure(const Closure &closure, ExecutorPtr executor);
+  virtual ~ExecutorClosure();
+
+  virtual void
+  runDataCallback(Name name, PcoPtr pco);
+
+private:
+  void
+  execute(Name nae, PcoPtr content);
+
+private:
+  ExecutorPtr m_executor;
+};
+
+class ExecutorInterestClosure
+{
+public:
+  typedef boost::function<void (Name)> InterestCallback;
+  ExecutorInterestClosure(const InterestCallback &callback, ExecutorPtr executor);
+  ~ExecutorInterestClosure() {}
+
+  void
+  runInterestCallback(Name interest);
+
+  void
+  execute(Name interest);
+
+private:
+  InterestCallback m_callback;
+  ExecutorPtr m_executor;
+};
+
 } // Ccnx
 
 #endif
diff --git a/ccnx/ccnx-wrapper.cpp b/ccnx/ccnx-wrapper.cpp
index d584be4..43c9780 100644
--- a/ccnx/ccnx-wrapper.cpp
+++ b/ccnx/ccnx-wrapper.cpp
@@ -47,9 +47,11 @@
   : m_handle (0)
   , m_running (true)
   , m_connected (false)
+  , m_executor (new Executor(1))
 {
   connectCcnd();
   m_thread = thread (&CcnxWrapper::ccnLoop, this);
+  m_executor->start();
 }
 
 void
@@ -85,6 +87,8 @@
     m_running = false;
   }
 
+  m_executor->shutdown();
+
   m_thread.join ();
   ccn_disconnect (m_handle);
   //ccn_destroy (&m_handle);
@@ -246,13 +250,13 @@
                  ccn_upcall_kind kind,
                  ccn_upcall_info *info)
 {
-  CcnxWrapper::InterestCallback *f = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
+  ExecutorInterestClosure *closure = static_cast<ExecutorInterestClosure*> (selfp->data);
   _LOG_TRACE (">> incomingInterest upcall");
 
   switch (kind)
     {
     case CCN_UPCALL_FINAL: // effective in unit tests
-      delete f;
+      delete closure;
       delete selfp;
       _LOG_TRACE ("<< incomingInterest with CCN_UPCALL_FINAL");
       return CCN_UPCALL_RESULT_OK;
@@ -267,7 +271,8 @@
 
   Name interest(info->interest_ccnb, info->interest_comps);
 
-  (*f) (interest);
+  // this will be run in executor
+  closure->runInterestCallback(interest);
 
   _LOG_TRACE ("<< incomingInterest");
   return CCN_UPCALL_RESULT_OK;
@@ -298,6 +303,7 @@
       {
         _LOG_TRACE ("<< incomingData timeout");
         Name interest(info->interest_ccnb, info->interest_comps);
+        // We can not run timeout callback in executor, because we need the return value
         Closure::TimeoutCallbackReturnValue rv = cp->runTimeoutCallback(interest);
         switch(rv)
         {
@@ -333,6 +339,7 @@
   // // otherwise the pointed memory may have been changed during the processing
   // readRaw(content, pcontent, len);
 
+  // this will be run in executor
   cp->runDataCallback (pco->name (), pco);
 
   _LOG_TRACE (">> incomingData");
@@ -356,7 +363,7 @@
   ccn_charbuf *pname = namePtr->getBuf();
   ccn_closure *dataClosure = new ccn_closure;
 
-  Closure *myClosure = closure.dup();
+  Closure *myClosure = new ExecutorClosure(closure, m_executor);
   dataClosure->data = (void *)myClosure;
 
   dataClosure->p = &incomingData;
@@ -384,6 +391,7 @@
   _LOG_TRACE (">> setInterestFilter");
   UniqueRecLock lock(m_mutex);
   if (!m_running || !m_connected)
+  {
     return -1;
   }
 
@@ -391,10 +399,9 @@
   ccn_charbuf *pname = ptr->getBuf();
   ccn_closure *interestClosure = new ccn_closure;
 
-  interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
+  interestClosure->data = new ExecutorInterestClosure(interestCallback, m_executor);
   interestClosure->p = &incomingInterest;
 
-  UniqueRecLock lock(m_mutex);
   int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
   if (ret < 0)
   {
diff --git a/ccnx/ccnx-wrapper.h b/ccnx/ccnx-wrapper.h
index bdfdd2c..91e28c5 100644
--- a/ccnx/ccnx-wrapper.h
+++ b/ccnx/ccnx-wrapper.h
@@ -31,6 +31,7 @@
 #include "ccnx-selectors.h"
 #include "ccnx-closure.h"
 #include "ccnx-pco.h"
+#include "executor.h"
 
 namespace Ccnx {
 
@@ -97,6 +98,7 @@
   bool m_running;
   bool m_connected;
   std::map<Name, InterestCallback> m_registeredInterests;
+  ExecutorPtr m_executor;
 };
 
 typedef boost::shared_ptr<CcnxWrapper> CcnxWrapperPtr;