"Small" reorganization and cleaning
diff --git a/ccnx/ccnx-common.h b/ccnx/ccnx-common.h
index 41d7814..985965c 100644
--- a/ccnx/ccnx-common.h
+++ b/ccnx/ccnx-common.h
@@ -86,6 +86,16 @@
return BytesPtr ();
}
+template<class Msg>
+inline BytesPtr
+serializeMsg(const Msg &msg)
+{
+ int size = msg->ByteSize ();
+ BytesPtr bytes (new Bytes (size));
+ msg->SerializeToArray (head(*bytes), size);
+ return bytes;
+}
+
// --- Bytes operations end ---
// Exceptions
diff --git a/ccnx/ccnx-name.cpp b/ccnx/ccnx-name.cpp
index 4573ed1..848019a 100644
--- a/ccnx/ccnx-name.cpp
+++ b/ccnx/ccnx-name.cpp
@@ -1,3 +1,24 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ */
+
#include "ccnx-name.h"
#include <boost/lexical_cast.hpp>
#include <ctype.h>
diff --git a/ccnx/ccnx-name.h b/ccnx/ccnx-name.h
index a07f854..92c3054 100644
--- a/ccnx/ccnx-name.h
+++ b/ccnx/ccnx-name.h
@@ -105,6 +105,9 @@
Name &
operator ()(const T &comp) { return appendComp (comp); }
+ Name &
+ operator ()(const void *buf, size_t size) { return appendComp (buf, size); }
+
int
size() const {return m_comps.size();}
diff --git a/log4cxx.properties b/log4cxx.properties
index f41c718..89f14d6 100644
--- a/log4cxx.properties
+++ b/log4cxx.properties
@@ -9,9 +9,10 @@
log4j.appender.A1.target=System.err
#log4j.appender.A1.layout.ConversionPattern=%d{dd-MMM HH:MM:SS,SSS} %p %c %m%n
#log4j.appender.A1.layout.ConversionPattern=%d{hh:mm:ss,SSS} %-14t %-14c %m%n
-log4j.appender.A1.layout.ConversionPattern=%d{ss,SSS} %-12c %m%n
+log4j.appender.A1.layout.ConversionPattern=%d{ss,SSS} %-5p %-12c %m%n
-log4j.logger.Sync = TRACE
+log4j.logger.Sync = DEBUG
+log4j.logger.Sync.Log = ERROR
#log4j.logger.SyncInterestTable = TRACE
#log4j.logger.AppDataFetch = TRACE
log4j.logger.Test = TRACE
diff --git a/src/action-log.cc b/src/action-log.cc
index 24114a0..0111fbb 100644
--- a/src/action-log.cc
+++ b/src/action-log.cc
@@ -61,9 +61,9 @@
sqlite3_int64 parent_device_id = -1;
sqlite3_int64 parent_seq_no = -1;
string parent_device_name;
-
+
sqlite3_bind_text (stmt, 1, filename.c_str (), filename.size (), SQLITE_STATIC);
- if (sqlite3_step (stmt) == SQLITE_ROW)
+ if (sqlite3_step (stmt) == SQLITE_ROW)
{
version = sqlite3_column_int64 (stmt, 0) + 1;
@@ -74,7 +74,7 @@
parent_device_name = string(reinterpret_cast<const char*> (sqlite3_column_blob (stmt, 4)), sqlite3_column_bytes (stmt, 4));
}
}
-
+
sqlite3_finalize (stmt);
return make_tuple (version, parent_device_id, parent_seq_no, parent_device_name);
}
@@ -88,7 +88,7 @@
int seg_num)
{
sqlite3_exec (m_db, "BEGIN TRANSACTION;", 0,0,0);
-
+
sqlite3_int64 seq_no = GetNextLocalSeqNo ();
sqlite3_int64 version = 0;
sqlite3_int64 parent_device_id = -1;
@@ -96,9 +96,9 @@
sqlite3_int64 parent_seq_no = -1;
sqlite3_int64 action_time = time (0);
-
+
tie (version, parent_device_id, parent_seq_no, parent_device_name) = GetExistingRecord (filename);
-
+
sqlite3_stmt *stmt;
int res = sqlite3_prepare_v2 (m_db, "INSERT INTO ActionLog "
"(device_id, seq_no, action, filename, version, action_timestamp, "
@@ -117,7 +117,7 @@
// "VALUES (?, ?, ?, ?, ?, datetime(?, 'unixepoch'),"
// " ?, datetime(?, 'unixepoch'), datetime(?, 'unixepoch'), datetime(?, 'unixepoch'), ?,"
// " ?, ?)" << endl;
-
+
if (res != SQLITE_OK)
{
BOOST_THROW_EXCEPTION (Error::Db ()
@@ -126,16 +126,16 @@
// << errmsg_info_str ("Some error with prepare AddActionUpdate"));
}
-
+
sqlite3_bind_int64 (stmt, 1, m_localDeviceId);
sqlite3_bind_int64 (stmt, 2, seq_no);
sqlite3_bind_int (stmt, 3, 0);
sqlite3_bind_text (stmt, 4, filename.c_str (), filename.size (), SQLITE_TRANSIENT);
sqlite3_bind_int64 (stmt, 5, version);
sqlite3_bind_int64 (stmt, 6, action_time);
-
+
sqlite3_bind_blob (stmt, 7, hash.GetHash (), hash.GetHashBytes (), SQLITE_TRANSIENT);
-
+
// sqlite3_bind_int64 (stmt, 8, atime); // NULL
sqlite3_bind_int64 (stmt, 9, wtime);
// sqlite3_bind_int64 (stmt, 10, ctime); // NULL
@@ -152,7 +152,7 @@
sqlite3_bind_null (stmt, 13);
sqlite3_bind_null (stmt, 14);
}
-
+
// missing part: creating ContentObject for the action !!!
ActionItem item;
@@ -171,7 +171,7 @@
{
cout << Name (reinterpret_cast<const unsigned char *> (parent_device_name.c_str ()),
parent_device_name.size ()) << endl;
-
+
item.set_parent_device_name (parent_device_name);
item.set_parent_seq_no (parent_seq_no);
}
@@ -180,22 +180,18 @@
string item_msg;
item.SerializeToString (&item_msg);
- Name actionName (m_localName);
- actionName
- .appendComp ("action")
- .appendComp (m_sharedFolderName)
- .appendComp (seq_no);
-
+ Name actionName = Name (m_localName)("action")(m_sharedFolderName)(seq_no);
+
Bytes actionData = m_ccnx->createContentObject (actionName, item_msg.c_str (), item_msg.size ());
CcnxCharbufPtr namePtr = actionName.toCcnxCharbuf ();
-
+
sqlite3_bind_blob (stmt, 14, namePtr->buf (), namePtr->length (), SQLITE_TRANSIENT);
sqlite3_bind_blob (stmt, 15, &actionData[0], actionData.size (), SQLITE_TRANSIENT);
-
+
sqlite3_step (stmt);
- sqlite3_finalize (stmt);
-
+ sqlite3_finalize (stmt);
+
sqlite3_exec (m_db, "END TRANSACTION;", 0,0,0);
}
@@ -211,18 +207,18 @@
ActionLog::AddActionDelete (const std::string &filename)
{
sqlite3_exec (m_db, "BEGIN TRANSACTION;", 0,0,0);
-
+
sqlite3_int64 version = 0;
sqlite3_int64 parent_device_id = -1;
string parent_device_name;
sqlite3_int64 parent_seq_no = -1;
sqlite3_int64 action_time = time (0);
-
+
tie (version, parent_device_id, parent_seq_no, parent_device_name) = GetExistingRecord (filename);
if (parent_device_id < 0) // no records exist or file was already deleted
{
- sqlite3_exec (m_db, "END TRANSACTION;", 0,0,0);
+ sqlite3_exec (m_db, "END TRANSACTION;", 0,0,0);
return;
}
@@ -258,28 +254,24 @@
string item_msg;
item.SerializeToString (&item_msg);
- Name actionName (m_localName);
- actionName
- .appendComp ("action")
- .appendComp (m_sharedFolderName)
- .appendComp (seq_no);
-
+ Name actionName = Name (m_localName)("action")(m_sharedFolderName)(seq_no);
+
Bytes actionData = m_ccnx->createContentObject (actionName, item_msg.c_str (), item_msg.size ());
CcnxCharbufPtr namePtr = actionName.toCcnxCharbuf ();
-
+
sqlite3_bind_blob (stmt, 9, namePtr->buf (), namePtr->length (), SQLITE_TRANSIENT);
sqlite3_bind_blob (stmt, 10, &actionData[0], actionData.size (), SQLITE_TRANSIENT);
-
+
sqlite3_step (stmt);
-
+
// cout << Ccnx::Name (reinterpret_cast<const unsigned char *> (parent_device_name.c_str ()),
// parent_device_name.size ()) << endl;
-
+
// assign name to the action, serialize action, and create content object
-
- sqlite3_finalize (stmt);
-
- sqlite3_exec (m_db, "END TRANSACTION;", 0,0,0);
+
+ sqlite3_finalize (stmt);
+
+ sqlite3_exec (m_db, "END TRANSACTION;", 0,0,0);
}
@@ -299,7 +291,7 @@
// cout << "device_name: " << sqlite3_value_text (argv[0]) << endl;
// cout << "action: " << sqlite3_value_int (argv[1]) << endl;
// cout << "filename: " << sqlite3_value_text (argv[2]) << endl;
-
+
string device_name (reinterpret_cast<const char*> (sqlite3_value_blob (argv[0])), sqlite3_value_bytes (argv[0]));
sqlite3_int64 device_id = sqlite3_value_int64 (argv[1]);
sqlite3_int64 seq_no = sqlite3_value_int64 (argv[2]);
@@ -338,11 +330,11 @@
sqlite3_bind_int (stmt, 7, mode);
sqlite3_bind_int (stmt, 8, seg_num);
sqlite3_bind_text (stmt, 9, filename.c_str (), -1, SQLITE_TRANSIENT);
-
+
sqlite3_step (stmt);
// cout << sqlite3_errmsg (the->m_db) << endl;
-
+
sqlite3_finalize (stmt);
int affected_rows = sqlite3_changes (the->m_db);
@@ -362,7 +354,7 @@
sqlite3_bind_int64 (stmt, 6, mtime);
sqlite3_bind_int64 (stmt, 7, ctime);
sqlite3_bind_int (stmt, 8, mode);
-
+
sqlite3_step (stmt);
// cout << sqlite3_errmsg (the->m_db) << endl;
sqlite3_finalize (stmt);
@@ -375,10 +367,10 @@
sqlite3_bind_text (stmt, 1, filename.c_str (), -1, SQLITE_STATIC);
cout << "Delete " << filename << endl;
-
+
sqlite3_step (stmt);
sqlite3_finalize (stmt);
}
-
+
sqlite3_result_null (context);
}
diff --git a/src/logging.h b/src/logging.h
index 15784ff..849d2e8 100644
--- a/src/logging.h
+++ b/src/logging.h
@@ -46,10 +46,16 @@
#define _LOG_ERROR(x) \
LOG4CXX_ERROR(staticModuleLogger, x);
+#define _LOG_ERROR_COND(cond,x) \
+ if (cond) { _LOG_ERROR(x) }
+
+#define _LOG_DEBUG_COND(cond,x) \
+ if (cond) { _LOG_DEBUG(x) }
+
void
INIT_LOGGERS ();
-#else
+#else // else HAVE_LOG4CXX
#define INIT_LOGGER(name)
#define _LOG_FUNCTION(x)
@@ -57,6 +63,8 @@
#define _LOG_TRACE(x)
#define INIT_LOGGERS(x)
#define _LOG_ERROR(x)
+#define _LOG_ERROR_COND(cond,x)
+#define _LOG_DEBUG_COND(cond,x)
#ifdef _DEBUG
diff --git a/src/object-manager.cc b/src/object-manager.cc
index 7f66b2a..bb19842 100644
--- a/src/object-manager.cc
+++ b/src/object-manager.cc
@@ -55,7 +55,7 @@
{
HashPtr fileHash = Hash::FromFileContent (file);
ObjectDb fileDb (m_folder, lexical_cast<string> (*fileHash));
-
+
fs::ifstream iff (file, std::ios::in | std::ios::binary);
sqlite3_int64 segment = 0;
while (iff.good ())
@@ -63,21 +63,17 @@
char buf[MAX_FILE_SEGMENT_SIZE];
iff.read (buf, MAX_FILE_SEGMENT_SIZE);
- Name name (deviceName);
- name
- .appendComp ("file")
- .appendComp (fileHash->GetHash (), fileHash->GetHashBytes ())
- .appendComp (segment);
+ Name name = Name (deviceName)("file")(fileHash->GetHash (), fileHash->GetHashBytes ())(segment);
// cout << *fileHash << endl;
// cout << name << endl;
Bytes data = m_ccnx->createContentObject (name, buf, iff.gcount ());
fileDb.saveContentObject (deviceName, segment, data);
-
+
segment ++;
}
-
+
return fileHash;
}
@@ -103,12 +99,12 @@
BytesPtr data = obj.contentPtr ();
off.write (reinterpret_cast<const char*> (head(*data)), data->size());
-
+
segment ++;
bytes = fileDb.fetchSegment (deviceName, segment);
}
// permission and timestamp should be assigned somewhere else (ObjectManager has no idea about that)
-
+
return true;
}
diff --git a/src/sync-core.cc b/src/sync-core.cc
index cae3d6b..c0dc45b 100644
--- a/src/sync-core.cc
+++ b/src/sync-core.cc
@@ -20,6 +20,7 @@
*/
#include "sync-core.h"
+#include "sync-state-helper.h"
#include "logging.h"
#include "random-interval-generator.h"
@@ -32,51 +33,20 @@
const double SyncCore::RANDOM_PERCENT = 0.5;
using namespace boost;
+using namespace Ccnx;
-// for debugging
-static void
-printMsg(SyncStateMsgPtr &msg)
-{
- _LOG_TRACE (" ===== start Msg ======");
- int size = msg->state_size();
- if (size > 0)
- {
- int index = 0;
- while (index < size)
- {
- SyncState state = msg->state(index);
- string strName = state.name();
- string strLocator = state.locator();
- sqlite3_int64 seq = state.seq();
- _LOG_TRACE ("Name: " << Name((const unsigned char *)strName.c_str(), strName.size())
- <<", Locator: " << Name((const unsigned char *)strLocator.c_str(), strLocator.size())
- << ", seq: " << seq);
- index ++;
- }
- }
- else
- {
- _LOG_TRACE ("Msg size 0");
- }
- _LOG_TRACE (" ++++++++ end Msg ++++++++ ");
-}
-
-SyncCore::SyncCore(SyncLogPtr syncLog, const Name &userName, const Name &localPrefix, const Name &syncPrefix, const StateMsgCallback &callback, const CcnxWrapperPtr &handle, const SchedulerPtr &scheduler)
- : m_log(syncLog)
- , m_scheduler(scheduler)
- , m_stateMsgCallback(callback)
- // , m_userName(userName)
- , m_syncPrefix(syncPrefix)
- , m_handle(handle)
- , m_syncClosure (boost::bind(&SyncCore::handleSyncData, this, _1, _2),
- boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1))
- , m_recoverClosure (boost::bind(&SyncCore::handleRecoverData, this, _1, _2),
- boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1))
- , m_recoverWaitGenerator(new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP))
+SyncCore::SyncCore(SyncLogPtr syncLog, const Name &userName, const Name &localPrefix, const Name &syncPrefix,
+ const StateMsgCallback &callback, CcnxWrapperPtr ccnx, SchedulerPtr scheduler)
+ : m_ccnx (ccnx)
+ , m_log(syncLog)
+ , m_scheduler(scheduler)
+ , m_stateMsgCallback(callback)
+ , m_syncPrefix(syncPrefix)
+ , m_recoverWaitGenerator(new RandomIntervalGenerator(WAIT, RANDOM_PERCENT, RandomIntervalGenerator::UP))
{
m_rootHash = m_log->RememberStateInStateLog();
- m_handle->setInterestFilter(m_syncPrefix, boost::bind(&SyncCore::handleInterest, this, _1));
+ m_ccnx->setInterestFilter(m_syncPrefix, boost::bind(&SyncCore::handleInterest, this, _1));
// m_log->initYP(m_yp);
m_log->UpdateLocalLocator (localPrefix);
@@ -89,52 +59,23 @@
// need to "deregister" closures
}
-// Name
-// SyncCore::yp(const Name &deviceName)
-// {
-// Name locator;
-// ReadLock lock(m_ypMutex);
-// if (m_yp.find(deviceName) != m_yp.end())
-// {
-// locator = m_yp[deviceName];
-// }
-// else
-// {
-// cout << "self: " << m_userName << ", deviceName: " << deviceName << " not found in yp " << endl;
-// }
-// return locator;
-// }
-
-// void
-// SyncCore::updateLocalPrefix(const Name &localPrefix)
-// {
-// m_localPrefix = localPrefix;
-// // optionally, we can have a sync action to announce the new prefix
-// // we are not doing this for now
-// }
-
void
SyncCore::updateLocalState(sqlite3_int64 seqno)
{
m_log->UpdateLocalSeqNo (seqno);
- // choose to update locator everytime
- // m_log->UpdateLocator(m_userName, m_localPrefix);
- // {
- // WriteLock lock(m_ypMutex);
- // m_yp[m_userName] = m_localPrefix;
- // }
+
HashPtr oldHash = m_rootHash;
- m_rootHash = m_log->RememberStateInStateLog();
+ m_rootHash = m_log->RememberStateInStateLog ();
SyncStateMsgPtr msg = m_log->FindStateDifferences(*oldHash, *m_rootHash);
// reply sync Interest with oldHash as last component
- Name syncName = constructSyncName(oldHash);
- Bytes syncData;
- msgToBytes(msg, syncData);
- m_handle->publishData(syncName, syncData, FRESHNESS);
- _LOG_DEBUG (m_log->GetLocalName () << " publishes: " << *oldHash);
- printMsg(msg);
+ Name syncName = Name (m_syncPrefix)(oldHash->GetHash(), oldHash->GetHashBytes());
+ BytesPtr syncData = serializeMsg (msg);
+
+ m_ccnx->publishData(syncName, *syncData, FRESHNESS);
+ _LOG_TRACE (m_log->GetLocalName () << " publishes: " << *oldHash);
+ _LOG_TRACE (msg);
// no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
// this is trying to avoid the situation that the order of SyncData and new Sync Interest gets reversed at receivers
@@ -172,35 +113,11 @@
{
// we know the hash, should reply everything
SyncStateMsgPtr msg = m_log->FindStateDifferences(*(Hash::Origin), *m_rootHash);
- // DEBUG
- /*
- assert(msg->state_size() > 0);
- int size = msg->state_size();
- int index = 0;
- cerr << "Reply recover interest with: " << endl;
- while (index < size)
- {
- SyncState state = msg->state(index);
- string strName = state.name();
- string strLoc = state.locator();
- cout << "Name: " << Name((const unsigned char *)strName.c_str(), strName.size()) << ", Loc: " << Name((const unsigned char *)strLoc.c_str(), strLoc.size()) << ", seq: " << state.seq() << endl;
- if (state.type() == SyncState::UPDATE)
- {
- cout << "Action: update" << endl;
- }
- else
- {
- cout << "Action: delete" << endl;
- }
- index++;
- }
- */
- // END DEBUG
- Bytes syncData;
- msgToBytes(msg, syncData);
- m_handle->publishData(name, syncData, FRESHNESS);
- _LOG_DEBUG (m_log->GetLocalName () << " publishes " << hash);
- printMsg(msg);
+
+ BytesPtr syncData = serializeMsg (msg);
+ m_ccnx->publishData(name, *syncData, FRESHNESS);
+ _LOG_TRACE (m_log->GetLocalName () << " publishes " << hash);
+ _LOG_TRACE (msg);
}
else
{
@@ -216,27 +133,26 @@
if (*hash == *m_rootHash)
{
// we have the same hash; nothing needs to be done
- _LOG_DEBUG ("same as root hash: " << *hash);
+ _LOG_TRACE ("same as root hash: " << *hash);
return;
}
else if (m_log->LookupSyncLog(*hash) > 0)
{
// we know something more
- _LOG_DEBUG ("found hash in sync log");
+ _LOG_TRACE ("found hash in sync log");
SyncStateMsgPtr msg = m_log->FindStateDifferences(*hash, *m_rootHash);
- Bytes syncData;
- msgToBytes(msg, syncData);
- m_handle->publishData(name, syncData, FRESHNESS);
- _LOG_DEBUG (m_log->GetLocalName () << " publishes: " << *hash);
- printMsg(msg);
+ BytesPtr syncData = serializeMsg (msg);
+ m_ccnx->publishData(name, *syncData, FRESHNESS);
+ _LOG_TRACE (m_log->GetLocalName () << " publishes: " << *hash);
+ _LOG_TRACE (msg);
}
else
{
// we don't recognize the hash, send recover Interest if still don't know the hash after a randomized wait period
double wait = m_recoverWaitGenerator->nextInterval();
- _LOG_DEBUG (m_log->GetLocalName () << ", rootHash: " << *m_rootHash << ", hash: " << *hash);
- _LOG_DEBUG ("recover task scheduled after wait: " << wait);
+ _LOG_TRACE (m_log->GetLocalName () << ", rootHash: " << *m_rootHash << ", hash: " << *hash);
+ _LOG_TRACE ("recover task scheduled after wait: " << wait);
Scheduler::scheduleOneTimeTask (m_scheduler,
wait, boost::bind(&SyncCore::recover, this, hash),
@@ -292,8 +208,8 @@
return;
}
- _LOG_DEBUG (m_log->GetLocalName () << " receives Msg ");
- printMsg (msg);
+ _LOG_TRACE (m_log->GetLocalName () << " receives Msg ");
+ _LOG_TRACE (msg);
int size = msg->state_size();
int index = 0;
while (index < size)
@@ -311,16 +227,15 @@
{
string locStr = state.locator();
Name locatorName((const unsigned char *)locStr.c_str(), locStr.size());
- // cout << ", Got loc: " << locatorName << endl;
+ // cout << ", Got loc: " << locatorName << endl;
m_log->UpdateLocator(deviceName, locatorName);
- // WriteLock lock(m_ypMutex);
- // m_yp[deviceName] = locatorName;
- _LOG_DEBUG ("self: " << m_log->GetLocalName () << ", device: " << deviceName << " < == > " << locatorName);
+
+ _LOG_TRACE ("self: " << m_log->GetLocalName () << ", device: " << deviceName << " < == > " << locatorName);
}
}
else
{
- _LOG_TRACE ("nani");
+ _LOG_ERROR ("Receive SYNC DELETE, but we don't support it yet");
deregister(deviceName);
}
index++;
@@ -341,9 +256,13 @@
void
SyncCore::sendSyncInterest()
{
- Name syncInterest = constructSyncName(m_rootHash);
- m_handle->sendInterest(syncInterest, m_syncClosure);
- _LOG_DEBUG (m_log->GetLocalName () << " send SYNC interest: " << *m_rootHash);
+ Name syncInterest = Name (m_syncPrefix)(m_rootHash->GetHash(), m_rootHash->GetHashBytes());
+
+ m_ccnx->sendInterest(syncInterest,
+ Closure (boost::bind(&SyncCore::handleSyncData, this, _1, _2),
+ boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1)));
+
+ _LOG_TRACE (m_log->GetLocalName () << " send SYNC interest: " << *m_rootHash);
}
void
@@ -351,16 +270,19 @@
{
if (!(*hash == *m_rootHash) && m_log->LookupSyncLog(*hash) <= 0)
{
- _LOG_DEBUG (m_log->GetLocalName () << ", Recover for: " << *hash);
+ _LOG_TRACE (m_log->GetLocalName () << ", Recover for: " << *hash);
// unfortunately we still don't recognize this hash
Bytes bytes;
readRaw(bytes, (const unsigned char *)hash->GetHash(), hash->GetHashBytes());
- Name recoverInterest = m_syncPrefix;
- recoverInterest.appendComp(RECOVER);
+
// append the unknown hash
- recoverInterest.appendComp(bytes);
- m_handle->sendInterest(recoverInterest, m_recoverClosure);
- _LOG_DEBUG (m_log->GetLocalName () << " send RECOVER Interest: " << *hash);
+ Name recoverInterest = Name (m_syncPrefix)(RECOVER)(bytes);
+
+ m_ccnx->sendInterest(recoverInterest,
+ Closure (boost::bind(&SyncCore::handleRecoverData, this, _1, _2),
+ boost::bind(&SyncCore::handleRecoverInterestTimeout, this, _1)));
+
+ _LOG_TRACE (m_log->GetLocalName () << " send RECOVER Interest: " << *hash);
}
else
{
@@ -375,24 +297,6 @@
// TODO: handle deregistering
}
-Name
-SyncCore::constructSyncName(const HashPtr &hash)
-{
- Bytes bytes;
- readRaw(bytes, (const unsigned char*)hash->GetHash(), hash->GetHashBytes());
- Name syncName = m_syncPrefix;
- syncName.appendComp(bytes);
- return syncName;
-}
-
-void
-SyncCore::msgToBytes(const SyncStateMsgPtr &msg, Bytes &bytes)
-{
- int size = msg->ByteSize();
- bytes.resize(size);
- msg->SerializeToArray(head(bytes), size);
-}
-
sqlite3_int64
SyncCore::seq(const Name &name)
{
diff --git a/src/sync-core.h b/src/sync-core.h
index ef65d5d..8caefcb 100644
--- a/src/sync-core.h
+++ b/src/sync-core.h
@@ -25,13 +25,8 @@
#include "sync-log.h"
#include "ccnx-wrapper.h"
#include "scheduler.h"
-#include "interval-generator.h"
#include <boost/function.hpp>
-#include <boost/thread/shared_mutex.hpp>
-
-using namespace std;
-using namespace Ccnx;
class SyncCore
{
@@ -49,85 +44,69 @@
public:
SyncCore(SyncLogPtr syncLog
- , const Name &userName
- , const Name &localPrefix // routable name used by the local user
- , const Name &syncPrefix // the prefix for the sync collection
+ , const Ccnx::Name &userName
+ , const Ccnx::Name &localPrefix // routable name used by the local user
+ , const Ccnx::Name &syncPrefix // the prefix for the sync collection
, const StateMsgCallback &callback // callback when state change is detected
- , const CcnxWrapperPtr &handle
- , const SchedulerPtr &scheduler);
+ , Ccnx::CcnxWrapperPtr ccnx
+ , SchedulerPtr scheduler);
~SyncCore();
- // some other code should call this fuction when local prefix
- // changes; e.g. when wake up in another network
- // void
- // updateLocalPrefix(const Name &localPrefix);
+ void
+ updateLocalState (sqlite3_int64);
+
+// ------------------ only used in test -------------------------
+public:
+ HashPtr
+ root() const { return m_rootHash; }
+
+ sqlite3_int64
+ seq (const Ccnx::Name &name);
+
+private:
+ void
+ handleInterest(const Ccnx::Name &name);
void
- updateLocalState(sqlite3_int64);
-
- // Name
- // yp(const Name &name);
+ handleSyncData(const Ccnx::Name &name, Ccnx::PcoPtr content);
void
- handleInterest(const Name &name);
+ handleRecoverData(const Ccnx::Name &name, Ccnx::PcoPtr content);
+
+ Ccnx::Closure::TimeoutCallbackReturnValue
+ handleSyncInterestTimeout(const Ccnx::Name &name);
+
+ Ccnx::Closure::TimeoutCallbackReturnValue
+ handleRecoverInterestTimeout(const Ccnx::Name &name);
void
- handleSyncData(const Name &name, Ccnx::PcoPtr content);
-
- void
- handleRecoverData(const Name &name, Ccnx::PcoPtr content);
-
- Closure::TimeoutCallbackReturnValue
- handleSyncInterestTimeout(const Name &name);
-
- Closure::TimeoutCallbackReturnValue
- handleRecoverInterestTimeout(const Name &name);
-
- void
- deregister(const Name &name);
+ deregister(const Ccnx::Name &name);
void
recover(const HashPtr &hash);
-// ------------------ only used in test -------------------------
- HashPtr
- root() { return m_rootHash; }
-
- sqlite3_int64
- seq(const Name &name);
-
private:
void
sendSyncInterest();
void
- handleSyncInterest(const Name &name);
+ handleSyncInterest(const Ccnx::Name &name);
void
- handleRecoverInterest(const Name &name);
+ handleRecoverInterest(const Ccnx::Name &name);
void
- handleStateData(const Bytes &content);
-
- Name
- constructSyncName(const HashPtr &hash);
-
- static void
- msgToBytes(const SyncStateMsgPtr &msg, Bytes &bytes);
+ handleStateData(const Ccnx::Bytes &content);
private:
+ Ccnx::CcnxWrapperPtr m_ccnx;
+
SyncLogPtr m_log;
SchedulerPtr m_scheduler;
StateMsgCallback m_stateMsgCallback;
- // Name m_userName;
- // Name m_localPrefix;
- Name m_syncPrefix;
+
+ Ccnx::Name m_syncPrefix;
HashPtr m_rootHash;
- // YellowPage m_yp;
- Mutex m_ypMutex;
- CcnxWrapperPtr m_handle;
- Closure m_syncClosure;
- Closure m_recoverClosure;
IntervalGeneratorPtr m_recoverWaitGenerator;
};
diff --git a/src/sync-log.cc b/src/sync-log.cc
index ed154bc..6c209f2 100644
--- a/src/sync-log.cc
+++ b/src/sync-log.cc
@@ -20,11 +20,14 @@
*/
#include "sync-log.h"
+#include "logging.h"
#include <utility>
#include <boost/make_shared.hpp>
#include <boost/thread.hpp>
+INIT_LOGGER ("Sync.Log");
+
using namespace boost;
using namespace std;
using namespace Ccnx;
@@ -116,7 +119,8 @@
ORDER BY device_name); \
", 0,0,0);
- std::cout << sqlite3_errmsg (m_db) << std::endl;
+ _LOG_DEBUG_COND (sqlite3_errcode (m_db) != SQLITE_OK, "DbError: " << sqlite3_errmsg (m_db));
+
if (res != SQLITE_OK)
{
sqlite3_exec (m_db, "ROLLBACK TRANSACTION;", 0,0,0);
@@ -137,7 +141,7 @@
res += sqlite3_bind_int64 (insertStmt, 1, rowId);
sqlite3_step (insertStmt);
- // std::cout << sqlite3_errmsg (m_db) << std::endl;
+ _LOG_DEBUG_COND (sqlite3_errcode (m_db) != SQLITE_OK, "DbError: " << sqlite3_errmsg (m_db));
if (res != SQLITE_OK)
{
sqlite3_exec (m_db, "ROLLBACK TRANSACTION;", 0,0,0);
@@ -163,7 +167,7 @@
{
sqlite3_exec (m_db, "ROLLBACK TRANSACTION;", 0,0,0);
- // std::cout << sqlite3_errmsg (m_db) << std::endl;
+ _LOG_DEBUG_COND (sqlite3_errcode (m_db) != SQLITE_OK, "DbError: " << sqlite3_errmsg (m_db));
BOOST_THROW_EXCEPTION (Error::Db ()
<< errmsg_info_str ("Not a valid hash in rememberStateInStateLog"));
}
diff --git a/src/sync-state-helper.h b/src/sync-state-helper.h
new file mode 100644
index 0000000..435ef9a
--- /dev/null
+++ b/src/sync-state-helper.h
@@ -0,0 +1,59 @@
+/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
+/*
+ * Copyright (c) 2013 University of California, Los Angeles
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 as
+ * published by the Free Software Foundation;
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Author: Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ * Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ */
+
+#ifndef SYNC_STATE_HELPER_H
+#define SYNC_STATE_HELPER_H
+
+#include "sync-state.pb.h"
+
+inline std::ostream &
+operator << (std::ostream &os, const SyncStateMsgPtr &msg)
+{
+ os << " ===== start Msg ======" << std::endl;
+
+ int size = msg->state_size();
+ if (size > 0)
+ {
+ int index = 0;
+ while (index < size)
+ {
+ SyncState state = msg->state(index);
+ string strName = state.name();
+ string strLocator = state.locator();
+ sqlite3_int64 seq = state.seq();
+
+ os << "Name: " << Ccnx::Name((const unsigned char *)strName.c_str(), strName.size())
+ << ", Locator: " << Ccnx::Name((const unsigned char *)strLocator.c_str(), strLocator.size())
+ << ", seq: " << seq << std::endl;
+ index ++;
+ }
+ }
+ else
+ {
+ os << "Msg size 0" << std::endl;
+ }
+ os << " ++++++++ end Msg ++++++++ " << std::endl;
+
+ return os;
+}
+
+
+#endif // SYNC_STATE_HELPER_H
diff --git a/test/test-sync-core.cc b/test/test-sync-core.cc
index c8a474a..a7ab3c0 100644
--- a/test/test-sync-core.cc
+++ b/test/test-sync-core.cc
@@ -71,7 +71,7 @@
usleep(1000000);
checkRoots(core1->root(), core2->root());
- _LOG_TRACE ("\n\n\n\n\n\n----------\n");
+ // _LOG_TRACE ("\n\n\n\n\n\n----------\n");
core1->updateLocalState(1);
usleep(100000);
@@ -92,7 +92,8 @@
BOOST_CHECK_EQUAL(log1->LookupLocator (user2), loc2);
// simple simultaneous data generation
- _LOG_TRACE ("\n\n\n\n\n\n----------Simultaneous\n");
+ // _LOG_TRACE ("\n\n\n\n\n\n----------Simultaneous\n");
+ _LOG_TRACE ("Simultaneous");
core1->updateLocalState(11);
usleep(100);