sync: Switch code to use ndn-cxx
Thic commit also moves code to ndn::chronoshare namespace
Change-Id: I6eae8cab53fd68faa0e9523d166dbb60d1b59a95
diff --git a/src/sync-core.hpp b/src/sync-core.hpp
index 7f10717..6679244 100644
--- a/src/sync-core.hpp
+++ b/src/sync-core.hpp
@@ -1,6 +1,6 @@
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/**
- * Copyright (c) 2013-2016, Regents of the University of California.
+ * Copyright (c) 2013-2017, Regents of the University of California.
*
* This file is part of ChronoShare, a decentralized file sharing application over NDN.
*
@@ -21,35 +21,113 @@
#ifndef SYNC_CORE_H
#define SYNC_CORE_H
-#include "ccnx-selectors.hpp"
-#include "ccnx-wrapper.hpp"
-#include "scheduler.hpp"
#include "sync-log.hpp"
-#include "task.hpp"
+#include "core/chronoshare-common.hpp"
+#include "core/random-interval-generator.hpp"
-#include <boost/function.hpp>
+#include <ndn-cxx/face.hpp>
+#include <ndn-cxx/security/key-chain.hpp>
+#include <ndn-cxx/util/scheduler-scoped-event-id.hpp>
+#include <ndn-cxx/util/scheduler.hpp>
+
+#include <boost/iostreams/device/back_inserter.hpp>
+#include <boost/iostreams/filter/gzip.hpp>
+#include <boost/iostreams/filtering_stream.hpp>
+
+namespace ndn {
+namespace chronoshare {
+
+// No use this now
+template <class Msg>
+BufferPtr
+serializeMsg(const Msg& msg)
+{
+ int size = msg.ByteSize();
+ BufferPtr bytes = std::make_shared<Buffer>(size);
+ msg.SerializeToArray(bytes->buf(), size);
+ return bytes;
+}
+
+template <class Msg>
+shared_ptr<Msg>
+deserializeMsg(const Buffer& bytes)
+{
+ shared_ptr<Msg> retval(new Msg());
+ if (!retval->ParseFromArray(bytes.buf(), bytes.size())) {
+ // to indicate an error
+ return shared_ptr<Msg>();
+ }
+ return retval;
+}
+
+template <class Msg>
+BufferPtr
+serializeGZipMsg(const Msg& msg)
+{
+ std::vector<char> bytes; // Bytes couldn't work
+ {
+ boost::iostreams::filtering_ostream out;
+ out.push(boost::iostreams::gzip_compressor()); // gzip filter
+ out.push(boost::iostreams::back_inserter(bytes)); // back_inserter sink
+
+ msg.SerializeToOstream(&out);
+ }
+ BufferPtr uBytes = std::make_shared<Buffer>(bytes.size());
+ memcpy(&(*uBytes)[0], &bytes[0], bytes.size());
+ return uBytes;
+}
+
+template <class Msg>
+shared_ptr<Msg>
+deserializeGZipMsg(const Buffer& bytes)
+{
+ std::vector<char> sBytes(bytes.size());
+ memcpy(&sBytes[0], &bytes[0], bytes.size());
+ boost::iostreams::filtering_istream in;
+ in.push(boost::iostreams::gzip_decompressor()); // gzip filter
+ in.push(boost::make_iterator_range(sBytes)); // source
+
+ shared_ptr<Msg> retval = make_shared<Msg>();
+ if (!retval->ParseFromIstream(&in)) {
+ // to indicate an error
+ return shared_ptr<Msg>();
+ }
+
+ return retval;
+}
class SyncCore
{
public:
- typedef boost::function<void(SyncStateMsgPtr stateMsg)> StateMsgCallback;
+ typedef function<void(SyncStateMsgPtr stateMsg)> StateMsgCallback;
- static const int FRESHNESS = 2; // seconds
- static const string RECOVER;
+ static const int FRESHNESS; // seconds
+ static const std::string RECOVER;
static const double WAIT; // seconds;
static const double RANDOM_PERCENT; // seconds;
+ class Error : public boost::exception, public std::runtime_error
+ {
+ public:
+ explicit Error(const std::string& what)
+ : std::runtime_error(what)
+ {
+ }
+ };
+
public:
- SyncCore(SyncLogPtr syncLog, const Ccnx::Name& userName,
- const Ccnx::Name& localPrefix // routable name used by the local user
+ SyncCore(Face& face, SyncLogPtr syncLog, const Name& userName,
+ const Name& localPrefix // routable name used by the local user
,
- const Ccnx::Name& syncPrefix // the prefix for the sync collection
+ const Name& syncPrefix // the prefix for the sync collection
,
const StateMsgCallback& callback // callback when state change is detected
,
- Ccnx::CcnxWrapperPtr ccnx, double syncInterestInterval = -1.0);
+ long syncInterestInterval = -1);
~SyncCore();
+ void updateLocalState(sqlite3_int64);
+
void
localStateChanged();
@@ -62,71 +140,86 @@
void
localStateChangedDelayed();
- void updateLocalState(sqlite3_int64);
-
// ------------------ only used in test -------------------------
+
public:
- HashPtr
+ ConstBufferPtr
root() const
{
- return m_rootHash;
+ return m_rootDigest;
}
sqlite3_int64
- seq(const Ccnx::Name& name);
+ seq(const Name& name);
private:
void
- handleInterest(const Ccnx::Name& name);
+ onRegisterFailed(const Name& prefix, const std::string& reason)
+ {
+ std::cerr << "ERROR: Failed to register prefix \"" << prefix << "\" in local hub's daemon ("
+ << reason << ")" << std::endl;
+ throw Error("ERROR: Failed to register prefix (" + reason + ")");
+ }
void
- handleSyncData(const Ccnx::Name& name, Ccnx::PcoPtr content);
-
- void
- handleRecoverData(const Ccnx::Name& name, Ccnx::PcoPtr content);
-
- void
- handleSyncInterestTimeout(const Ccnx::Name& name, const Ccnx::Closure& closure,
- Ccnx::Selectors selectors);
-
- void
- handleRecoverInterestTimeout(const Ccnx::Name& name, const Ccnx::Closure& closure,
- Ccnx::Selectors selectors);
-
- void
- deregister(const Ccnx::Name& name);
-
- void
- recover(HashPtr hash);
-
-private:
- void
sendSyncInterest();
void
- handleSyncInterest(const Ccnx::Name& name);
+ sendPeriodicSyncInterest(const time::seconds& interval);
void
- handleRecoverInterest(const Ccnx::Name& name);
+ recover(ConstBufferPtr digest);
void
- handleStateData(const Ccnx::Bytes& content);
+ handleInterest(const InterestFilter& filter, const Interest& interest);
+
+ void
+ handleSyncInterest(const Name& name);
+
+ void
+ handleRecoverInterest(const Name& name);
+
+ void
+ handleSyncInterestTimeout(const Interest& interest);
+
+ void
+ handleRecoverInterestTimeout(const Interest& interest);
+
+ void
+ handleSyncData(const Interest& interest, Data& data);
+
+ void
+ handleRecoverData(const Interest& interest, Data& data);
+
+ void
+ handleStateData(const Buffer& content);
+
+ void
+ deregister(const Name& name);
private:
- Ndnx::NdnxWrapperPtr m_ndnx;
+ Face& m_face;
SyncLogPtr m_log;
- SchedulerPtr m_scheduler;
+
+ Scheduler m_scheduler;
+ util::scheduler::ScopedEventId m_syncInterestEvent;
+ util::scheduler::ScopedEventId m_periodicInterestEvent;
+ util::scheduler::ScopedEventId m_localStateDelayedEvent;
+
StateMsgCallback m_stateMsgCallback;
- Ndnx::Name m_syncPrefix;
- HashPtr m_rootHash;
+ Name m_syncPrefix;
+ ConstBufferPtr m_rootDigest;
IntervalGeneratorPtr m_recoverWaitGenerator;
- TaskPtr m_sendSyncInterestTask;
-
- double m_syncInterestInterval;
+ long m_syncInterestInterval;
+ KeyChain m_keyChain;
+ const RegisteredPrefixId* m_registeredPrefixId;
};
+} // namespace chronoshare
+} // namespace ndn
+
#endif // SYNC_CORE_H