in process of writing sync-core
diff --git a/src/event-scheduler.cpp b/src/event-scheduler.cpp
index be67aad..1c892d6 100644
--- a/src/event-scheduler.cpp
+++ b/src/event-scheduler.cpp
@@ -79,9 +79,11 @@
m_invoked = false;
}
-PeriodicTask::PeriodicTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator)
+PeriodicTask::PeriodicTask(const Callback &callback, const Tag &tag, const SchedulerPtr &scheduler, const IntervalGeneratorPtr &generator, int repeat)
: Task(callback, tag, scheduler)
, m_generator(generator)
+ , m_repeat(repeat)
+ , m_indefinite(m_repeat > 0)
{
}
@@ -92,7 +94,22 @@
{
m_callback();
m_invoked = true;
- m_scheduler->rescheduleTask(m_tag);
+ if (m_indefinite)
+ {
+ m_scheduler->rescheduleTask(m_tag);
+ }
+ else
+ {
+ m_repeat--;
+ if (m_repeat > 0)
+ {
+ m_scheduler->rescheduleTask(m_tag);
+ }
+ else
+ {
+ deregisterSelf();
+ }
+ }
}
}
@@ -104,6 +121,12 @@
setTv(interval);
}
+void
+PeriodicTask::deregisterSelf()
+{
+ m_scheduler->deleteTask(m_tag);
+}
+
RandomIntervalGenerator::RandomIntervalGenerator(double interval, double percent, Direction direction)
: m_interval(interval)
, m_rng(time(NULL))
diff --git a/src/sync-core.cc b/src/sync-core.cc
index d0312d0..21201aa 100644
--- a/src/sync-core.cc
+++ b/src/sync-core.cc
@@ -22,7 +22,113 @@
#include "sync-core.h"
SyncCore::SyncCore
- : m_log(path, localName)
+ : m_log(path, userName)
+ , m_localPrefix(localPrefix)
+ , m_syncPrefix(syncPrefix)
+ , m_stateMsgCallback(callback)
, m_handle(handle)
+ , m_scheduler(scheduler)
{
+ m_rootHash = m_log.RememberStateInStateLog();
+ m_interestClosure = new Closure(0, boost::bind(&SyncCore::handleSyncData, this, _1, _2), boost::bind(&SyncCore::handleSyncInterestTimeout, this, _1));
+ m_scheduler->start();
+ sendSyncInterest();
+}
+
+SyncCore::~SyncCore()
+{
+ m_scheduler->stop();
+ delete m_interestClosure;
+ m_interestClosure = 0;
+}
+
+void
+SyncCore::updateLocalPrefix(const Name &localPrefix)
+{
+ m_localPrefix = localPrefix;
+ // optionally, we can have a sync action to announce the new prefix
+ // we are not doing this for now
+}
+
+void
+SyncCore::updateLocalState(seqno_t seqno)
+{
+ m_log.UpdateDeviceSeqNo(seqno);
+ HashPtr oldHash = m_rootHash;
+ m_rootHash = m_log.RememberStateInStateLog();
+
+ SyncStateMsgPtr msg = m_log.FindStateDifferences(oldHash, m_rootHash);
+
+ // reply sync Interest with oldHash as last component
+ Name syncName = constructSyncName(oldHash);
+ Bytes syncData;
+ msgToBytes(msg, syncData);
+ m_handle->publishData(syncName, syncData, FRESHNESS);
+
+ // no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
+ // this is trying to avoid the situation that the order of SyncData and new Sync Interest gets reversed at receivers
+ ostringstream ss;
+ ss << m_rootHash;
+ TaskPtr task(new OneTimeTask(boost::bind(&SyncCore::sendSyncInterest, this), ss.str(), m_scheduler, 0.05));
+ m_scheduler->addTask(task);
+}
+
+void
+SyncCore::handleSyncInterest(const Name &name)
+{
+}
+
+Closure::TimeoutCallbackReturnValue
+SyncCore::handleSyncInterestTimeout(const Name &name)
+{
+ // sendInterestInterest with the current root hash;
+ sendSyncInterest();
+ return Closure::OK;
+}
+
+void
+SyncCore::handleSyncData(const Name &name, const Bytes &content)
+{
+ SyncStateMsgPtr msg(new SyncStateMsg);
+ bool success = msg->ParseFromArray(head(content), content.size());
+ if(!success)
+ {
+ // ignore misformed SyncData
+ cerr << "Misformed SyncData with name: " << name << endl;
+ return;
+ }
+
+ int size = msg->state_size();
+ int index = 0;
+ while (index < size)
+ {
+ SyncState state = msg->state(index);
+ index++;
+ }
+
+}
+
+void
+SyncCore::sendSyncInterest()
+{
+ Name syncInterest = constructSyncName(m_rootHash);
+ sendInterest(syncInterest, m_interestClosure);
+}
+
+Name
+SyncCore::constructSyncName(const HashPtr &hash)
+{
+ Bytes bytes;
+ readRaw(bytes, (const unsigned char*)hash->GetHash(), hash->GetHashBytes);
+ Name syncName = m_syncPrefix;
+ syncName.append(bytes);
+ return syncName;
+}
+
+void
+SyncCore::msgToBytes(const SyncStateMsgPtr &msg, Bytes &bytes)
+{
+ int size = msg.ByteSize();
+ bytes.resize(size);
+ msg.SerializeToArray(head(bytes), size);
}
diff --git a/src/sync-core.h b/src/sync-core.h
index 7e787d3..b06b8b8 100644
--- a/src/sync-core.h
+++ b/src/sync-core.h
@@ -25,6 +25,8 @@
#include "sync-log.h"
#include <ccnx-wrapper.h>
#include <event-scheduler.h>
+#include <boost/function.hpp>
+#include <boost/thread/shared_mutex.hpp>
using namespace std;
using namespace Ccnx;
@@ -32,13 +34,71 @@
class SyncCore
{
public:
- SyncCore(const string &path, const Name &localName, CcnxWrapperPtr handle, SchedulerPtr scheduler);
+ typedef boost::function<void (const SyncStateMsgPtr & stateMsg) > StateMsgCallback;
+ typedef sqlite3_int64 seqno_t;
+ typedef Map<Name, Name> YellowPage;
+ typedef boost::shared_mutex Mutex;
+ typedef boost::shared_lock<Mutex> ReadLock;
+ typedef boost::unique_lock<Mutex> WriteLock;
+
+ static const int FRESHNESS = 2;
+
+public:
+ SyncCore(const string &path // path where SyncLog is stored
+ , const Name &userName // unique permanent name for local user
+ , const Name &localPrefix // routable name used by the local user
+ , const Name &syncPrefix // the prefix for the sync collection
+ , const StateMsgCallback &callback // callback when state change is detected
+ , CcnxWrapperPtr handle
+ , SchedulerPtr scheduler);
~SyncCore();
+ // some other code should call this fuction when local prefix
+ // changes; e.g. when wake up in another network
+ void
+ updateLocalPrefix(const Name &localPrefix);
+
+ void
+ updateLocalState(seqno_t);
+
+ Name
+ yp(const Name &name);
+
+ void
+ handleSyncInterest(const Name &name);
+
+ Closure::TimeoutCallbackReturnValue
+ handleSyncInterestTimeout(const Name &name);
+
+ void
+ handleSyncData(const Name &name, const Bytes &content);
+
+ void
+ deregister();
+
+protected:
+ void
+ sendSyncInterest();
+
+ Name
+ constructSyncName(const HashPtr &hash);
+
+ static void
+ msgToBytes(const SyncStateMsgPtr &msg, Bytes &bytes);
+
protected:
SyncLog m_log;
- CcnxWrapperPtr m_handle;
SchedulerPtr m_scheduler;
+ StateMsgCallback m_stateMsgCallback;
+ Name m_userName;
+ Name m_localPrefix;
+ Name m_syncPrefix;
+ HashPtr m_rootHash;
+ YellowPage m_yp;
+ Mutex m_ypMutex;
+ CcnxWrapperPtr m_handle;
+ Closure *m_interestClosure;
+
};
#endif // SYNC_CORE_H
diff --git a/src/sync-log.cc b/src/sync-log.cc
index ca6adf7..4b0984a 100644
--- a/src/sync-log.cc
+++ b/src/sync-log.cc
@@ -30,8 +30,6 @@
: DbHelper (path)
, m_localName (localName)
{
- SyncLog::RememberStateInStateLog ();
-
UpdateDeviceSeqno (localName, 0);
sqlite3_stmt *stmt;