core: add a facility to execute functions on the RIB io_service

refs #4279

Change-Id: I29222053348e5d1737d47c85a59d29280b4b791b
diff --git a/core/global-io.cpp b/core/global-io.cpp
index 48566f6..bd73244 100644
--- a/core/global-io.cpp
+++ b/core/global-io.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2015,  Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018,  Regents of the University of California,
  *                           Arizona Board of Regents,
  *                           Colorado State University,
  *                           University Pierre & Marie Curie, Sorbonne University,
@@ -34,8 +34,8 @@
 resetGlobalScheduler();
 } // namespace scheduler
 
-
 static boost::thread_specific_ptr<boost::asio::io_service> g_ioService;
+static boost::asio::io_service* g_ribIoService = nullptr;
 
 boost::asio::io_service&
 getGlobalIoService()
@@ -53,4 +53,23 @@
   g_ioService.reset();
 }
 
+void
+setRibIoService(boost::asio::io_service* ribIo)
+{
+  g_ribIoService = ribIo;
+}
+
+boost::asio::io_service&
+getRibIoService()
+{
+  BOOST_ASSERT(g_ribIoService != nullptr);
+  return *g_ribIoService;
+}
+
+void
+runOnRibIoService(const std::function<void()>& f)
+{
+  getRibIoService().post(f);
+}
+
 } // namespace nfd
diff --git a/core/global-io.hpp b/core/global-io.hpp
index d5030ab..f9abccf 100644
--- a/core/global-io.hpp
+++ b/core/global-io.hpp
@@ -1,11 +1,11 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014  Regents of the University of California,
- *                     Arizona Board of Regents,
- *                     Colorado State University,
- *                     University Pierre & Marie Curie, Sorbonne University,
- *                     Washington University in St. Louis,
- *                     Beijing Institute of Technology
+/*
+ * Copyright (c) 2014-2018  Regents of the University of California,
+ *                          Arizona Board of Regents,
+ *                          Colorado State University,
+ *                          University Pierre & Marie Curie, Sorbonne University,
+ *                          Washington University in St. Louis,
+ *                          Beijing Institute of Technology
  *
  * This file is part of NFD (Named Data Networking Forwarding Daemon).
  * See AUTHORS.md for complete list of NFD authors and contributors.
@@ -34,7 +34,18 @@
 boost::asio::io_service&
 getGlobalIoService();
 
+void
+setRibIoService(boost::asio::io_service* ribIo);
+
+/** \brief run a function on the RIB io_service instance
+ */
+void
+runOnRibIoService(const std::function<void()>& f);
+
 #ifdef WITH_TESTS
+boost::asio::io_service&
+getRibIoService();
+
 /** \brief delete the global io_service instance
  *
  *  It will be recreated at the next invocation of getGlobalIoService.
diff --git a/daemon/main.cpp b/daemon/main.cpp
index 4cff13b..056d359 100644
--- a/daemon/main.cpp
+++ b/daemon/main.cpp
@@ -120,6 +120,7 @@
           std::lock_guard<std::mutex> lock(m);
           ribIo = &getGlobalIoService();
           BOOST_ASSERT(ribIo != mainIo);
+          setRibIoService(ribIo);
         }
         cv.notify_all(); // notify that ribIo has been assigned
 
diff --git a/tests/core/global-io.t.cpp b/tests/core/global-io.t.cpp
index 8c7506c..8ae9126 100644
--- a/tests/core/global-io.t.cpp
+++ b/tests/core/global-io.t.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2015,  Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018,  Regents of the University of California,
  *                           Arizona Board of Regents,
  *                           Colorado State University,
  *                           University Pierre & Marie Curie, Sorbonne University,
@@ -24,7 +24,9 @@
  */
 
 #include "core/global-io.hpp"
+#include "core/scheduler.hpp"
 
+#include "tests/rib-io-fixture.hpp"
 #include "tests/test-common.hpp"
 
 #include <boost/thread.hpp>
@@ -49,6 +51,71 @@
   BOOST_CHECK(s1 != s2);
 }
 
+BOOST_FIXTURE_TEST_CASE(RibIoService, RibIoFixture)
+{
+  boost::asio::io_service* mainIo = &g_io;
+  boost::asio::io_service* ribIo = g_ribIo;
+
+  BOOST_CHECK(mainIo != ribIo);
+  BOOST_CHECK(&getGlobalIoService() == mainIo);
+  BOOST_CHECK(&getRibIoService() == ribIo);
+  auto mainThreadId = boost::this_thread::get_id();
+
+  runOnRibIoService([&] {
+    BOOST_CHECK(mainThreadId != boost::this_thread::get_id());
+    BOOST_CHECK(&getRibIoService() == ribIo);
+    BOOST_CHECK(&getGlobalIoService() == ribIo);
+  });
+}
+
+BOOST_FIXTURE_TEST_CASE(PollInAllThreads, RibIoFixture)
+{
+  bool hasRibRun = false;
+  runOnRibIoService([&] { hasRibRun = true; });
+  boost::this_thread::sleep_for(1_s);
+  BOOST_CHECK_EQUAL(hasRibRun, false);
+
+  poll();
+  BOOST_CHECK_EQUAL(hasRibRun, true);
+
+  hasRibRun = false;
+  bool hasMainRun = false;
+  g_io.post([&] {
+      hasMainRun = true;
+      runOnRibIoService([&] { hasRibRun = true; });
+    });
+  BOOST_CHECK_EQUAL(hasMainRun, false);
+  BOOST_CHECK_EQUAL(hasRibRun, false);
+
+  poll();
+  BOOST_CHECK_EQUAL(hasMainRun, true);
+  BOOST_CHECK_EQUAL(hasRibRun, true);
+}
+
+BOOST_FIXTURE_TEST_CASE(AdvanceClocks, RibIoTimeFixture)
+{
+  bool hasRibRun = false;
+  runOnRibIoService([&] { hasRibRun = true; });
+  boost::this_thread::sleep_for(1_s);
+  BOOST_CHECK_EQUAL(hasRibRun, false);
+
+  advanceClocks(1_ns, 1);
+  BOOST_CHECK_EQUAL(hasRibRun, true);
+
+  hasRibRun = false;
+  bool hasMainRun = false;
+  scheduler::schedule(250_ms, [&] {
+      hasMainRun = true;
+      runOnRibIoService([&] { hasRibRun = true; });
+    });
+  BOOST_CHECK_EQUAL(hasMainRun, false);
+  BOOST_CHECK_EQUAL(hasRibRun, false);
+
+  advanceClocks(260_ms, 2);
+  BOOST_CHECK_EQUAL(hasMainRun, true);
+  BOOST_CHECK_EQUAL(hasRibRun, true);
+}
+
 BOOST_AUTO_TEST_SUITE_END()
 
 } // namespace tests
diff --git a/tests/limited-io.hpp b/tests/limited-io.hpp
index 4891b8a..a42a797 100644
--- a/tests/limited-io.hpp
+++ b/tests/limited-io.hpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
-/**
- * Copyright (c) 2014-2015,  Regents of the University of California,
+/*
+ * Copyright (c) 2014-2018,  Regents of the University of California,
  *                           Arizona Board of Regents,
  *                           Colorado State University,
  *                           University Pierre & Marie Curie, Sorbonne University,
@@ -35,6 +35,8 @@
 namespace tests {
 
 /** \brief provides IO operations limit and/or time limit for unit testing
+ *
+ *  \warning LimitedIo is incompatible with RibIoFixture
  */
 class LimitedIo : noncopyable
 {
diff --git a/tests/rib-io-fixture.cpp b/tests/rib-io-fixture.cpp
new file mode 100644
index 0000000..ef228d5
--- /dev/null
+++ b/tests/rib-io-fixture.cpp
@@ -0,0 +1,144 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018,  Regents of the University of California,
+ *                           Arizona Board of Regents,
+ *                           Colorado State University,
+ *                           University Pierre & Marie Curie, Sorbonne University,
+ *                           Washington University in St. Louis,
+ *                           Beijing Institute of Technology,
+ *                           The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "rib-io-fixture.hpp"
+#include "core/extended-error-message.hpp"
+#include <iostream>
+
+namespace nfd {
+namespace tests {
+
+RibIoFixture::RibIoFixture()
+{
+  std::mutex m;
+  std::condition_variable cv;
+
+  g_ribThread = boost::thread([&] {
+    {
+      std::lock_guard<std::mutex> lock(m);
+      g_ribIo = &getGlobalIoService();
+      setRibIoService(g_ribIo);
+      BOOST_ASSERT(&g_io != g_ribIo);
+      BOOST_ASSERT(g_ribIo == &getRibIoService());
+    }
+    cv.notify_all();
+
+    try {
+      while (true) {
+        {
+          std::unique_lock<std::mutex> lock(m_ribPollMutex);
+          m_ribPollStartCv.wait(lock, [this] { return m_shouldStopRibIo || m_shouldPollRibIo; });
+          if (m_shouldStopRibIo) {
+            break;
+          }
+          BOOST_ASSERT(m_shouldPollRibIo);
+        }
+
+        if (g_ribIo->stopped()) {
+          g_ribIo->reset();
+        }
+        while (g_ribIo->poll() > 0)
+          ;
+
+        {
+          std::lock_guard<std::mutex> lock(m_ribPollMutex);
+          m_shouldPollRibIo = false;
+        }
+        m_ribPollEndCv.notify_all();
+      }
+    }
+    catch (const std::exception& e) {
+      std::cerr << "Exception in RIB thread: " << getExtendedErrorMessage(e) << std::endl;
+      throw;
+    }
+  });
+
+  {
+    std::unique_lock<std::mutex> lock(m);
+    cv.wait(lock, [this] { return g_ribIo != nullptr; });
+  }
+}
+
+RibIoFixture::~RibIoFixture()
+{
+  {
+    std::lock_guard<std::mutex> lock(m_ribPollMutex);
+    m_shouldStopRibIo = true;
+  }
+  m_ribPollStartCv.notify_all();
+  g_ribThread.join();
+}
+
+void
+RibIoFixture::poll()
+{
+  BOOST_ASSERT(&getGlobalIoService() == &g_io);
+
+  size_t nHandlersRun = 0;
+  do {
+    {
+      std::lock_guard<std::mutex> lock(m_ribPollMutex);
+      m_shouldPollRibIo = true;
+    }
+    m_ribPollStartCv.notify_all();
+
+    if (g_io.stopped()) {
+      g_io.reset();
+    }
+
+    nHandlersRun = g_io.poll();
+
+    {
+      std::unique_lock<std::mutex> lock(m_ribPollMutex);
+      m_ribPollEndCv.wait(lock, [this] { return !m_shouldPollRibIo; });
+    }
+  } while (nHandlersRun > 0);
+}
+
+void
+RibIoTimeFixture::advanceClocks(time::nanoseconds tick, time::nanoseconds total)
+{
+  BOOST_ASSERT(tick > time::nanoseconds::zero());
+  BOOST_ASSERT(total >= time::nanoseconds::zero());
+
+  time::nanoseconds remaining = total;
+  while (remaining > time::nanoseconds::zero()) {
+    if (remaining >= tick) {
+      steadyClock->advance(tick);
+      systemClock->advance(tick);
+      remaining -= tick;
+    }
+    else {
+      steadyClock->advance(remaining);
+      systemClock->advance(remaining);
+      remaining = time::nanoseconds::zero();
+    }
+
+    poll();
+  }
+}
+
+} // namespace tests
+} // namespace nfd
diff --git a/tests/rib-io-fixture.hpp b/tests/rib-io-fixture.hpp
new file mode 100644
index 0000000..e89ba44
--- /dev/null
+++ b/tests/rib-io-fixture.hpp
@@ -0,0 +1,100 @@
+/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
+/*
+ * Copyright (c) 2014-2018,  Regents of the University of California,
+ *                           Arizona Board of Regents,
+ *                           Colorado State University,
+ *                           University Pierre & Marie Curie, Sorbonne University,
+ *                           Washington University in St. Louis,
+ *                           Beijing Institute of Technology,
+ *                           The University of Memphis.
+ *
+ * This file is part of NFD (Named Data Networking Forwarding Daemon).
+ * See AUTHORS.md for complete list of NFD authors and contributors.
+ *
+ * NFD is free software: you can redistribute it and/or modify it under the terms
+ * of the GNU General Public License as published by the Free Software Foundation,
+ * either version 3 of the License, or (at your option) any later version.
+ *
+ * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
+ * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
+ * PURPOSE.  See the GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along with
+ * NFD, e.g., in COPYING.md file.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef NFD_TESTS_RIB_IO_FIXTURE_HPP
+#define NFD_TESTS_RIB_IO_FIXTURE_HPP
+
+#include "tests/test-common.hpp"
+#include <boost/thread.hpp>
+#include <condition_variable>
+#include <mutex>
+
+namespace nfd {
+namespace tests {
+
+/** \brief a base test fixture that provides both main and RIB io_service
+ */
+class RibIoFixture: public virtual BaseFixture
+{
+protected:
+  RibIoFixture();
+
+  ~RibIoFixture();
+
+protected:
+  /** \brief Poll main and RIB thread io_service to process all pending I/O events
+   *
+   * This call will execute all pending I/O events, including events that are posted
+   * inside the processing event, i.e., main and RIB thread io_service will be polled
+   * repeatedly until all pending events are processed.
+   *
+   *  \note Must be called from the main thread
+   */
+  void
+  poll();
+
+protected:
+  /** \brief pointer to global RIB io_service
+   */
+  boost::asio::io_service* g_ribIo = nullptr;
+
+  /** \brief global RIB thread
+   */
+  boost::thread g_ribThread;
+
+private:
+  bool m_shouldStopRibIo = false;
+  bool m_shouldPollRibIo = false;
+  std::mutex m_ribPollMutex;
+  std::condition_variable m_ribPollStartCv;
+  std::condition_variable m_ribPollEndCv;
+};
+
+/** \brief RibIoFixture that also overrides steady clock and system clock
+ */
+class RibIoTimeFixture : public RibIoFixture, public UnitTestTimeFixture
+{
+protected:
+  using UnitTestTimeFixture::advanceClocks;
+
+  /** \brief advance steady and system clocks in the main and RIB threads
+   *
+   *  Clocks are advanced in increments of \p tick for \p total time.
+   *  The last increment might be shorter than \p tick.
+   *  After each tick, the main and RIB thread io_service is polled to process pending I/O events.
+   *
+   *  Exceptions thrown during I/O events are propagated to the caller.
+   *  Clock advancing would stop in case of an exception.
+   *
+   *  \note Must be called from the main thread
+   */
+  void
+  advanceClocks(time::nanoseconds tick, time::nanoseconds total) override;
+};
+
+} // namespace tests
+} // namespace nfd
+
+#endif // NFD_TESTS_RIB_IO_FIXTURE_HPP
diff --git a/tests/test-common.cpp b/tests/test-common.cpp
index 2cb747e..21cb91a 100644
--- a/tests/test-common.cpp
+++ b/tests/test-common.cpp
@@ -1,6 +1,6 @@
 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
 /*
- * Copyright (c) 2014-2017,  Regents of the University of California,
+ * Copyright (c) 2014-2018,  Regents of the University of California,
  *                           Arizona Board of Regents,
  *                           Colorado State University,
  *                           University Pierre & Marie Curie, Sorbonne University,
@@ -52,13 +52,13 @@
 }
 
 void
-UnitTestTimeFixture::advanceClocks(const time::nanoseconds& tick, size_t nTicks)
+UnitTestTimeFixture::advanceClocks(time::nanoseconds tick, size_t nTicks)
 {
-  this->advanceClocks(tick, tick * nTicks);
+  advanceClocks(tick, tick * nTicks);
 }
 
 void
-UnitTestTimeFixture::advanceClocks(const time::nanoseconds& tick, const time::nanoseconds& total)
+UnitTestTimeFixture::advanceClocks(time::nanoseconds tick, time::nanoseconds total)
 {
   BOOST_ASSERT(tick > time::nanoseconds::zero());
   BOOST_ASSERT(total >= time::nanoseconds::zero());
diff --git a/tests/test-common.hpp b/tests/test-common.hpp
index 110a1e2..1a4a0d5 100644
--- a/tests/test-common.hpp
+++ b/tests/test-common.hpp
@@ -58,6 +58,7 @@
 protected:
   BaseFixture();
 
+  virtual
   ~BaseFixture();
 
 protected:
@@ -73,6 +74,7 @@
 protected:
   UnitTestTimeFixture();
 
+  virtual
   ~UnitTestTimeFixture();
 
   /** \brief advance steady and system clocks
@@ -83,8 +85,8 @@
    *  Exceptions thrown during I/O events are propagated to the caller.
    *  Clock advancing would stop in case of an exception.
    */
-  void
-  advanceClocks(const time::nanoseconds& tick, size_t nTicks = 1);
+  virtual void
+  advanceClocks(time::nanoseconds tick, size_t nTicks = 1);
 
   /** \brief advance steady and system clocks
    *
@@ -95,8 +97,8 @@
    *  Exceptions thrown during I/O events are propagated to the caller.
    *  Clock advancing would stop in case of an exception.
    */
-  void
-  advanceClocks(const time::nanoseconds& tick, const time::nanoseconds& total);
+  virtual void
+  advanceClocks(time::nanoseconds tick, time::nanoseconds total);
 
 protected:
   shared_ptr<time::UnitTestSteadyClock> steadyClock;