Solving some deadlocks and crashes
It is still some mystery with two clients and delays
diff --git a/model/sync-app-data-publish.cc b/model/sync-app-data-publish.cc
index faaf482..adef5e3 100644
--- a/model/sync-app-data-publish.cc
+++ b/model/sync-app-data-publish.cc
@@ -44,29 +44,22 @@
uint32_t
AppDataPublish::getNextSeq (const string &prefix, uint32_t session)
{
- unordered_map<string, Seq>::iterator i = m_sequenceLog.find(prefix);
+ SequenceLog::iterator i = m_sequenceLog.find (prefix);
- if (i != m_sequenceLog.end())
+ if (i != m_sequenceLog.end ())
{
Seq s = i->second;
if (s.session == session)
return s.seq;
}
- else
- BOOST_THROW_EXCEPTION(GetSeqException() << errmsg_info_str("No corresponding seq"));
+ else
+ return 0;
}
-bool
+uint32_t
AppDataPublish::publishData (const string &name, uint32_t session, const string &dataBuffer, int freshness)
{
- uint32_t seq = 0;
- try
- {
- seq = getNextSeq(name, session);
- }
- catch (GetSeqException &e){
- m_sequenceLog.erase(name);
- }
+ uint32_t seq = getNextSeq (name, session);
ostringstream contentNameWithSeqno;
contentNameWithSeqno << name << "/" << session << "/" << seq;
@@ -78,12 +71,12 @@
s.seq = seq + 1;
m_sequenceLog[name] = s;
- unordered_map<pair<string, uint32_t>, string>::iterator it = m_recentData.find(make_pair(name, session));
- if (it != m_recentData.end())
- m_recentData.erase(it);
- m_recentData.insert(make_pair(make_pair(name, session), dataBuffer));
+ // unordered_map<pair<string, uint32_t>, string>::iterator it = m_recentData.find(make_pair(name, session));
+ // if (it != m_recentData.end())
+ // m_recentData.erase(it);
+ // m_recentData.insert(make_pair(make_pair(name, session), dataBuffer));
- return true;
+ return seq;
}
-}
+} // Sync
diff --git a/model/sync-app-data-publish.h b/model/sync-app-data-publish.h
index 85bb9ae..aa05d6c 100644
--- a/model/sync-app-data-publish.h
+++ b/model/sync-app-data-publish.h
@@ -23,10 +23,11 @@
#ifndef SYNC_APP_DATA_PUBLISH_H
#define SYNC_APP_DATA_PUBLISH_H
-#include <boost/unordered_map.hpp>
+// #include <boost/unordered_map.hpp>
#include "sync-seq-no.h"
#include "sync-ccnx-wrapper.h"
#include <utility>
+#include <map>
namespace Sync {
@@ -78,14 +79,17 @@
* @param session session to which data is published
* @param dataBuffer the data itself
* @param freshness the freshness for the data object
- * @return whether the publish succeeded
+ * @return published sequence number, will throw an exception if something wrong happened
*/
- bool publishData (const std::string &name, uint32_t session, const std::string &dataBuffer, int freshness);
+ uint32_t publishData (const std::string &name, uint32_t session, const std::string &dataBuffer, int freshness);
private:
- boost::unordered_map<std::string, Seq> m_sequenceLog;
+ typedef std::map<std::string, Seq> SequenceLog;
+ typedef std::map<std::pair<std::string, uint32_t>, std::string> RecentData;
+
CcnxWrapperPtr m_ccnxHandle;
- boost::unordered_map<std::pair<std::string, uint32_t>, std::string> m_recentData;
+ SequenceLog m_sequenceLog;
+ RecentData m_recentData;
};
} // Sync
diff --git a/model/sync-app-socket.cc b/model/sync-app-socket.cc
index 2f1d174..42d34e4 100644
--- a/model/sync-app-socket.cc
+++ b/model/sync-app-socket.cc
@@ -44,8 +44,8 @@
bool SyncAppSocket::publish (const string &prefix, uint32_t session, const string &dataBuffer, int freshness)
{
- m_publisher.publishData (prefix, session, dataBuffer, freshness);
- m_syncLogic.addLocalNames (prefix, session, m_publisher.getNextSeq (prefix, session));
+ uint32_t sequence = m_publisher.publishData (prefix, session, dataBuffer, freshness);
+ m_syncLogic.addLocalNames (prefix, session, sequence);
}
}
diff --git a/model/sync-app-socket.h b/model/sync-app-socket.h
index e6f1df1..4f7ec33 100644
--- a/model/sync-app-socket.h
+++ b/model/sync-app-socket.h
@@ -71,7 +71,7 @@
void remove (const std::string &prefix) {m_syncLogic.remove(prefix);}
private:
- CcnxWrapperPtr m_appHandle;
+ CcnxWrapperPtr m_appHandle;
AppDataFetch m_fetcher;
AppDataPublish m_publisher;
diff --git a/model/sync-ccnx-wrapper.cc b/model/sync-ccnx-wrapper.cc
index e911e2e..50da304 100644
--- a/model/sync-ccnx-wrapper.cc
+++ b/model/sync-ccnx-wrapper.cc
@@ -37,7 +37,7 @@
, m_keyLoactor (0)
, m_running (true)
{
- m_handle = ccn_create();
+ m_handle = ccn_create ();
if (ccn_connect(m_handle, NULL) < 0)
BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("connection to ccnd failed"));
initKeyStore();
@@ -47,8 +47,13 @@
CcnxWrapper::~CcnxWrapper()
{
- m_running = false;
- m_thread.join();
+ // std::cout << "CcnxWrapper::~CcnxWrapper()" << std::endl;
+ {
+ recursive_mutex::scoped_lock lock(m_mutex);
+ m_running = false;
+ }
+
+ m_thread.join ();
ccn_disconnect (m_handle);
ccn_destroy (&m_handle);
ccn_charbuf_destroy (&m_keyLoactor);
@@ -112,11 +117,15 @@
if (res >= 0)
{
int ret = poll(pfds, 1, 100);
- if (ret >= 0)
+ if (ret < 0)
{
- recursive_mutex::scoped_lock lock(m_mutex);
- res = ccn_run(m_handle, 0);
+ BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("ccnd socket failed (probably ccnd got stopped)"));
}
+
+ recursive_mutex::scoped_lock lock(m_mutex);
+ if (!m_running) break;
+
+ res = ccn_run(m_handle, 0);
}
}
}
@@ -165,6 +174,7 @@
switch (kind)
{
case CCN_UPCALL_FINAL: // effective in unit tests
+ cout << "FINAL??" << endl;
delete f;
delete selfp;
return CCN_UPCALL_RESULT_OK;
@@ -233,6 +243,7 @@
int CcnxWrapper::sendInterest (const string &strInterest, const DataCallback &dataCallback)
{
+ // std::cout << "Send interests for " << strInterest << std::endl;
ccn_charbuf *pname = ccn_charbuf_create();
ccn_closure *dataClosure = new ccn_closure;
@@ -264,4 +275,20 @@
ccn_charbuf_destroy(&pname);
}
+void
+CcnxWrapper::clearInterestFilter (const std::string &prefix)
+{
+ std::cout << "clearInterestFilter" << std::endl;
+ ccn_charbuf *pname = ccn_charbuf_create();
+
+ ccn_name_from_uri (pname, prefix.c_str());
+ int ret = ccn_set_interest_filter (m_handle, pname, 0);
+ if (ret < 0)
+ {
+ BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("set interest filter failed") << errmsg_info_int (ret));
+ }
+
+ ccn_charbuf_destroy(&pname);
+}
+
}
diff --git a/model/sync-ccnx-wrapper.h b/model/sync-ccnx-wrapper.h
index fc7afb1..ecfeee3 100644
--- a/model/sync-ccnx-wrapper.h
+++ b/model/sync-ccnx-wrapper.h
@@ -75,7 +75,7 @@
sendInterest (const std::string &strInterest, const DataCallback &dataCallback);
/**
- * @brief set Interest filter (specify what interest you want to receive
+ * @brief set Interest filter (specify what interest you want to receive)
*
* @param prefix the prefix of Interest
* @param interestCallback the callback function to deal with the returned data
@@ -85,6 +85,13 @@
setInterestFilter (const std::string &prefix, const InterestCallback &interestCallback);
/**
+ * @brief clear Interest filter
+ * @param prefix the prefix of Interest
+ */
+ void
+ clearInterestFilter (const std::string &prefix);
+
+ /**
* @brief publish data and put it to local ccn content store; need to grab
* lock m_mutex first
*
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index a954db8..c81f2f1 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -45,24 +45,26 @@
, m_randomGenerator (static_cast<unsigned int> (std::time (0)))
, m_rangeUniformRandom (m_randomGenerator, uniform_int<> (20,100))
{
- m_ccnxHandle->setInterestFilter (syncPrefix,
+ m_ccnxHandle->setInterestFilter (m_syncPrefix,
bind (&SyncLogic::respondSyncInterest, this, _1));
- m_scheduler.schedule (posix_time::seconds (4),
- bind (&SyncLogic::sendSyncInterest, this),
- REEXPRESSING_INTEREST);
+ sendSyncInterest ();
}
SyncLogic::~SyncLogic ()
{
-}
+ // cout << "SyncLogic::~SyncLogic ()" << endl;
+ m_ccnxHandle.reset ();
+}
void
SyncLogic::respondSyncInterest (const string &interest)
{
- //cout << "Respond Sync Interest" << endl;
+ //cout << "Respond Sync Interest" << endl;
string hash = interest.substr(interest.find_last_of("/") + 1);
+ // cout << "Received Sync Interest: " << hash << endl;
+
DigestPtr digest = make_shared<Digest> ();
try
{
@@ -81,7 +83,7 @@
void
SyncLogic::processSyncInterest (DigestConstPtr digest, const std::string &interestName, bool timedProcessing/*=false*/)
{
- //cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
+ // cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
recursive_mutex::scoped_lock lock (m_stateMutex);
if (*m_state.getDigest() == *digest)
@@ -129,7 +131,7 @@
void
SyncLogic::processSyncData (const string &name, const string &dataBuffer)
{
- //cout << "Process Sync Data" <<endl;
+ // cout << "Process Sync Data" << endl;
DiffStatePtr diffLog = make_shared<DiffState> ();
try
@@ -232,20 +234,8 @@
}
void
-SyncLogic::processPendingSyncInterests (DiffStatePtr &diffLog)
+SyncLogic::satisfyPendingSyncInterests (DiffStatePtr diffLog)
{
- //cout << "Process Pending Interests" <<endl;
- recursive_mutex::scoped_lock lock (m_stateMutex);
-
- diffLog->setDigest (m_state.getDigest());
- if (m_log.size () > 0)
- {
- m_log.get<sequenced> ().front ()->setNext (diffLog);
- }
- m_log.erase (m_state.getDigest()); // remove diff state with the same digest. next pointers are still valid
- /// @todo Optimization
- m_log.insert (diffLog);
-
vector<string> pis = m_syncInterestTable.fetchAll ();
if (pis.size () > 0)
{
@@ -259,31 +249,55 @@
}
void
+SyncLogic::processPendingSyncInterests (DiffStatePtr diffLog)
+{
+ //cout << "Process Pending Interests" <<endl;
+ diffLog->setDigest (m_state.getDigest());
+ if (m_log.size () > 0)
+ {
+ m_log.get<sequenced> ().front ()->setNext (diffLog);
+ }
+ m_log.erase (m_state.getDigest()); // remove diff state with the same digest. next pointers are still valid
+ /// @todo Optimization
+ m_log.insert (diffLog);
+}
+
+void
SyncLogic::addLocalNames (const string &prefix, uint32_t session, uint32_t seq)
{
- //cout << "Add local names" <<endl;
- recursive_mutex::scoped_lock lock (m_stateMutex);
- NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
+ DiffStatePtr diff;
+ {
+ //cout << "Add local names" <<endl;
+ recursive_mutex::scoped_lock lock (m_stateMutex);
+ NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
- SeqNo seqN (session, seq);
- m_state.update(info, seqN);
+ SeqNo seqN (session, seq);
+ m_state.update(info, seqN);
- DiffStatePtr diff = make_shared<DiffState>();
- diff->update(info, seqN);
- processPendingSyncInterests (diff);
+ diff = make_shared<DiffState>();
+ diff->update(info, seqN);
+ processPendingSyncInterests (diff);
+ }
+
+ satisfyPendingSyncInterests (diff);
}
void
SyncLogic::remove(const string &prefix)
{
- recursive_mutex::scoped_lock lock (m_stateMutex);
- NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
- m_state.remove(info);
+ DiffStatePtr diff;
+ {
+ recursive_mutex::scoped_lock lock (m_stateMutex);
+ NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
+ m_state.remove(info);
- DiffStatePtr diff = make_shared<DiffState>();
- diff->remove(info);
+ diff = make_shared<DiffState>();
+ diff->remove(info);
- processPendingSyncInterests (diff);
+ processPendingSyncInterests (diff);
+ }
+
+ satisfyPendingSyncInterests (diff);
}
void
diff --git a/model/sync-logic.h b/model/sync-logic.h
index b5a4174..871ee33 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -104,7 +104,10 @@
sendSyncInterest ();
void
- processPendingSyncInterests(DiffStatePtr &diff);
+ processPendingSyncInterests (DiffStatePtr diff);
+
+ void
+ satisfyPendingSyncInterests (DiffStatePtr diff);
private:
FullState m_state;