Adding randomization for reconnection to local ccnd
diff --git a/ccnx/sync-ccnx-wrapper.cc b/ccnx/sync-ccnx-wrapper.cc
index 3e2ebc1..ba7e973 100644
--- a/ccnx/sync-ccnx-wrapper.cc
+++ b/ccnx/sync-ccnx-wrapper.cc
@@ -25,6 +25,9 @@
#include <poll.h>
#include <boost/throw_exception.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/random.hpp>
+
+#include "sync-scheduler.h"
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
@@ -45,6 +48,7 @@
, m_keyStore (0)
, m_keyLoactor (0)
, m_running (true)
+ , m_connected (false)
{
#ifdef _DEBUG_WRAPPER_
m_c = c;
@@ -58,6 +62,8 @@
void
CcnxWrapper::connectCcnd()
{
+ recursive_mutex::scoped_lock lock (m_mutex);
+
if (m_handle != 0) {
ccn_disconnect (m_handle);
ccn_destroy (&m_handle);
@@ -70,6 +76,8 @@
_LOG_DEBUG("<<< connecting to ccnd failed");
BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("connection to ccnd failed"));
}
+ m_connected = true;
+
if (!m_registeredInterests.empty())
{
for (map<std::string, InterestCallback>::const_iterator it = m_registeredInterests.begin(); it != m_registeredInterests.end(); ++it)
@@ -143,10 +151,12 @@
CcnxWrapper::ccnLoop ()
{
_LOG_FUNCTION (this);
+ static boost::mt19937 randomGenerator (static_cast<unsigned int> (std::time (0)));
+ static boost::variate_generator<boost::mt19937&, boost::uniform_int<> > rangeUniformRandom (randomGenerator, uniform_int<> (0,1000));
- while (m_running)
- {
- try
+ while (m_running)
+ {
+ try
{
#ifdef _DEBUG_WRAPPER_
std::cout << m_c << flush;
@@ -180,9 +190,10 @@
BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("ccnd socket failed (probably ccnd got stopped)"));
}
}
- catch (CcnxOperationException e)
+ catch (CcnxOperationException &e)
{
- // probably ccnd has been stoped
+ m_connected = false;
+ // probably ccnd has been stopped
// try reconnect with sleep
int interval = 1;
int maxInterval = 32;
@@ -190,14 +201,16 @@
{
try
{
- sleep(1);
+ this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
+
connectCcnd();
_LOG_DEBUG("reconnect to ccnd succeeded");
break;
}
- catch (CcnxOperationException e)
+ catch (CcnxOperationException &e)
{
- sleep(interval);
+ this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
+
// do exponential backup for reconnect interval
if (interval < maxInterval)
{
@@ -206,6 +219,16 @@
}
}
}
+ catch (const std::exception &exc)
+ {
+ // catch anything thrown within try block that derives from std::exception
+ std::cerr << exc.what();
+ }
+ catch (...)
+ {
+ cout << "UNKNOWN EXCEPTION !!!" << endl;
+ }
+
}
}
@@ -220,7 +243,7 @@
{
recursive_mutex::scoped_lock lock(m_mutex);
- if (!m_running)
+ if (!m_running || !m_connected)
return -1;
// cout << "Publish: " << name << endl;
@@ -363,7 +386,7 @@
int CcnxWrapper::sendInterest (const string &strInterest, void *dataPass)
{
recursive_mutex::scoped_lock lock(m_mutex);
- if (!m_running)
+ if (!m_running || !m_connected)
return -1;
// std::cout << "Send interests for " << strInterest << std::endl;
@@ -389,7 +412,7 @@
int CcnxWrapper::setInterestFilter (const string &prefix, const InterestCallback &interestCallback)
{
recursive_mutex::scoped_lock lock(m_mutex);
- if (!m_running)
+ if (!m_running || !m_connected)
return -1;
ccn_charbuf *pname = ccn_charbuf_create();
@@ -412,7 +435,7 @@
CcnxWrapper::clearInterestFilter (const std::string &prefix)
{
recursive_mutex::scoped_lock lock(m_mutex);
- if (!m_running)
+ if (!m_running || !m_connected)
return;
std::cout << "clearInterestFilter" << std::endl;