Correcting SyncLogic with respect of delayed response to unrecognized digests
diff --git a/model/sync-ccnx-wrapper.cc b/model/sync-ccnx-wrapper.cc
index 6b25dfa..1b0e5b5 100644
--- a/model/sync-ccnx-wrapper.cc
+++ b/model/sync-ccnx-wrapper.cc
@@ -24,6 +24,7 @@
#include <poll.h>
#include <boost/throw_exception.hpp>
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
+typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
using namespace std;
using namespace boost;
@@ -253,8 +254,11 @@
ccn_name_from_uri (pname, prefix.c_str());
interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
interestClosure->p = &incomingInterest;
- if(ccn_set_interest_filter (m_handle, pname, interestClosure) < 0)
- BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("set interest filter failed"));
+ int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
+ if (ret < 0)
+ {
+ BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("set interest filter failed") << errmsg_info_int (ret));
+ }
ccn_charbuf_destroy(&pname);
}
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index ea341e1..a3fe621 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -25,6 +25,8 @@
#include "sync-full-leaf.h"
#include <boost/make_shared.hpp>
#include <boost/foreach.hpp>
+#include <boost/lexical_cast.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
#include <vector>
using namespace std;
@@ -33,20 +35,138 @@
namespace Sync
{
+const boost::posix_time::time_duration SyncLogic::m_delayedCheckTime = boost::posix_time::seconds (4.0);
+
+
SyncLogic::SyncLogic (const string &syncPrefix,
LogicCallback fetch,
CcnxWrapperPtr ccnxHandle)
: m_syncPrefix (syncPrefix)
, m_fetch (fetch)
, m_ccnxHandle (ccnxHandle)
+ , m_delayedCheckThreadRunning (true)
{
srandom(time(NULL));
- m_ccnxHandle->setInterestFilter(syncPrefix, bind(&SyncLogic::respondSyncInterest, this, _1));
+ m_ccnxHandle->setInterestFilter (syncPrefix,
+ bind (&SyncLogic::respondSyncInterest, this, _1));
+
+ m_delayedCheckThread = thread (&SyncLogic::delayedChecksLoop, this);
}
SyncLogic::~SyncLogic ()
{
+ m_delayedCheckThreadRunning = false;
+ // cout << "Requested stop" << this_thread::get_id () << endl;
+ m_delayedCheckThread.interrupt ();
+ m_delayedCheckThread.join ();
+}
+void
+SyncLogic::delayedChecksLoop ()
+{
+ while (m_delayedCheckThreadRunning)
+ {
+ try
+ {
+ DelayedChecksList::value_type tuple;
+
+ {
+ unique_lock<mutex> lock (m_listChecksMutex);
+ while (m_delayedCheckThreadRunning && m_listChecks.size () == 0)
+ {
+ m_listChecksCondition.wait (lock);
+ // cout << "Got something" << endl;
+ }
+
+ if (m_listChecks.size () == 0) continue;
+
+ tuple = m_listChecks.front ();
+ m_listChecks.pop_front ();
+ // cout << "pop" << endl;
+ // release the mutex
+ }
+
+ // waiting and calling
+
+ // cout << "Duration: " << tuple.get<0> () - get_system_time () << endl;
+ this_thread::sleep (tuple.get<0> ());
+
+ if (!m_delayedCheckThreadRunning) continue;
+ tuple.get<1> () (); // call the scheduled function
+ }
+ catch (thread_interrupted e)
+ {
+ // cout << "interrupted: " << this_thread::get_id () << endl;
+ // do nothing
+ }
+ }
+ // cout << "Exited...\n";
+}
+
+
+
+void
+SyncLogic::respondSyncInterest (const string &interest)
+{
+ string hash = interest.substr(interest.find_last_of("/") + 1);
+ DigestPtr digest = make_shared<Digest> ();
+ try
+ {
+ istringstream is (hash);
+ is >> *digest;
+ }
+ catch (Error::DigestCalculationError &e)
+ {
+ // log error. ignoring it for now, later we should log it
+ return;
+ }
+
+ processSyncInterest (digest, interest);
+}
+
+void
+SyncLogic::processSyncInterest (DigestConstPtr digest, const std::string &interestName, bool timedProcessing/*=false*/)
+{
+ // cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
+
+ if (*m_state.getDigest() == *digest)
+ {
+ m_syncInterestTable.insert (interestName);
+ return;
+ }
+
+ DiffStateContainer::iterator stateInDiffLog = m_log.find (digest);
+
+ if (stateInDiffLog != m_log.end ())
+ {
+ m_ccnxHandle->publishData (interestName,
+ lexical_cast<string> (*(*stateInDiffLog)->diff ()),
+ m_syncResponseFreshness);
+ return;
+ }
+
+ if (!timedProcessing)
+ {
+ {
+ // Alex: Should we ignore interests if interest with the same digest is already in the wait queue?
+
+ lock_guard<mutex> lock (m_listChecksMutex);
+ system_time delay = get_system_time () + m_delayedCheckTime;
+ // do we need randomization??
+ // delay += boost::ptime::milliseconds (rand() % 80 + 20);
+
+ m_listChecks.push_back (make_tuple (delay,
+ bind (&SyncLogic::processSyncInterest, this, digest, interestName, true))
+ );
+ }
+ m_listChecksCondition.notify_one ();
+ }
+ else
+ {
+ m_ccnxHandle->publishData (interestName + "/state",
+ lexical_cast<string> (m_state),
+ m_syncResponseFreshness);
+ }
}
void
@@ -176,7 +296,7 @@
diff->setDigest(m_state.getDigest());
m_log.insert(diff);
- vector<string> pis = m_syncInterestTable.fetchAll();
+ vector<string> pis = m_syncInterestTable.fetchAll ();
stringstream ss;
ss << *diff;
for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
@@ -186,62 +306,6 @@
}
void
-SyncLogic::checkAgain (const string &interest, DigestPtr digest)
-{
- int wait = rand() % 80 + 20;
- sleep(wait/1000.0);
-
- if (*m_state.getDigest() == *digest)
- {
- m_syncInterestTable.insert(interest);
- return;
- }
-
- DiffStateContainer::iterator ii = m_log.find (digest);
- if (ii != m_log.end ())
- {
- stringstream ss;
- ss << *(*ii)->diff();
- m_ccnxHandle->publishData(interest, ss.str(), m_syncResponseFreshness);
- }
- else
- {
- stringstream ss;
- ss << m_state;
- m_ccnxHandle->publishData(interest + "/state", ss.str(), m_syncResponseFreshness);
- }
-}
-
-void
-SyncLogic::respondSyncInterest (const string &interest)
-{
- string hash = interest.substr(interest.find_last_of("/") + 1);
- DigestPtr digest = make_shared<Digest> ();
- *digest << hash;
- digest->finalize ();
-
- if (*m_state.getDigest() == *digest)
- {
- m_syncInterestTable.insert (interest);
- return;
- }
-
- DiffStateContainer::iterator ii = m_log.find (digest);
-
- if (ii != m_log.end())
- {
- stringstream ss;
- ss << *(*ii)->diff();
- m_ccnxHandle->publishData(interest, ss.str(), m_syncResponseFreshness);
- }
- else
- {
- m_thread.join();
- m_thread = thread(&SyncLogic::checkAgain, this, interest, digest);
- }
-}
-
-void
SyncLogic::sendSyncInterest ()
{
ostringstream os;
diff --git a/model/sync-logic.h b/model/sync-logic.h
index 03d80b9..b624ecb 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -24,6 +24,8 @@
#define SYNC_LOGIC_H
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
+#include "boost/date_time/posix_time/posix_time_types.hpp"
+
#include "sync-ccnx-wrapper.h"
#include "sync-interest-table.h"
#include "sync-diff-state.h"
@@ -73,11 +75,27 @@
*/
void processSyncData (const std::string &name, const std::string &dataBuffer);
+#ifdef _DEBUG
+ size_t
+ getListChecksSize ()
+ {
+ boost::lock_guard<boost::mutex> lock (m_listChecksMutex);
+ return m_listChecks.size ();
+ }
+#endif
+
private:
+ void delayedChecksLoop ();
+
+ void
+ processSyncInterest (DigestConstPtr digest, const std::string &interestname, bool timedProcessing=false);
+
void sendSyncInterest ();
- void checkAgain (const std::string &interest, DigestPtr digest);
+ // void checkAgain (const std::string &interest, DigestPtr digest);
private:
+ typedef std::list< boost::tuple< boost::system_time, boost::function< void ( ) > > > DelayedChecksList;
+
FullState m_state;
DiffStateContainer m_log;
SyncInterestTable m_syncInterestTable;
@@ -86,8 +104,13 @@
LogicCallback m_fetch;
CcnxWrapperPtr m_ccnxHandle;
- boost::thread m_thread;
+ boost::thread m_delayedCheckThread;
+ bool m_delayedCheckThreadRunning;
+ DelayedChecksList m_listChecks;
+ boost::condition_variable m_listChecksCondition;
+ boost::mutex m_listChecksMutex;
+ static const boost::posix_time::time_duration m_delayedCheckTime;
static const int m_syncResponseFreshness = 2;
};