Merge remote-tracking branch 'git.irl/master'
All cout statements are commented out
diff --git a/model/sync-app-data-publish.cc b/model/sync-app-data-publish.cc
index 17d52ee..faaf482 100644
--- a/model/sync-app-data-publish.cc
+++ b/model/sync-app-data-publish.cc
@@ -21,6 +21,9 @@
*/
#include "sync-app-data-publish.h"
+#include <boost/throw_exception.hpp>
+typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
+typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
using namespace std;
using namespace boost;
@@ -28,6 +31,7 @@
namespace Sync
{
+
string
AppDataPublish::getRecentData (const string &prefix, uint32_t session)
{
@@ -38,7 +42,7 @@
}
uint32_t
-AppDataPublish::getHighestSeq (const string &prefix, uint32_t session)
+AppDataPublish::getNextSeq (const string &prefix, uint32_t session)
{
unordered_map<string, Seq>::iterator i = m_sequenceLog.find(prefix);
@@ -48,30 +52,32 @@
if (s.session == session)
return s.seq;
}
-
- return 0;
+ else
+ BOOST_THROW_EXCEPTION(GetSeqException() << errmsg_info_str("No corresponding seq"));
}
bool
AppDataPublish::publishData (const string &name, uint32_t session, const string &dataBuffer, int freshness)
{
- uint32_t seq = getHighestSeq(name, session);
- if (seq == 0)
+ uint32_t seq = 0;
+ try
+ {
+ seq = getNextSeq(name, session);
+ }
+ catch (GetSeqException &e){
m_sequenceLog.erase(name);
-
- seq++;
- if (seq == 0)
- seq = 1;
- Seq s;
- s.session = session;
- s.seq = seq;
- m_sequenceLog[name] = s;
+ }
ostringstream contentNameWithSeqno;
contentNameWithSeqno << name << "/" << session << "/" << seq;
m_ccnxHandle->publishData (contentNameWithSeqno.str (), dataBuffer, freshness);
+ Seq s;
+ s.session = session;
+ s.seq = seq + 1;
+ m_sequenceLog[name] = s;
+
unordered_map<pair<string, uint32_t>, string>::iterator it = m_recentData.find(make_pair(name, session));
if (it != m_recentData.end())
m_recentData.erase(it);
diff --git a/model/sync-app-data-publish.h b/model/sync-app-data-publish.h
index d072189..85bb9ae 100644
--- a/model/sync-app-data-publish.h
+++ b/model/sync-app-data-publish.h
@@ -36,6 +36,8 @@
uint32_t seq;
};
+struct GetSeqException : virtual boost::exception, virtual std::exception { };
+
/**
* \ingroup sync
* @brief publishes application data using incrementing sequence number (for
@@ -66,7 +68,7 @@
* @param session session
*/
uint32_t
- getHighestSeq (const std::string &prefix, uint32_t session);
+ getNextSeq (const std::string &prefix, uint32_t session);
/**
* @brief publish data for a name prefix, updates the corresponding
diff --git a/model/sync-app-socket.cc b/model/sync-app-socket.cc
index 3dca250..2f1d174 100644
--- a/model/sync-app-socket.cc
+++ b/model/sync-app-socket.cc
@@ -29,13 +29,12 @@
{
SyncAppSocket::SyncAppSocket (const string &syncPrefix, CcnxWrapper::DataCallback dataCallback)
- : m_ccnxHandle (new CcnxWrapper())
- , m_fetcher (m_ccnxHandle, dataCallback)
- , m_publisher (m_ccnxHandle)
+ : m_appHandle (new CcnxWrapper())
+ , m_fetcher (m_appHandle, dataCallback)
+ , m_publisher (m_appHandle)
, m_syncLogic (syncPrefix,
bind (&AppDataFetch::onUpdate, m_fetcher, _1, _2, _3),
- bind (&AppDataFetch::onRemove, m_fetcher, _1),
- m_ccnxHandle)
+ bind (&AppDataFetch::onRemove, m_fetcher, _1))
{
}
@@ -46,7 +45,7 @@
bool SyncAppSocket::publish (const string &prefix, uint32_t session, const string &dataBuffer, int freshness)
{
m_publisher.publishData (prefix, session, dataBuffer, freshness);
- m_syncLogic.addLocalNames (prefix, session, m_publisher.getHighestSeq (prefix, session));
+ m_syncLogic.addLocalNames (prefix, session, m_publisher.getNextSeq (prefix, session));
}
}
diff --git a/model/sync-app-socket.h b/model/sync-app-socket.h
index dfa29db..7c01cf2 100644
--- a/model/sync-app-socket.h
+++ b/model/sync-app-socket.h
@@ -70,8 +70,13 @@
*/
void remove (const std::string &prefix) {m_syncLogic.remove(prefix);}
+ /**
+ * @brief start sync process
+ */
+ void start() {m_syncLogic.start();}
+
private:
- CcnxWrapperPtr m_ccnxHandle;
+ CcnxWrapperPtr m_appHandle;
AppDataFetch m_fetcher;
AppDataPublish m_publisher;
diff --git a/model/sync-logic.cc b/model/sync-logic.cc
index 4a243a4..c8452dc 100644
--- a/model/sync-logic.cc
+++ b/model/sync-logic.cc
@@ -37,12 +37,11 @@
SyncLogic::SyncLogic (const std::string &syncPrefix,
LogicUpdateCallback onUpdate,
- LogicRemoveCallback onRemove,
- CcnxWrapperPtr ccnxHandle)
+ LogicRemoveCallback onRemove)
: m_syncPrefix (syncPrefix)
, m_onUpdate (onUpdate)
, m_onRemove (onRemove)
- , m_ccnxHandle (ccnxHandle)
+ , m_ccnxHandle(new CcnxWrapper())
, m_randomGenerator (static_cast<unsigned int> (std::time (0)))
, m_rangeUniformRandom (m_randomGenerator, uniform_int<> (20,100))
{
@@ -63,6 +62,7 @@
void
SyncLogic::respondSyncInterest (const string &interest)
{
+ // cout << "Respond Sync Interest" << endl;
string hash = interest.substr(interest.find_last_of("/") + 1);
DigestPtr digest = make_shared<Digest> ();
try
@@ -82,7 +82,7 @@
void
SyncLogic::processSyncInterest (DigestConstPtr digest, const std::string &interestName, bool timedProcessing/*=false*/)
{
- // cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
+ //cout << "SyncLogic::processSyncInterest " << timedProcessing << endl;
recursive_mutex::scoped_lock lock (m_stateMutex);
if (*m_state.getDigest() == *digest)
@@ -119,6 +119,7 @@
void
SyncLogic::processSyncData (const string &name, const string &dataBuffer)
{
+ // cout << "Process Sync Data" <<endl;
DiffStatePtr diffLog = make_shared<DiffState> ();
try
@@ -211,6 +212,7 @@
void
SyncLogic::processPendingSyncInterests (DiffStatePtr &diffLog)
{
+ //cout << "Process Pending Interests" <<endl;
recursive_mutex::scoped_lock lock (m_stateMutex);
diffLog->setDigest(m_state.getDigest());
@@ -223,18 +225,19 @@
vector<string> pis = m_syncInterestTable.fetchAll ();
if (pis.size () > 0)
{
- stringstream ss;
+ stringstream ss;
ss << *diffLog;
- for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
- {
- m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
- }
- }
+ for (vector<string>::iterator ii = pis.begin(); ii != pis.end(); ++ii)
+ {
+ m_ccnxHandle->publishData (*ii, ss.str(), m_syncResponseFreshness);
+ }
+}
}
void
SyncLogic::addLocalNames (const string &prefix, uint32_t session, uint32_t seq)
{
+ //cout << "Add local names" <<endl;
recursive_mutex::scoped_lock lock (m_stateMutex);
NameInfoConstPtr info = StdNameInfo::FindOrCreate(prefix);
SeqNo seqN(session, seq);
@@ -262,6 +265,7 @@
void
SyncLogic::sendSyncInterest ()
{
+ // cout << "Sending Sync Interest" << endl;
recursive_mutex::scoped_lock lock (m_stateMutex);
ostringstream os;
diff --git a/model/sync-logic.h b/model/sync-logic.h
index 03c129a..da0a7fa 100644
--- a/model/sync-logic.h
+++ b/model/sync-logic.h
@@ -26,6 +26,7 @@
#include <boost/shared_ptr.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/random.hpp>
+#include <memory>
#include "sync-ccnx-wrapper.h"
#include "sync-interest-table.h"
@@ -59,8 +60,7 @@
*/
SyncLogic (const std::string &syncPrefix,
LogicUpdateCallback onUpdate,
- LogicRemoveCallback onRemove,
- CcnxWrapperPtr ccnxHandle);
+ LogicRemoveCallback onRemove);
~SyncLogic ();
@@ -88,6 +88,11 @@
*/
void remove (const std::string &prefix);
+ /**
+ * @brief start the sync process
+ */
+ void start();
+
#ifdef _DEBUG
Scheduler &
getScheduler () { return m_scheduler; }
@@ -116,7 +121,7 @@
std::string m_syncPrefix;
LogicUpdateCallback m_onUpdate;
LogicRemoveCallback m_onRemove;
- CcnxWrapperPtr m_ccnxHandle;
+ std::auto_ptr<CcnxWrapper> m_ccnxHandle;
Scheduler m_scheduler;
diff --git a/test/test_app_socket.cc b/test/test_app_socket.cc
new file mode 100644
index 0000000..016ef54
--- /dev/null
+++ b/test/test_app_socket.cc
@@ -0,0 +1,137 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2012 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ * 卞超轶 Chaoyi Bian <bcy@pku.edu.cn>
+ * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ */
+
+#include <boost/test/unit_test.hpp>
+#include <boost/test/output_test_stream.hpp>
+using boost::test_tools::output_test_stream;
+
+#include <boost/make_shared.hpp>
+
+#include "../model/sync-app-socket.h"
+extern "C" {
+#include <unistd.h>
+}
+
+using namespace Sync;
+using namespace std;
+using namespace boost;
+
+class TestSocketApp {
+public:
+ map<string, string> data;
+ void set(string str1, string str2) {
+ data.insert(make_pair(str1, str2));
+ }
+
+ string toString(){
+ map<string, string>::iterator it = data.begin();
+ string str = "\n";
+ for (; it != data.end(); ++it){
+ str += "<";
+ str += it->first;
+ str += "|";
+ str += it->second;
+ str += ">";
+ str += "\n";
+ }
+ return str;
+ }
+
+};
+
+BOOST_AUTO_TEST_CASE (AppSocketTest)
+{
+
+ TestSocketApp a1, a2, a3;
+ boost::function<void(string, string)> f1 = bind(&TestSocketApp::set, &a1, _1,
+ _2);
+ boost::function<void(string, string)> f2 = bind(&TestSocketApp::set, &a2, _1,
+ _2);
+ boost::function<void(string, string)> f3 = bind(&TestSocketApp::set, &a3, _1,
+ _2);
+
+ string syncPrefix("/let/us/sync");
+ string p1("/irl.cs.ucla.edu"), p2("/yakshi.org"), p3("/google.com");
+
+ SyncAppSocket s1(syncPrefix, f1), s2(syncPrefix, f2), s3(syncPrefix, f3);
+
+ s1.start();
+ s2.start();
+ s3.start();
+
+ // single source
+ string data0 = "Very funny Scotty, now beam down my clothes";
+ s1.publish(p1, 0, data0 , 10);
+ usleep(10000);
+
+ // per the code logic, a publisher does not fetch its own data, so record it by hand
+ a1.set(p1 + "/0/0", data0);
+ BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
+ BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
+
+ // single source, multiple data at once
+ string data1 = "Yes, give me that ketchup";
+ string data2 = "Don't look conspicuous, it draws fire";
+ s1.publish(p1, 0, data1, 10);
+ s1.publish(p1, 0, data2, 10);
+ usleep(10000);
+
+ // per the code logic, a publisher does not fetch its own data, so record it by hand
+ a1.set(p1 + "/0/1", data1);
+ a1.set(p1 + "/0/2", data2);
+ BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
+ BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
+
+ // another single source
+ string data3 = "You surf the Internet, I surf the real world";
+ string data4 = "I got a fortune cookie once that said 'You like Chinese food'";
+ string data5 = "Real men wear pink. Why? Because their wives make them";
+ s3.publish(p3, 0, data3, 10);
+ usleep(10000);
+ // another single source, multiple data at once
+ s2.publish(p2, 0, data4, 10);
+ s2.publish(p2, 0, data5, 10);
+ usleep(10000);
+
+ // per the code logic, a publisher does not fetch its own data, so record it by hand
+ a3.set(p3 + "/0/0", data3);
+ a2.set(p2 + "/0/0", data4);
+ a2.set(p2 + "/0/1", data5);
+ BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
+ BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
+
+ // not sure whether this counts as simultaneous data generation from multiple sources
+ string data6 = "Shakespeare says: 'Prose before hos.'";
+ string data7 = "Pick good people, talent never wears out";
+ s1.publish(p1, 0, data6, 10);
+ s2.publish(p2, 0, data7, 10);
+ usleep(10000);
+
+ // per the code logic, a publisher does not fetch its own data, so record it by hand
+ a1.set(p1 + "/0/3", data6);
+ a2.set(p2 + "/0/2", data7);
+ BOOST_CHECK_EQUAL(a1.toString(), a2.toString());
+ BOOST_CHECK_EQUAL(a2.toString(), a3.toString());
+
+}
+
+
diff --git a/test/test_data_fetch_and_publish.cc b/test/test_data_fetch_and_publish.cc
index a442830..b1e7647 100644
--- a/test/test_data_fetch_and_publish.cc
+++ b/test/test_data_fetch_and_publish.cc
@@ -67,7 +67,7 @@
TestStructApp bar;
string interest = "/april/fool";
- string seq[5] = {"1", "2", "3", "4", "5"};
+ string seq[5] = {"0", "1", "2", "3", "4" };
string str[5] = {"panda", "express", "tastes", "so", "good"};
for (int i = 0; i < 5; i++) {
@@ -86,10 +86,10 @@
publisher.publishData(interest, 0, str[i - 1], 5);
}
- BOOST_CHECK_EQUAL(publisher.getHighestSeq(interest, 0), 5);
+ BOOST_CHECK_EQUAL(publisher.getNextSeq(interest, 0), 5);
BOOST_CHECK_EQUAL(publisher.getRecentData(interest, 0), str[4]);
- fetcher.onUpdate (interest, SeqNo (0,5), SeqNo (0,0));
+ fetcher.onUpdate (interest, SeqNo (0,4), SeqNo (0,-1));
// give time for ccnd to react
sleep(1);
BOOST_CHECK_EQUAL(foo.toString(), bar.toString());
@@ -99,7 +99,7 @@
bind(&TestStructApp::erase, &bar, _1, _2);
fetcher.setDataCallback(eraseFunc);
- fetcher.onUpdate (interest, SeqNo (0,5), SeqNo (0,0));
+ fetcher.onUpdate (interest, SeqNo (0,4), SeqNo (0,-1));
// give time for ccnd to react
sleep(1);
TestStructApp poo;
diff --git a/test/test_scheduler.cc b/test/test_scheduler.cc
index 1dce279..821155c 100644
--- a/test/test_scheduler.cc
+++ b/test/test_scheduler.cc
@@ -144,7 +144,7 @@
BOOST_AUTO_TEST_CASE (SyncLogicSchedulerTest)
{
SyncLogic *logic = 0;
- BOOST_CHECK_NO_THROW (logic = new SyncLogic ("/prefix", funcUpdate, funcRemove, make_shared<CcnxWrapper> ()));
+ BOOST_CHECK_NO_THROW (logic = new SyncLogic ("/prefix", funcUpdate, funcRemove));
Scheduler &scheduler = logic->getScheduler ();
BOOST_CHECK_EQUAL (scheduler.getEventsSize (), 0);