Run ccnx interest and data callback in executor
Run scheduler task in executor
diff --git a/ccnx/ccnx-closure.cpp b/ccnx/ccnx-closure.cpp
index 312af6d..5abcb3e 100644
--- a/ccnx/ccnx-closure.cpp
+++ b/ccnx/ccnx-closure.cpp
@@ -34,7 +34,7 @@
 }
 
 Closure::TimeoutCallbackReturnValue
-Closure::runTimeoutCallback(const Name &interest)
+Closure::runTimeoutCallback(Name interest)
 {
   if (m_timeoutCallback.empty ())
     {
@@ -46,7 +46,7 @@
 
 
 void
-Closure::runDataCallback(const Name &name, PcoPtr content)
+Closure::runDataCallback(Name name, PcoPtr content)
 {
   if (!m_dataCallback.empty ())
     {
@@ -54,10 +54,47 @@
     }
 }
 
-Closure *
-Closure::dup () const
+ExecutorClosure::ExecutorClosure(const Closure &closure, ExecutorPtr executor)
+                : Closure(closure)
+                , m_executor(executor)
 {
-  return new Closure (*this);
+}
+
+ExecutorClosure::~ExecutorClosure()
+{
+}
+
+void
+ExecutorClosure::runDataCallback(Name name, PcoPtr content)
+{
+  m_executor->execute(boost::bind(&ExecutorClosure::execute, this, name, content));
+}
+
+void
+ExecutorClosure::execute(Name name, PcoPtr content)
+{
+  Closure::runDataCallback(name, content);
+}
+
+ExecutorInterestClosure::ExecutorInterestClosure(const InterestCallback &callback, ExecutorPtr executor)
+                        : m_callback(callback)
+                        , m_executor(executor)
+{
+}
+
+void
+ExecutorInterestClosure::runInterestCallback(Name interest)
+{
+  m_executor->execute(boost::bind(&ExecutorInterestClosure::execute, this, interest));
+}
+
+void
+ExecutorInterestClosure::execute(Name interest)
+{
+  if (!m_callback.empty())
+  {
+    m_callback(interest);
+  }
 }
 
 } // Ccnx
diff --git a/ccnx/ccnx-closure.h b/ccnx/ccnx-closure.h
index 343d8be..8b80aa8 100644
--- a/ccnx/ccnx-closure.h
+++ b/ccnx/ccnx-closure.h
@@ -24,6 +24,7 @@
 
 #include "ccnx-common.h"
 #include "ccnx-name.h"
+#include "executor.h"
 
 namespace Ccnx {
 
@@ -33,7 +34,7 @@
 class Closure
 {
 public:
-  typedef boost::function<void (const Name &, PcoPtr pco)> DataCallback;
+  typedef boost::function<void (Name, PcoPtr pco)> DataCallback;
 
   typedef enum
   {
@@ -41,25 +42,57 @@
     RESULT_REEXPRESS
   } TimeoutCallbackReturnValue;
 
-  typedef boost::function<TimeoutCallbackReturnValue (const Name &)> TimeoutCallback;
+  typedef boost::function<TimeoutCallbackReturnValue (Name )> TimeoutCallback;
 
   Closure(const DataCallback &dataCallback, const TimeoutCallback &timeoutCallback = TimeoutCallback());
   virtual ~Closure();
 
   virtual void
-  runDataCallback(const Name &name, Ccnx::PcoPtr pco);
+  runDataCallback(Name name, Ccnx::PcoPtr pco);
 
   virtual TimeoutCallbackReturnValue
-  runTimeoutCallback(const Name &interest);
-
-  virtual Closure *
-  dup() const;
+  runTimeoutCallback(Name interest);
 
 protected:
   TimeoutCallback m_timeoutCallback;
   DataCallback m_dataCallback;
 };
 
+class ExecutorClosure : public Closure
+{
+public:
+  ExecutorClosure(const Closure &closure, ExecutorPtr executor);
+  virtual ~ExecutorClosure();
+
+  virtual void
+  runDataCallback(Name name, PcoPtr pco);
+
+private:
+  void
+  execute(Name nae, PcoPtr content);
+
+private:
+  ExecutorPtr m_executor;
+};
+
+class ExecutorInterestClosure
+{
+public:
+  typedef boost::function<void (Name)> InterestCallback;
+  ExecutorInterestClosure(const InterestCallback &callback, ExecutorPtr executor);
+  ~ExecutorInterestClosure() {}
+
+  void
+  runInterestCallback(Name interest);
+
+  void
+  execute(Name interest);
+
+private:
+  InterestCallback m_callback;
+  ExecutorPtr m_executor;
+};
+
 } // Ccnx
 
 #endif
diff --git a/ccnx/ccnx-wrapper.cpp b/ccnx/ccnx-wrapper.cpp
index d584be4..43c9780 100644
--- a/ccnx/ccnx-wrapper.cpp
+++ b/ccnx/ccnx-wrapper.cpp
@@ -47,9 +47,11 @@
   : m_handle (0)
   , m_running (true)
   , m_connected (false)
+  , m_executor (new Executor(1))
 {
   connectCcnd();
   m_thread = thread (&CcnxWrapper::ccnLoop, this);
+  m_executor->start();
 }
 
 void
@@ -85,6 +87,8 @@
     m_running = false;
   }
 
+  m_executor->shutdown();
+
   m_thread.join ();
   ccn_disconnect (m_handle);
   //ccn_destroy (&m_handle);
@@ -246,13 +250,13 @@
                  ccn_upcall_kind kind,
                  ccn_upcall_info *info)
 {
-  CcnxWrapper::InterestCallback *f = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
+  ExecutorInterestClosure *closure = static_cast<ExecutorInterestClosure*> (selfp->data);
   _LOG_TRACE (">> incomingInterest upcall");
 
   switch (kind)
     {
     case CCN_UPCALL_FINAL: // effective in unit tests
-      delete f;
+      delete closure;
       delete selfp;
       _LOG_TRACE ("<< incomingInterest with CCN_UPCALL_FINAL");
       return CCN_UPCALL_RESULT_OK;
@@ -267,7 +271,8 @@
 
   Name interest(info->interest_ccnb, info->interest_comps);
 
-  (*f) (interest);
+  // this will be run in executor
+  closure->runInterestCallback(interest);
 
   _LOG_TRACE ("<< incomingInterest");
   return CCN_UPCALL_RESULT_OK;
@@ -298,6 +303,7 @@
       {
         _LOG_TRACE ("<< incomingData timeout");
         Name interest(info->interest_ccnb, info->interest_comps);
+        // We can not run timeout callback in executor, because we need the return value
         Closure::TimeoutCallbackReturnValue rv = cp->runTimeoutCallback(interest);
         switch(rv)
         {
@@ -333,6 +339,7 @@
   // // otherwise the pointed memory may have been changed during the processing
   // readRaw(content, pcontent, len);
 
+  // this will be run in executor
   cp->runDataCallback (pco->name (), pco);
 
   _LOG_TRACE (">> incomingData");
@@ -356,7 +363,7 @@
   ccn_charbuf *pname = namePtr->getBuf();
   ccn_closure *dataClosure = new ccn_closure;
 
-  Closure *myClosure = closure.dup();
+  Closure *myClosure = new ExecutorClosure(closure, m_executor);
   dataClosure->data = (void *)myClosure;
 
   dataClosure->p = &incomingData;
@@ -384,6 +391,7 @@
   _LOG_TRACE (">> setInterestFilter");
   UniqueRecLock lock(m_mutex);
   if (!m_running || !m_connected)
+  {
     return -1;
   }
 
@@ -391,10 +399,9 @@
   ccn_charbuf *pname = ptr->getBuf();
   ccn_closure *interestClosure = new ccn_closure;
 
-  interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
+  interestClosure->data = new ExecutorInterestClosure(interestCallback, m_executor);
   interestClosure->p = &incomingInterest;
 
-  UniqueRecLock lock(m_mutex);
   int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
   if (ret < 0)
   {
diff --git a/ccnx/ccnx-wrapper.h b/ccnx/ccnx-wrapper.h
index bdfdd2c..91e28c5 100644
--- a/ccnx/ccnx-wrapper.h
+++ b/ccnx/ccnx-wrapper.h
@@ -31,6 +31,7 @@
 #include "ccnx-selectors.h"
 #include "ccnx-closure.h"
 #include "ccnx-pco.h"
+#include "executor.h"
 
 namespace Ccnx {
 
@@ -97,6 +98,7 @@
   bool m_running;
   bool m_connected;
   std::map<Name, InterestCallback> m_registeredInterests;
+  ExecutorPtr m_executor;
 };
 
 typedef boost::shared_ptr<CcnxWrapper> CcnxWrapperPtr;
diff --git a/scheduler/scheduler.cc b/scheduler/scheduler.cc
index 02b6431..1e9aee7 100644
--- a/scheduler/scheduler.cc
+++ b/scheduler/scheduler.cc
@@ -56,6 +56,7 @@
 
 Scheduler::Scheduler()
   : m_running(false)
+  , m_executor(1)
 {
   event_set_fatal_callback(errorCallback);
   evthread_use_pthreads();
@@ -113,12 +114,19 @@
 }
 
 void
+Scheduler::execute(Executor::Job job)
+{
+  m_executor.execute(job);
+}
+
+void
 Scheduler::start()
 {
   ScopedLock lock(m_mutex);
   if (!m_running)
   {
     m_thread = boost::thread(&Scheduler::eventLoop, this);
+    m_executor.start();
     m_running = true;
   }
 }
@@ -139,6 +147,7 @@
   if (breakAndWait)
     {
       event_base_loopbreak(m_base);
+      m_executor.shutdown();
       m_thread.join();
     }
 }
diff --git a/scheduler/scheduler.h b/scheduler/scheduler.h
index 18bd9bd..096be98 100644
--- a/scheduler/scheduler.h
+++ b/scheduler/scheduler.h
@@ -39,6 +39,7 @@
 
 #include "task.h"
 #include "interval-generator.h"
+#include "executor.h"
 
 class Scheduler;
 typedef boost::shared_ptr<Scheduler> SchedulerPtr;
@@ -110,6 +111,9 @@
   rescheduleTaskAt (TaskPtr task, double time);
 
   void
+  execute(Executor::Job);
+
+  void
   eventLoop();
 
   event_base *
@@ -135,6 +139,7 @@
   event_base *m_base;
   event *m_ev;
   boost::thread m_thread;
+  Executor m_executor;
 };
 
 struct SchedulerException : virtual boost::exception, virtual std::exception { };
diff --git a/scheduler/task.cc b/scheduler/task.cc
index 0d09199..4702906 100644
--- a/scheduler/task.cc
+++ b/scheduler/task.cc
@@ -26,7 +26,7 @@
 eventCallback(evutil_socket_t fd, short what, void *arg)
 {
   Task *task = static_cast<Task *>(arg);
-  task->run();
+  task->execute();
   task = NULL;
 }
 
@@ -68,3 +68,9 @@
   m_tv->tv_sec = static_cast<int>(intPart);
   m_tv->tv_usec = static_cast<int>((fraction * 1000000));
 }
+
+void
+Task::execute()
+{
+  m_scheduler->execute(boost::bind(&Task::run, this));
+}
diff --git a/scheduler/task.h b/scheduler/task.h
index 8289c57..41e4438 100644
--- a/scheduler/task.h
+++ b/scheduler/task.h
@@ -86,6 +86,9 @@
   void
   setTv(double delay);
 
+  void
+  execute();
+
 protected:
   Callback m_callback;
   Tag m_tag;
diff --git a/src/executor.h b/src/executor.h
index 54a0108..c8e6cb0 100644
--- a/src/executor.h
+++ b/src/executor.h
@@ -23,6 +23,7 @@
 #define EXECUTOR_H
 
 #include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
 #include <boost/thread/condition_variable.hpp>
 #include <boost/thread/mutex.hpp>
 #include <boost/thread/locks.hpp>
@@ -83,4 +84,6 @@
   volatile bool m_needStop;
   int m_poolSize;
 };
+
+typedef boost::shared_ptr<Executor> ExecutorPtr;
 #endif // EXECUTOR_H
diff --git a/wscript b/wscript
index aff8069..e7033f1 100644
--- a/wscript
+++ b/wscript
@@ -72,7 +72,7 @@
         features = ["cxx"],
         source = bld.path.ant_glob(['scheduler/**/*.cc']),
         use = 'BOOST BOOST_THREAD LIBEVENT LIBEVENT_PTHREADS LOG4CXX',
-        includes = "scheduler src",
+        includes = "scheduler executor src",
         )
 
     libccnx = bld (
@@ -80,7 +80,7 @@
         features=['cxx'],
         source = bld.path.ant_glob(['ccnx/**/*.cc', 'ccnx/**/*.cpp']),
         use = 'BOOST BOOST_THREAD SSL CCNX LOG4CXX scheduler',
-        includes = "ccnx src scheduler",
+        includes = "ccnx src scheduler executor",
         )
 
     chornoshare = bld (
@@ -88,7 +88,7 @@
         features=['cxx'],
         source = bld.path.ant_glob(['src/**/*.cc', 'src/**/*.cpp', 'src/**/*.proto']),
         use = "BOOST BOOST_FILESYSTEM SQLITE3 LOG4CXX scheduler ccnx",
-        includes = "ccnx scheduler src",
+        includes = "ccnx scheduler src executor",
         )
 
     # Unit tests
@@ -98,7 +98,7 @@
           source = bld.path.ant_glob(['test/*.cc']),
           features=['cxx', 'cxxprogram'],
           use = 'BOOST_TEST BOOST_FILESYSTEM LOG4CXX ccnx database chronoshare',
-          includes = "ccnx scheduler src",
+          includes = "ccnx scheduler src executor",
           )
 
     app_plist = '''<?xml version="1.0" encoding="UTF-8"?>