Dispatcher is almost ready. The file-assembly part is temporarily commented out (need to check whether assembly is actually necessary).
diff --git a/src/action-log.cc b/src/action-log.cc
index becbfe3..e33a221 100644
--- a/src/action-log.cc
+++ b/src/action-log.cc
@@ -106,11 +106,14 @@
ActionLog::ActionLog (Ccnx::CcnxWrapperPtr ccnx, const boost::filesystem::path &path,
SyncLogPtr syncLog,
- const std::string &sharedFolder)
+ const std::string &sharedFolder,
+ OnFileAddedOrChangedCallback onFileAddedOrChanged, OnFileRemovedCallback onFileRemoved)
: DbHelper (path / ".chronoshare", "action-log.db")
, m_syncLog (syncLog)
, m_ccnx (ccnx)
, m_sharedFolderName (sharedFolder)
+ , m_onFileAddedOrChanged (onFileAddedOrChanged)
+ , m_onFileRemoved (onFileRemoved)
{
sqlite3_exec (m_db, INIT_DATABASE.c_str (), NULL, NULL, NULL);
_LOG_DEBUG_COND (sqlite3_errcode (m_db) != SQLITE_OK, sqlite3_errmsg (m_db));
@@ -161,7 +164,7 @@
}
// local add action. remote action is extracted from content object
-void
+ActionItemPtr
ActionLog::AddLocalActionUpdate (const std::string &filename,
const Hash &hash,
time_t wtime,
@@ -222,30 +225,30 @@
sqlite3_bind_int64 (stmt, 14, parent_seq_no);
}
- ActionItem item;
- item.set_action (ActionItem::UPDATE);
- item.set_filename (filename);
- item.set_version (version);
- item.set_timestamp (action_time);
- item.set_file_hash (hash.GetHash (), hash.GetHashBytes ());
- // item.set_atime (atime);
- item.set_mtime (wtime);
- // item.set_ctime (ctime);
- item.set_mode (mode);
- item.set_seg_num (seg_num);
+ ActionItemPtr item = make_shared<ActionItem> ();
+ item->set_action (ActionItem::UPDATE);
+ item->set_filename (filename);
+ item->set_version (version);
+ item->set_timestamp (action_time);
+ item->set_file_hash (hash.GetHash (), hash.GetHashBytes ());
+ // item->set_atime (atime);
+ item->set_mtime (wtime);
+ // item->set_ctime (ctime);
+ item->set_mode (mode);
+ item->set_seg_num (seg_num);
if (parent_device_name && parent_seq_no > 0)
{
// cout << Name (*parent_device_name) << endl;
- item.set_parent_device_name (parent_device_name->buf (), parent_device_name->length ());
- item.set_parent_seq_no (parent_seq_no);
+ item->set_parent_device_name (parent_device_name->buf (), parent_device_name->length ());
+ item->set_parent_seq_no (parent_seq_no);
}
// assign name to the action, serialize action, and create content object
string item_msg;
- item.SerializeToString (&item_msg);
+ item->SerializeToString (&item_msg);
Name actionName = Name (m_syncLog->GetLocalName ())("action")(m_sharedFolderName)(seq_no);
_LOG_DEBUG ("ActionName: " << actionName);
@@ -264,6 +267,8 @@
sqlite3_finalize (stmt);
sqlite3_exec (m_db, "END TRANSACTION;", 0,0,0);
+
+ return item;
}
// void
@@ -274,7 +279,7 @@
// << errmsg_info_str ("Move operation is not yet supported"));
// }
-void
+ActionItemPtr
ActionLog::AddLocalActionDelete (const std::string &filename)
{
sqlite3_exec (m_db, "BEGIN TRANSACTION;", 0,0,0);
@@ -290,7 +295,7 @@
if (!parent_device_name) // no records exist or file was already deleted
{
sqlite3_exec (m_db, "END TRANSACTION;", 0,0,0);
- return;
+ return ActionItemPtr ();
}
version ++;
@@ -316,16 +321,16 @@
sqlite3_bind_int64 (stmt, 8, parent_seq_no);
- ActionItem item;
- item.set_action (ActionItem::UPDATE);
- item.set_filename (filename);
- item.set_version (version);
- item.set_timestamp (action_time);
- item.set_parent_device_name (parent_device_name->buf (), parent_device_name->length ());
- item.set_parent_seq_no (parent_seq_no);
+ ActionItemPtr item = make_shared<ActionItem> ();
+ item->set_action (ActionItem::UPDATE);
+ item->set_filename (filename);
+ item->set_version (version);
+ item->set_timestamp (action_time);
+ item->set_parent_device_name (parent_device_name->buf (), parent_device_name->length ());
+ item->set_parent_seq_no (parent_seq_no);
string item_msg;
- item.SerializeToString (&item_msg);
+ item->SerializeToString (&item_msg);
Name actionName = Name (m_syncLog->GetLocalName ())("action")(m_sharedFolderName)(seq_no);
Bytes actionData = m_ccnx->createContentObject (actionName, item_msg.c_str (), item_msg.size ());
@@ -345,6 +350,8 @@
sqlite3_finalize (stmt);
sqlite3_exec (m_db, "END TRANSACTION;", 0,0,0);
+
+ return item;
}
@@ -426,7 +433,7 @@
return action;
}
-void
+ActionItemPtr
ActionLog::AddRemoteAction (const Ccnx::Name &deviceName, sqlite3_int64 seqno, Ccnx::PcoPtr actionPco)
{
if (!actionPco)
@@ -490,9 +497,11 @@
_LOG_DEBUG_COND (sqlite3_errcode (m_db) != SQLITE_OK && sqlite3_errcode (m_db) != SQLITE_ROW, sqlite3_errmsg (m_db));
sqlite3_finalize (stmt);
+
+ return action;
}
-void
+ActionItemPtr
ActionLog::AddRemoteAction (Ccnx::PcoPtr actionPco)
{
Name name = actionPco->name ();
@@ -516,7 +525,7 @@
_LOG_DEBUG ("From [" << name << "] extracted deviceName: " << deviceName << ", sharedFolder: " << sharedFolder << ", seqno: " << seqno);
- AddRemoteAction (deviceName, seqno, actionPco);
+ return AddRemoteAction (deviceName, seqno, actionPco);
}
///////////////////////////////////////////////////////////////////////////////////
diff --git a/src/action-log.h b/src/action-log.h
index 4123713..fec6e82 100644
--- a/src/action-log.h
+++ b/src/action-log.h
@@ -37,14 +37,21 @@
class ActionLog : public DbHelper
{
public:
+ typedef boost::function<void (std::string /*filename*/, Ccnx::Name /*device_name*/, sqlite3_int64 /*seq_no*/,
+ HashPtr /*hash*/, time_t /*m_time*/, int /*mode*/, int /*seg_num*/)> OnFileAddedOrChangedCallback;
+
+ typedef boost::function<void (std::string /*filename*/)> OnFileRemovedCallback;
+
+public:
ActionLog (Ccnx::CcnxWrapperPtr ccnx, const boost::filesystem::path &path,
SyncLogPtr syncLog,
- const std::string &sharedFolder);
+ const std::string &sharedFolder,
+ OnFileAddedOrChangedCallback onFileAddedOrChanged, OnFileRemovedCallback onFileRemoved);
//////////////////////////
// Local operations //
//////////////////////////
- void
+ ActionItemPtr
AddLocalActionUpdate (const std::string &filename,
const Hash &hash,
time_t wtime,
@@ -54,7 +61,7 @@
// void
// AddActionMove (const std::string &oldFile, const std::string &newFile);
- void
+ ActionItemPtr
AddLocalActionDelete (const std::string &filename);
bool
@@ -64,7 +71,7 @@
// Remote operations //
//////////////////////////
- void
+ ActionItemPtr
AddRemoteAction (const Ccnx::Name &deviceName, sqlite3_int64 seqno, Ccnx::PcoPtr actionPco);
/**
@@ -72,7 +79,7 @@
*
* This function extracts device name and sequence number from the content object's and calls the overloaded method
*/
- void
+ ActionItemPtr
AddRemoteAction (Ccnx::PcoPtr actionPco);
///////////////////////////
@@ -110,6 +117,9 @@
Ccnx::CcnxWrapperPtr m_ccnx;
std::string m_sharedFolderName;
+
+ OnFileAddedOrChangedCallback m_onFileAddedOrChanged;
+ OnFileRemovedCallback m_onFileRemoved;
};
namespace Error {
diff --git a/src/dispatcher.cc b/src/dispatcher.cc
index d7e570a..d9bc37a 100644
--- a/src/dispatcher.cc
+++ b/src/dispatcher.cc
@@ -20,16 +20,23 @@
*/
#include "dispatcher.h"
+#include "logging.h"
+
#include <boost/make_shared.hpp>
+#include <boost/lexical_cast.hpp>
using namespace Ccnx;
using namespace std;
using namespace boost;
-static const string BROADCAST_DOMAIN = "/ndn/broadcast/chronoshare";
-static const int MAX_FILE_SEGMENT_SIZE = 1024;
+INIT_LOGGER ("Dispatcher");
-Dispatcher::Dispatcher(const filesystem::path &path, const std::string &localUserName, const Ccnx::Name &localPrefix, const std::string &sharedFolder, const filesystem::path &rootDir, Ccnx::CcnxWrapperPtr ccnx, SchedulerPtr scheduler, int poolSize)
+static const string BROADCAST_DOMAIN = "/ndn/broadcast/chronoshare";
+
+Dispatcher::Dispatcher(const filesystem::path &path, const std::string &localUserName,
+ const Ccnx::Name &localPrefix, const std::string &sharedFolder,
+ const filesystem::path &rootDir, Ccnx::CcnxWrapperPtr ccnx,
+ SchedulerPtr scheduler, int poolSize)
: m_ccnx(ccnx)
, m_core(NULL)
, m_rootDir(rootDir)
@@ -40,7 +47,10 @@
, m_server(NULL)
{
m_syncLog = make_shared<SyncLog>(path, localUserName);
- m_actionLog = make_shared<ActionLog>(m_ccnx, path, m_syncLog, sharedFolder);
+ m_actionLog = make_shared<ActionLog>(m_ccnx, path, m_syncLog, sharedFolder,
+ // bind (&Dispatcher::Did_ActionLog_ActionApply_AddOrModify, this, _1, _2, _3, _4, _5, _6, _7),
+ ActionLog::OnFileAddedOrChangedCallback (), // don't really need this callback
+ bind (&Dispatcher::Did_ActionLog_ActionApply_Delete, this, _1));
Name syncPrefix = Name(BROADCAST_DOMAIN)(sharedFolder);
m_server = new ContentServer(m_ccnx, m_actionLog, rootDir);
@@ -48,8 +58,10 @@
m_server->registerPrefix(syncPrefix);
m_core = new SyncCore (m_syncLog, localUserName, localPrefix, syncPrefix,
- bind(&Dispatcher::syncStateChanged, this, _1), ccnx, scheduler);
+ bind(&Dispatcher::Did_SyncLog_StateChange, this, _1), ccnx, scheduler);
+ m_actionFetcher = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3);
+ m_fileFetcher = make_shared<FetchManager> (m_ccnx, bind (&SyncLog::LookupLocator, &*m_syncLog, _1), 3);
}
Dispatcher::~Dispatcher()
@@ -67,61 +79,31 @@
}
}
+/////////////////////////////////////////////////////////////////////////////////////////////////////
+/////////////////////////////////////////////////////////////////////////////////////////////////////
+/////////////////////////////////////////////////////////////////////////////////////////////////////
+
void
-Dispatcher::fileChangedCallback(const filesystem::path &relativeFilePath, ActionType type)
+Dispatcher::Did_LocalFile_AddOrModify (const filesystem::path &relativeFilePath)
{
- Executor::Job job = bind(&Dispatcher::fileChanged, this, relativeFilePath, type);
- m_executor.execute(job);
+ m_executor.execute (bind (&Dispatcher::Did_LocalFile_AddOrModify_Execute, this, relativeFilePath));
}
void
-Dispatcher::syncStateChangedCallback(const SyncStateMsgPtr &stateMsg)
+Dispatcher::Did_LocalFile_AddOrModify_Execute (filesystem::path relativeFilePath)
{
- Executor::Job job = bind(&Dispatcher::syncStateChanged, this, stateMsg);
- m_executor.execute(job);
-}
-
-void
-Dispatcher::actionReceivedCallback(const ActionItemPtr &actionItem)
-{
- Executor::Job job = bind(&Dispatcher::actionReceived, this, actionItem);
- m_executor.execute(job);
-}
-
-void
-Dispatcher::fileSegmentReceivedCallback(const Ccnx::Name &name, const Ccnx::Bytes &content)
-{
- Executor::Job job = bind(&Dispatcher::fileSegmentReceived, this, name, content);
- m_executor.execute(job);
-}
-
-void
-Dispatcher::fileReadyCallback(const Ccnx::Name &fileNamePrefix)
-{
- Executor::Job job = bind(&Dispatcher::fileReady, this, fileNamePrefix);
- m_executor.execute(job);
-}
-
-void
-Dispatcher::fileChanged(const filesystem::path &relativeFilePath, ActionType type)
-{
-
- switch (type)
- {
- case UPDATE:
+ filesystem::path absolutePath = m_rootDir / relativeFilePath;
+ if (filesystem::exists(absolutePath))
{
- filesystem::path absolutePath = m_rootDir / relativeFilePath;
- if (filesystem::exists(absolutePath))
- {
- HashPtr hash = Hash::FromFileContent(absolutePath);
- if (m_actionLog->KnownFileState(relativeFilePath.generic_string(), *hash))
+ HashPtr hash = Hash::FromFileContent(absolutePath);
+ if (m_actionLog->KnownFileState(relativeFilePath.generic_string(), *hash))
{
// the file state is known; i.e. the detected changed file is identical to
// the file state kept in FileState table
// it is the case that backend puts the file fetched from remote;
// we should not publish action for this.
}
- else
+ else
{
uintmax_t fileSize = filesystem::file_size(absolutePath);
int seg_num;
@@ -137,27 +119,49 @@
// notify SyncCore to propagate the change
m_core->localStateChanged();
}
- break;
- }
- else
- {
- BOOST_THROW_EXCEPTION (Error::Dispatcher() << error_info_str("Update non exist file: " + absolutePath.string() ));
- }
}
- case DELETE:
+ else
{
- m_actionLog->AddLocalActionDelete (relativeFilePath.generic_string());
- // notify SyncCore to propagate the change
- m_core->localStateChanged();
- break;
+ BOOST_THROW_EXCEPTION (Error::Dispatcher() << error_info_str("Update non exist file: " + absolutePath.string() ));
}
- default:
- break;
- }
}
void
-Dispatcher::syncStateChanged(const SyncStateMsgPtr &stateMsg)
+Dispatcher::Did_LocalFile_Delete (const filesystem::path &relativeFilePath)
+{
+ m_executor.execute (bind (&Dispatcher::Did_LocalFile_Delete_Execute, this, relativeFilePath));
+}
+
+void
+Dispatcher::Did_LocalFile_Delete_Execute (filesystem::path relativeFilePath)
+{
+ m_actionLog->AddLocalActionDelete (relativeFilePath.generic_string());
+ // notify SyncCore to propagate the change
+ m_core->localStateChanged();
+}
+
+/////////////////////////////////////////////////////////////////////////////////////////////////////
+/////////////////////////////////////////////////////////////////////////////////////////////////////
+/////////////////////////////////////////////////////////////////////////////////////////////////////
+
+/**
+ * Callbacks:
+ *
+ * - from SyncLog: when state changes -> to fetch missing actions
+ *
+ * - from FetchManager/Actions: when action is fetched -> to request a file, specified by the action
+ * -> to add action to the action log
+ *
+ * - from ActionLog/Delete: when action applied (file state changed, file deleted) -> to delete local file
+ *
+ * - from ActionLog/AddOrUpdate: when action applied (file state changes, file added or modified) -> do nothing?
+ *
+ * - from FetchManager/Files: when file segment is retrieved -> save it in ObjectDb
+ * when file fetch is completed -> if file belongs to FileState, then assemble it to filesystem. Don't do anything otherwise
+ */
+
+void
+Dispatcher::Did_SyncLog_StateChange (const SyncStateMsgPtr &stateMsg)
{
int size = stateMsg->state_size();
int index = 0;
@@ -170,101 +174,115 @@
uint64_t oldSeq = state.old_seq();
uint64_t newSeq = state.seq();
Name userName = state.name();
- Name locator = state.locator();
// fetch actions with oldSeq + 1 to newSeq (inclusive)
- Name actionNameBase(userName);
- actionNameBase.appendComp("action")
- .appendComp(m_sharedFolder);
+ Name actionNameBase = Name(userName)("action")(m_sharedFolder);
- for (uint64_t seqNo = oldSeq + 1; seqNo <= newSeq; seqNo++)
- {
- Name actionName = actionNameBase;
- actionName.appendComp(seqNo);
-
- // TODO:
- // use fetcher to fetch the name with callback "actionRecieved"
- }
+ m_actionFetcher->Enqueue (userName, actionNameBase,
+ bind (&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4), FetchManager::FinishCallback (),
+ oldSeq + 1, newSeq, FetchManager::PRIORITY_HIGH);
}
}
}
void
-Dispatcher::actionReceived(const ActionItemPtr &actionItem)
+Dispatcher::Did_FetchManager_ActionFetch (const Ccnx::Name &deviceName, const Ccnx::Name &actionName, uint32_t seqno, Ccnx::PcoPtr actionPco)
{
- switch (actionItem->action())
- {
- case ActionItem::UPDATE:
- {
- // @TODO
- // need a function in ActionLog to apply received action, i.e. record remote action in ActionLog
+ /// @todo Errors and exception checking
+ _LOG_DEBUG ("Received action deviceName: " << deviceName << ", actionName: " << actionName << ", seqno: " << seqno);
- string hashBytes = actionItem->file_hash();
- Hash hash(hashBytes.c_str(), hashBytes.size());
- ostringstream oss;
- oss << hash;
- string hashString = oss.str();
- ObjectDbPtr db = make_shared<ObjectDb>(m_rootDir, hashString);
- m_objectDbMap[hashString] = db;
+ ActionItemPtr action = m_actionLog->AddRemoteAction (deviceName, seqno, actionPco);
+ // trigger may invoke Did_ActionLog_ActionApply_Delete or Did_ActionLog_ActionApply_AddOrModify callbacks
- // TODO:
- // user fetcher to fetch the file with callback "fileSegmentReceived" for segment callback and "fileReady" for file ready callback
- break;
- }
- case ActionItem::DELETE:
+ if (action->action () == ActionItem::UPDATE)
{
- string filename = actionItem->filename();
- // TODO:
- // m_actionLog->AddRemoteActionDelete(filename);
- break;
+ Hash hash (action->file_hash ().c_str(), action->file_hash ().size ());
+
+ Name fileNameBase = Name (deviceName)("file")(hash.GetHash (), hash.GetHashBytes ());
+
+ if (m_objectDbMap.find (hash) == m_objectDbMap.end ())
+ {
+ m_objectDbMap [hash] = make_shared<ObjectDb> (m_rootDir, lexical_cast<string> (hash));
+ }
+
+ m_fileFetcher->Enqueue (deviceName, fileNameBase,
+ bind (&Dispatcher::Did_FetchManager_FileSegmentFetch, this, _1, _2, _3, _4),
+ bind (&Dispatcher::Did_FetchManager_FileFetchComplete, this, _1, _2),
+ 0, action->seg_num (), FetchManager::PRIORITY_NORMAL);
}
- default:
- break;
- }
}
void
-Dispatcher::fileSegmentReceived(const Ccnx::Name &name, const Ccnx::Bytes &content)
+Dispatcher::Did_ActionLog_ActionApply_Delete (const std::string &filename)
{
- int size = name.size();
- uint64_t segment = name.getCompAsInt(size - 1);
- Bytes hashBytes = name.getComp(size - 2);
- Hash hash(head(hashBytes), hashBytes.size());
- ostringstream oss;
- oss << hash;
- string hashString = oss.str();
- if (m_objectDbMap.find(hashString) != m_objectDbMap.end())
+ m_executor.execute (bind (&Dispatcher::Did_ActionLog_ActionApply_Delete_Execute, this, filename));
+}
+
+void
+Dispatcher::Did_ActionLog_ActionApply_Delete_Execute (std::string filename)
+{
+ _LOG_DEBUG ("Action to delete " << filename);
+
+ filesystem::path absolutePath = m_rootDir / filename;
+ if (filesystem::exists(absolutePath))
+ {
+ // need some protection from local detection of removal
+ remove (absolutePath);
+ }
+ // don't exist
+}
+
+void
+Dispatcher::Did_FetchManager_FileSegmentFetch (const Ccnx::Name &deviceName, const Ccnx::Name &fileSegmentName, uint32_t segment, Ccnx::PcoPtr fileSegmentPco)
+{
+ m_executor.execute (bind (&Dispatcher::Did_FetchManager_FileSegmentFetch_Execute, this, deviceName, fileSegmentName, segment, fileSegmentPco));
+}
+
+void
+Dispatcher::Did_FetchManager_FileSegmentFetch_Execute (Ccnx::Name deviceName, Ccnx::Name fileSegmentName, uint32_t segment, Ccnx::PcoPtr fileSegmentPco)
+{
+ const Bytes &hashBytes = fileSegmentName.getCompFromBack (1);
+ Hash hash (head(hashBytes), hashBytes.size());
+
+ _LOG_DEBUG ("Received segment deviceName: " << deviceName << ", segmentName: " << fileSegmentName << ", segment: " << segment);
+
+ map<Hash, ObjectDbPtr>::iterator db = m_objectDbMap.find (hash);
+ if (db != m_objectDbMap.end())
{
- ObjectDbPtr db = m_objectDbMap[hashString];
- // get the device name
- // Name deviceName = name.getPartialName();
- // db->saveContenObject(deviceName, segment, content);
+ db->second->saveContentObject(deviceName, segment, fileSegmentPco->buf ());
}
else
{
- cout << "no db available for this content object: " << name << ", size: " << content.size() << endl;
+ _LOG_ERROR ("no db available for this content object: " << fileSegmentName << ", size: " << fileSegmentPco->buf ().size());
}
}
void
-Dispatcher::fileReady(const Ccnx::Name &fileNamePrefix)
+Dispatcher::Did_FetchManager_FileFetchComplete (const Ccnx::Name &deviceName, const Ccnx::Name &fileBaseName)
{
- int size = fileNamePrefix.size();
- Bytes hashBytes = fileNamePrefix.getComp(size - 1);
- Hash hash(head(hashBytes), hashBytes.size());
- ostringstream oss;
- oss << hash;
- string hashString = oss.str();
+ m_executor.execute (bind (&Dispatcher::Did_FetchManager_FileFetchComplete_Execute, this, deviceName, fileBaseName));
+}
- if (m_objectDbMap.find(hashString) != m_objectDbMap.end())
- {
- // remove the db handle
- m_objectDbMap.erase(hashString);
- }
- else
- {
- cout << "no db available for this file: " << fileNamePrefix << endl;
- }
+void
+Dispatcher::Did_FetchManager_FileFetchComplete_Execute (Ccnx::Name deviceName, Ccnx::Name fileBaseName)
+{
+ _LOG_DEBUG ("Finished fetching " << deviceName << ", fileBaseName: " << fileBaseName);
+ // int size = fileNamePrefix.size();
+ // Bytes hashBytes = fileNamePrefix.getComp(size - 1);
+ // Hash hash(head(hashBytes), hashBytes.size());
+ // ostringstream oss;
+ // oss << hash;
+ // string hashString = oss.str();
+
+ // if (m_objectDbMap.find(hashString) != m_objectDbMap.end())
+ // {
+ // // remove the db handle
+ // m_objectDbMap.erase(hashString);
+ // }
+ // else
+ // {
+ // cout << "no db available for this file: " << fileNamePrefix << endl;
+ // }
// query the action table to get the path on local file system
// m_objectManager.objectsToLocalFile(deviceName, hash, relativeFilePath);
diff --git a/src/dispatcher.h b/src/dispatcher.h
index 3ebf67e..3762f0c 100644
--- a/src/dispatcher.h
+++ b/src/dispatcher.h
@@ -29,6 +29,8 @@
#include "object-db.h"
#include "object-manager.h"
#include "content-server.h"
+#include "fetch-manager.h"
+
#include <boost/function.hpp>
#include <boost/filesystem.hpp>
#include <boost/shared_ptr.hpp>
@@ -42,12 +44,6 @@
class Dispatcher
{
public:
- typedef enum
- {
- UPDATE = 0,
- DELETE = 1
- } ActionType;
-
// sharedFolder is the name to be used in NDN name;
// rootDir is the shared folder dir in local file system;
Dispatcher(const boost::filesystem::path &path, const std::string &localUserName, const Ccnx::Name &localPrefix,
@@ -57,41 +53,86 @@
// ----- Callbacks, they only submit the job to executor and immediately return so that event processing thread won't be blocked for too long -------
+
// callback to process local file change
void
- fileChangedCallback(const boost::filesystem::path &relativeFilepath, ActionType type);
+ Did_LocalFile_AddOrModify (const boost::filesystem::path &relativeFilepath);
- // callback to process remote sync state change
void
- syncStateChangedCallback(const SyncStateMsgPtr &stateMsg);
-
- // callback to process remote action data
- void
- actionReceivedCallback(const ActionItemPtr &actionItem);
-
- // callback to porcess file data
- void
- fileSegmentReceivedCallback(const Ccnx::Name &name, const Ccnx::Bytes &content);
-
- // callback to assemble file
- void
- fileReadyCallback(const Ccnx::Name &fileNamePrefix);
+ Did_LocalFile_Delete (const boost::filesystem::path &relativeFilepath);
private:
void
- fileChanged(const boost::filesystem::path &relativeFilepath, ActionType type);
+ Did_LocalFile_AddOrModify_Execute (boost::filesystem::path relativeFilepath); // cannot be const & for Execute event!!! otherwise there will be segfault
void
- syncStateChanged(const SyncStateMsgPtr &stateMsg);
+ Did_LocalFile_Delete_Execute (boost::filesystem::path relativeFilepath); // cannot be const & for Execute event!!! otherwise there will be segfault
+
+
+private:
+ /**
+ * Callbacks:
+ *
+ x * - from SyncLog: when state changes -> to fetch missing actions
+ *
+ x * - from FetchManager/Actions: when action is fetched -> to request a file, specified by the action
+ * -> to add action to the action log
+ *
+ * - from ActionLog/Delete: when action applied (file state changed, file deleted) -> to delete local file
+ *
+ * - from ActionLog/AddOrUpdate: when action applied (file state changes, file added or modified) -> to assemble the file if file is available in the ObjectDb, otherwise, do nothing
+ *
+ x * - from FetchManager/Files: when file segment is retrieved -> save it in ObjectDb
+ * when file fetch is completed -> if file belongs to FileState, then assemble it to filesystem. Don't do anything otherwise
+ */
+
+ // callback to process remote sync state change
+ void
+ Did_SyncLog_StateChange (const SyncStateMsgPtr &stateMsg);
void
- actionReceived(const ActionItemPtr &actionItem);
+ Did_FetchManager_ActionFetch (const Ccnx::Name &deviceName, const Ccnx::Name &actionName, uint32_t seqno, Ccnx::PcoPtr actionPco);
void
- fileSegmentReceived(const Ccnx::Name &name, const Ccnx::Bytes &content);
+ Did_ActionLog_ActionApply_Delete (const std::string &filename);
void
- fileReady(const Ccnx::Name &fileNamePrefix);
+ Did_ActionLog_ActionApply_Delete_Execute (std::string filename);
+
+ // void
+ // Did_ActionLog_ActionApply_AddOrModify (const std::string &filename, Ccnx::Name device_name, sqlite3_int64 seq_no,
+ // HashPtr hash, time_t m_time, int mode, int seg_num);
+
+ void
+ Did_FetchManager_FileSegmentFetch (const Ccnx::Name &deviceName, const Ccnx::Name &fileSegmentName, uint32_t segment, Ccnx::PcoPtr fileSegmentPco);
+
+ void
+ Did_FetchManager_FileSegmentFetch_Execute (Ccnx::Name deviceName, Ccnx::Name fileSegmentName, uint32_t segment, Ccnx::PcoPtr fileSegmentPco);
+
+ void
+ Did_FetchManager_FileFetchComplete (const Ccnx::Name &deviceName, const Ccnx::Name &fileBaseName);
+
+ void
+ Did_FetchManager_FileFetchComplete_Execute (Ccnx::Name deviceName, Ccnx::Name fileBaseName);
+
+private:
+ void
+ AssembleFile_Execute (const Ccnx::Name &deviceName, const Hash &filehash, const boost::filesystem::path &relativeFilepath);
+
+ // void
+ // fileChanged(const boost::filesystem::path &relativeFilepath, ActionType type);
+
+ // void
+ // syncStateChanged(const SyncStateMsgPtr &stateMsg);
+
+ // void
+ // actionReceived(const ActionItemPtr &actionItem);
+
+ // void
+ // fileSegmentReceived(const Ccnx::Name &name, const Ccnx::Bytes &content);
+
+ // void
+ // fileReady(const Ccnx::Name &fileNamePrefix);
private:
Ccnx::CcnxWrapperPtr m_ccnx;
@@ -105,9 +146,14 @@
Ccnx::Name m_localUserName;
// maintain object db ptrs so that we don't need to create them
// for every fetched segment of a file
- map<Ccnx::Name, ObjectDbPtr> m_objectDbMap;
+
+ std::map<Hash, ObjectDbPtr> m_objectDbMap;
+
std::string m_sharedFolder;
ContentServer *m_server;
+
+ FetchManagerPtr m_actionFetcher;
+ FetchManagerPtr m_fileFetcher;
};
namespace Error
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index 84cee4c..0692702 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -31,7 +31,7 @@
//The disposer object function
struct fetcher_disposer { void operator() (Fetcher *delete_this) { delete delete_this; } };
-FetchManager::FetchManager (CcnxWrapperPtr ccnx, const Mapping &mapping, uint64_t parallelFetches/* = 3*/)
+FetchManager::FetchManager (CcnxWrapperPtr ccnx, const Mapping &mapping, uint32_t parallelFetches/* = 3*/)
: m_ccnx (ccnx)
, m_mapping (mapping)
, m_maxParallelFetches (parallelFetches)
diff --git a/src/fetch-manager.h b/src/fetch-manager.h
index 9e3ff40..0b205eb 100644
--- a/src/fetch-manager.h
+++ b/src/fetch-manager.h
@@ -35,17 +35,17 @@
class FetchManager
{
+public:
enum
{
PRIORITY_NORMAL,
PRIORITY_HIGH
};
-public:
typedef boost::function<Ccnx::Name(const Ccnx::Name &)> Mapping;
typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName, uint64_t seq, Ccnx::PcoPtr pco)> SegmentCallback;
typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName)> FinishCallback;
- FetchManager (Ccnx::CcnxWrapperPtr ccnx, const Mapping &mapping, uint64_t parallelFetches = 3);
+ FetchManager (Ccnx::CcnxWrapperPtr ccnx, const Mapping &mapping, uint32_t parallelFetches = 3);
virtual ~FetchManager ();
void
@@ -81,8 +81,8 @@
Mapping m_mapping;
SchedulerPtr m_scheduler;
- uint64_t m_maxParallelFetches;
- uint64_t m_currentParallelFetches;
+ uint32_t m_maxParallelFetches;
+ uint32_t m_currentParallelFetches;
boost::mutex m_parellelFetchMutex;
// optimized list structure for fetch queue
diff --git a/src/fetcher.h b/src/fetcher.h
index 9fba688..c86061f 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -95,8 +95,8 @@
int64_t m_minSeqNo;
int64_t m_maxSeqNo;
- uint64_t m_pipeline;
- uint64_t m_activePipeline;
+ uint32_t m_pipeline;
+ uint32_t m_activePipeline;
boost::posix_time::ptime m_lastPositiveActivity;
diff --git a/src/file-item.proto b/src/file-item.proto
index 937226b..02ea9e8 100644
--- a/src/file-item.proto
+++ b/src/file-item.proto
@@ -1,12 +1,5 @@
message FileItem
{
- enum FileType
- {
- LATEST = 0;
- OLDEST = 1;
- }
- required FileType type = 1;
-
required string filename = 2;
required bytes device_name = 3;
required uint64 seq_no = 4;
diff --git a/src/hash-helper.h b/src/hash-helper.h
index 638a93a..4224860 100644
--- a/src/hash-helper.h
+++ b/src/hash-helper.h
@@ -39,6 +39,7 @@
public:
static unsigned char _origin;
static HashPtr Origin;
+
Hash (const void *buf, unsigned int length)
: m_length (length)
{
@@ -49,18 +50,43 @@
}
}
+ Hash (const Hash &otherHash) // deep copy: the new object owns its own buffer
+ : m_buf (0), m_length (otherHash.m_length) // m_buf is null (not indeterminate) for an empty hash
+ {
+   if (m_length != 0)
+     {
+       m_buf = new unsigned char [m_length];
+       memcpy (m_buf, otherHash.m_buf, otherHash.m_length);
+     }
+ }
+
static HashPtr
FromString (const std::string &hashInTextEncoding);
static HashPtr
FromFileContent (const boost::filesystem::path &fileName);
-
+
~Hash ()
{
if (m_length != 0)
delete [] m_buf;
}
-
+
+ Hash &
+ operator = (const Hash &otherHash) // deep-copy assignment; releases the previously owned buffer
+ {
+   if (this == &otherHash) return *this; // self-assignment: freeing m_buf first would leave nothing valid to copy from
+   if (m_length != 0)
+     delete [] m_buf;
+   m_length = otherHash.m_length;
+   if (m_length != 0)
+     {
+       m_buf = new unsigned char [m_length];
+       memcpy (m_buf, otherHash.m_buf, otherHash.m_length);
+     }
+   return *this;
+ }
+
bool
IsZero () const
{
@@ -73,10 +99,27 @@
{
if (m_length != otherHash.m_length)
return false;
-
+
return memcmp (m_buf, otherHash.m_buf, m_length) == 0;
}
+ /**
+  * Strict weak ordering, so that Hash can serve as a std::map / std::set key.
+  * Shorter hashes sort first; hashes of equal length are ordered by a
+  * lexicographic byte-wise comparison. Equal hashes are never "less".
+  */
+ bool operator < (const Hash &otherHash) const
+ {
+   if (m_length != otherHash.m_length)
+     return m_length < otherHash.m_length;
+
+   // Equal lengths: memcmp compares the bytes as unsigned char and yields 0
+   // for identical buffers, so a < a is false and exactly one of a < b,
+   // b < a holds for distinct hashes (required by ordered containers).
+   // The m_length guard keeps m_buf from being read for an empty hash.
+   return m_length != 0 && memcmp (m_buf, otherHash.m_buf, m_length) < 0;
+ }
+
const void *
GetHash () const
{
@@ -88,14 +131,11 @@
{
return m_length;
}
-
+
private:
unsigned char *m_buf;
unsigned int m_length;
- Hash (const Hash &) { }
- Hash & operator = (const Hash &) { return *this; }
-
friend std::ostream &
operator << (std::ostream &os, const Hash &digest);
};