Almost everything works, but still small problem with file assembly
diff --git a/src/content-server.cc b/src/content-server.cc
index 3b3c9c2..0cf5c43 100644
--- a/src/content-server.cc
+++ b/src/content-server.cc
@@ -20,113 +20,160 @@
*/
#include "content-server.h"
+#include "logging.h"
+#include <boost/lexical_cast.hpp>
+
+INIT_LOGGER ("ContentServer");
using namespace Ccnx;
using namespace std;
+using namespace boost;
-ContentServer::ContentServer(CcnxWrapperPtr ccnx, ActionLogPtr actionLog, const boost::filesystem::path &rootDir, int freshness)
- : m_ccnx(ccnx)
- , m_actionLog(actionLog)
- , m_dbFolder(rootDir / ".chronoshare")
- , m_freshness(freshness)
+ContentServer::ContentServer(CcnxWrapperPtr ccnx, ActionLogPtr actionLog,
+ const boost::filesystem::path &rootDir,
+ const Ccnx::Name &deviceName, const std::string &sharedFolderName,
+ int freshness)
+ : m_ccnx(ccnx)
+ , m_actionLog(actionLog)
+ , m_dbFolder(rootDir / ".chronoshare")
+ , m_freshness(freshness)
+ , m_executor (1)
+ , m_deviceName (deviceName)
+ , m_sharedFolderName (sharedFolderName)
{
+ m_executor.start ();
}
ContentServer::~ContentServer()
{
- WriteLock lock(m_mutex);
- int size = m_prefixes.size();
+ m_executor.shutdown ();
+
+ ScopedLock lock (m_mutex);
for (PrefixIt it = m_prefixes.begin(); it != m_prefixes.end(); ++it)
{
- m_ccnx->clearInterestFilter(*it);
+ m_ccnx->clearInterestFilter (Name (*it)(m_deviceName)("action")(m_sharedFolderName));
+ m_ccnx->clearInterestFilter (Name (*it)(m_deviceName)("file"));
}
}
void
ContentServer::registerPrefix(const Name &prefix)
{
- m_ccnx->setInterestFilter(prefix, bind(&ContentServer::serve, this, _1));
- WriteLock lock(m_mutex);
+ _LOG_DEBUG (">> register " << prefix);
+ m_ccnx->setInterestFilter (Name (prefix)(m_deviceName)("action")(m_sharedFolderName), bind(&ContentServer::serve_Action, this, prefix, _1));
+ m_ccnx->setInterestFilter (Name (prefix)(m_deviceName)("file"), bind(&ContentServer::serve_File, this, prefix, _1));
+
+ ScopedLock lock (m_mutex);
m_prefixes.insert(prefix);
}
void
-ContentServer::deregisterPrefix(const Name &prefix)
+ContentServer::deregisterPrefix (const Name &prefix)
{
- WriteLock lock(m_mutex);
- PrefixIt it = m_prefixes.find(prefix);
- if (it != m_prefixes.end())
- {
- m_ccnx->clearInterestFilter(*it);
- m_prefixes.erase(it);
- }
+ _LOG_DEBUG ("<< deregister " << prefix);
+
+ m_ccnx->clearInterestFilter(Name (prefix)(m_deviceName)("action")(m_sharedFolderName));
+ m_ccnx->clearInterestFilter(Name (prefix)(m_deviceName)("file"));
+
+ ScopedLock lock (m_mutex);
+ m_prefixes.erase (prefix);
}
void
-ContentServer::serve(const Name &interest)
+ContentServer::serve_Action (Name forwardingHint, const Name &interest)
{
- ReadLock lock(m_mutex);
- for (PrefixIt it = m_prefixes.begin(); it != m_prefixes.end(); ++it)
- {
- Name prefix = *it;
- int prefixSize = prefix.size();
- int interestSize = interest.size();
- // this is the prefix of the interest
- if (prefixSize <= interestSize && interest.getPartialName(0, prefixSize) == prefix)
+ m_executor.execute (bind (&ContentServer::serve_Action_Execute, this, forwardingHint, interest));
+ // need to unlock ccnx mutex... or at least don't lock it
+}
+
+void
+ContentServer::serve_File (Name forwardingHint, const Name &interest)
+{
+ m_executor.execute (bind (&ContentServer::serve_File_Execute, this, forwardingHint, interest));
+ // need to unlock ccnx mutex... or at least don't lock it
+}
+
+void
+ContentServer::serve_File_Execute (Name forwardingHint, Name interest)
+{
+ // /device-name/file/<file-hash>/segment, or
+
+ int64_t segment = interest.getCompFromBackAsInt (0);
+ Name deviceName = interest.getPartialName (forwardingHint.size (), interest.size () - forwardingHint.size () - 3);
+ Hash hash (head(interest.getCompFromBack (1)), interest.getCompFromBack (1).size());
+
+ _LOG_DEBUG (" server FILE for device: " << deviceName << ", file_hash: " << hash << " segment: " << segment);
+
+ string hashStr = lexical_cast<string> (hash);
+ if (ObjectDb::DoesExist (m_dbFolder, deviceName, hashStr)) // this is kind of overkill, as it counts available segments
{
- // originalName should be either
- // /device-name/file/file-hash/segment, or
- // /device-name/action/shared-folder/seq
- Name originalName = interest.getPartialName(prefixSize);
- int nameSize = originalName.size();
- if (nameSize > 3)
- {
- Name deviceName = originalName.getPartialName(0, nameSize - 3);
- string type = originalName.getCompAsString(nameSize - 3);
- if (type == "action")
+ ObjectDb db (m_dbFolder, hashStr);
+ // may do prefetching
+
+ BytesPtr co = db.fetchSegment (deviceName, segment);
+ if (co)
{
- int64_t seqno = originalName.getCompAsInt(nameSize - 1);
- PcoPtr pco = m_actionLog->LookupActionPco(deviceName, seqno);
- if (pco)
- {
- Bytes content = pco->buf();
- if (m_freshness > 0)
+ if (forwardingHint.size () == 0)
+ {
+ m_ccnx->putToCcnd (*co);
+ }
+ else
+ {
+ if (m_freshness > 0)
+ {
+ m_ccnx->publishData(interest, *co, m_freshness);
+ }
+ else
+ {
+ m_ccnx->publishData(interest, *co);
+ }
+ }
+
+ }
+ else
+ {
+ _LOG_ERROR ("ObjectDd exists, but no segment " << segment << " for device: " << deviceName << ", file_hash: " << hash);
+ }
+ }
+ else
+ {
+ _LOG_ERROR ("ObjectDd doesn't exist for device: " << deviceName << ", file_hash: " << hash);
+ }
+
+}
+
+void
+ContentServer::serve_Action_Execute (Name forwardingHint, Name interest)
+{
+ // /device-name/action/shared-folder/seq
+
+ int64_t seqno = interest.getCompFromBackAsInt (0);
+ Name deviceName = interest.getPartialName (forwardingHint.size (), interest.size () - forwardingHint.size () - 3);
+
+ _LOG_DEBUG (" server ACTION for device: " << deviceName << " and seqno: " << seqno);
+
+ PcoPtr pco = m_actionLog->LookupActionPco (deviceName, seqno);
+ if (pco)
+ {
+ if (forwardingHint.size () == 0)
+ {
+ m_ccnx->putToCcnd (pco->buf ());
+ }
+ else
+ {
+ const Bytes &content = pco->buf ();
+ if (m_freshness > 0)
{
m_ccnx->publishData(interest, content, m_freshness);
}
- else
+ else
{
m_ccnx->publishData(interest, content);
}
- }
}
- else if (type == "file")
- {
- Bytes hashBytes = originalName.getComp(nameSize - 2);
- Hash hash(head(hashBytes), hashBytes.size());
- ostringstream oss;
- oss << hash;
- int64_t segment = originalName.getCompAsInt(nameSize - 1);
- ObjectDb db(m_dbFolder, oss.str());
- BytesPtr co = db.fetchSegment(deviceName, segment);
- if (co)
- {
- if (m_freshness > 0)
- {
- m_ccnx->publishData(interest, *co, m_freshness);
- }
- else
- {
- m_ccnx->publishData(interest, *co);
- }
- }
- }
- else
- {
- cerr << "Discard interest that ContentServer does not know how to handle: " << interest << ", prefix: " << prefix << endl;
- }
-
- }
}
- }
+ else
+ {
+ _LOG_ERROR ("ACTION not found for device: " << deviceName << " and seqno: " << seqno);
+ }
}
diff --git a/src/content-server.h b/src/content-server.h
index 994f8b9..946170d 100644
--- a/src/content-server.h
+++ b/src/content-server.h
@@ -1,5 +1,3 @@
-#ifndef CONTENT_SERVER_H
-#define CONTENT_SERVER_H
/* -*- Mode: C++; c-file-style: "gnu"; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2013 University of California, Los Angeles
@@ -21,17 +19,22 @@
* Alexander Afanasyev <alexander.afanasyev@ucla.edu>
*/
+#ifndef CONTENT_SERVER_H
+#define CONTENT_SERVER_H
+
#include "ccnx-wrapper.h"
#include "object-db.h"
#include "action-log.h"
#include <set>
#include <boost/thread/shared_mutex.hpp>
#include <boost/thread/locks.hpp>
+#include "executor.h"
class ContentServer
{
public:
- ContentServer(Ccnx::CcnxWrapperPtr ccnx, ActionLogPtr actionLog, const boost::filesystem::path &rootDir, int freshness = -1);
+ ContentServer(Ccnx::CcnxWrapperPtr ccnx, ActionLogPtr actionLog, const boost::filesystem::path &rootDir,
+ const Ccnx::Name &deviceName, const std::string &sharedFolderName, int freshness = -1);
~ContentServer();
// the assumption is, when the interest comes in, interest is informs of
@@ -41,19 +44,34 @@
void registerPrefix(const Ccnx::Name &prefix);
void deregisterPrefix(const Ccnx::Name &prefix);
+private:
void
- serve(const Ccnx::Name &interest);
+ serve_Action (Ccnx::Name forwardingHint, const Ccnx::Name &interest);
+
+ void
+ serve_File (Ccnx::Name forwardingHint, const Ccnx::Name &interest);
+
+ void
+ serve_Action_Execute(Ccnx::Name forwardingHint, Ccnx::Name interest);
+
+ void
+ serve_File_Execute(Ccnx::Name forwardingHint, Ccnx::Name interest);
private:
Ccnx::CcnxWrapperPtr m_ccnx;
ActionLogPtr m_actionLog;
typedef boost::shared_mutex Mutex;
- typedef boost::unique_lock<Mutex> WriteLock;
- typedef boost::shared_lock<Mutex> ReadLock;
+
+ typedef boost::unique_lock<Mutex> ScopedLock;
typedef std::set<Ccnx::Name>::iterator PrefixIt;
std::set<Ccnx::Name> m_prefixes;
Mutex m_mutex;
boost::filesystem::path m_dbFolder;
int m_freshness;
+
+ Executor m_executor;
+
+ Ccnx::Name m_deviceName;
+ std::string m_sharedFolderName;
};
#endif // CONTENT_SERVER_H
diff --git a/src/dispatcher.cc b/src/dispatcher.cc
index 63e0937..556144f 100644
--- a/src/dispatcher.cc
+++ b/src/dispatcher.cc
@@ -58,7 +58,7 @@
bind (&Dispatcher::Did_ActionLog_ActionApply_Delete, this, _1));
Name syncPrefix = Name(BROADCAST_DOMAIN)(sharedFolder);
- m_server = new ContentServer(m_ccnx, m_actionLog, rootDir);
+ m_server = new ContentServer(m_ccnx, m_actionLog, rootDir, m_localUserName, m_sharedFolder);
m_server->registerPrefix(Name ("/"));
m_server->registerPrefix(syncPrefix);
@@ -223,24 +223,24 @@
{
uint64_t oldSeq = state.old_seq();
uint64_t newSeq = state.seq();
- Name userName = state.name();
+ Name userName (reinterpret_cast<const unsigned char *> (state.name ().c_str ()), state.name ().size ());
// fetch actions with oldSeq + 1 to newSeq (inclusive)
Name actionNameBase = Name(userName)("action")(m_sharedFolder);
m_actionFetcher->Enqueue (userName, actionNameBase,
bind (&Dispatcher::Did_FetchManager_ActionFetch, this, _1, _2, _3, _4), FetchManager::FinishCallback (),
- oldSeq + 1, newSeq, FetchManager::PRIORITY_HIGH);
+ std::max<uint64_t> (oldSeq + 1, 1), newSeq, FetchManager::PRIORITY_HIGH);
}
}
}
void
-Dispatcher::Did_FetchManager_ActionFetch (const Ccnx::Name &deviceName, const Ccnx::Name &actionName, uint32_t seqno, Ccnx::PcoPtr actionPco)
+Dispatcher::Did_FetchManager_ActionFetch (const Ccnx::Name &deviceName, const Ccnx::Name &actionBaseName, uint32_t seqno, Ccnx::PcoPtr actionPco)
{
/// @todo Errors and exception checking
- _LOG_DEBUG ("Received action deviceName: " << deviceName << ", actionName: " << actionName << ", seqno: " << seqno);
+ _LOG_DEBUG ("Received action deviceName: " << deviceName << ", actionBaseName: " << actionBaseName << ", seqno: " << seqno);
ActionItemPtr action = m_actionLog->AddRemoteAction (deviceName, seqno, actionPco);
// trigger may invoke Did_ActionLog_ActionApply_Delete or Did_ActionLog_ActionApply_AddOrModify callbacks
@@ -253,6 +253,7 @@
if (m_objectDbMap.find (hash) == m_objectDbMap.end ())
{
+ _LOG_DEBUG ("create ObjectDb for " << hash);
m_objectDbMap [hash] = make_shared<ObjectDb> (m_rootDir, lexical_cast<string> (hash));
}
@@ -284,18 +285,20 @@
}
void
-Dispatcher::Did_FetchManager_FileSegmentFetch (const Ccnx::Name &deviceName, const Ccnx::Name &fileSegmentName, uint32_t segment, Ccnx::PcoPtr fileSegmentPco)
+Dispatcher::Did_FetchManager_FileSegmentFetch (const Ccnx::Name &deviceName, const Ccnx::Name &fileSegmentBaseName, uint32_t segment, Ccnx::PcoPtr fileSegmentPco)
{
- m_executor.execute (bind (&Dispatcher::Did_FetchManager_FileSegmentFetch_Execute, this, deviceName, fileSegmentName, segment, fileSegmentPco));
+ m_executor.execute (bind (&Dispatcher::Did_FetchManager_FileSegmentFetch_Execute, this, deviceName, fileSegmentBaseName, segment, fileSegmentPco));
}
void
-Dispatcher::Did_FetchManager_FileSegmentFetch_Execute (Ccnx::Name deviceName, Ccnx::Name fileSegmentName, uint32_t segment, Ccnx::PcoPtr fileSegmentPco)
+Dispatcher::Did_FetchManager_FileSegmentFetch_Execute (Ccnx::Name deviceName, Ccnx::Name fileSegmentBaseName, uint32_t segment, Ccnx::PcoPtr fileSegmentPco)
{
- const Bytes &hashBytes = fileSegmentName.getCompFromBack (1);
+ const Bytes &hashBytes = fileSegmentBaseName.getCompFromBack (0);
Hash hash (head(hashBytes), hashBytes.size());
- _LOG_DEBUG ("Received segment deviceName: " << deviceName << ", segmentName: " << fileSegmentName << ", segment: " << segment);
+ _LOG_DEBUG ("Received segment deviceName: " << deviceName << ", segmentBaseName: " << fileSegmentBaseName << ", segment: " << segment);
+
+ _LOG_DEBUG ("Looking up objectdb for " << hash);
map<Hash, ObjectDbPtr>::iterator db = m_objectDbMap.find (hash);
if (db != m_objectDbMap.end())
@@ -304,7 +307,7 @@
}
else
{
- _LOG_ERROR ("no db available for this content object: " << fileSegmentName << ", size: " << fileSegmentPco->buf ().size());
+ _LOG_ERROR ("no db available for this content object: " << fileSegmentBaseName << ", size: " << fileSegmentPco->buf ().size());
}
}
@@ -319,7 +322,7 @@
{
_LOG_DEBUG ("Finished fetching " << deviceName << ", fileBaseName: " << fileBaseName);
- const Bytes &hashBytes = fileBaseName.getCompFromBack (1);
+ const Bytes &hashBytes = fileBaseName.getCompFromBack (0);
Hash hash (head (hashBytes), hashBytes.size ());
FileItemsPtr filesToAssemble = m_actionLog->LookupFilesForHash (hash);
diff --git a/src/executor.cc b/src/executor.cc
index 15fbb5b..b1d3593 100644
--- a/src/executor.cc
+++ b/src/executor.cc
@@ -15,8 +15,8 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
- * Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Author: Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
*/
#include "executor.h"
diff --git a/src/executor.h b/src/executor.h
index 8d3c021..54a0108 100644
--- a/src/executor.h
+++ b/src/executor.h
@@ -15,8 +15,8 @@
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*
- * Zhenkai Zhu <zhenkai@cs.ucla.edu>
- * Author: Alexander Afanasyev <alexander.afanasyev@ucla.edu>
+ * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
+ * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
*/
#ifndef EXECUTOR_H
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index bbdd6a8..d30bd18 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -73,8 +73,9 @@
finishCallback,
bind (&FetchManager::DidFetchComplete, this, _1),
bind (&FetchManager::DidNoDataTimeout, this, _1),
- deviceName, baseName, minSeqNo, maxSeqNo));
- fetcher.SetForwardingHint (forwardingHint);
+ deviceName, baseName, minSeqNo, maxSeqNo,
+ boost::posix_time::seconds (30),
+ forwardingHint));
switch (priority)
{
diff --git a/src/fetcher.cc b/src/fetcher.cc
index f577b14..e9dec91 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -55,8 +55,8 @@
, m_forwardingHint (forwardingHint)
, m_maximumNoActivityPeriod (timeout)
- , m_minSendSeqNo (-1)
- , m_maxInOrderRecvSeqNo (-1)
+ , m_minSendSeqNo (minSeqNo-1)
+ , m_maxInOrderRecvSeqNo (minSeqNo-1)
, m_minSeqNo (minSeqNo)
, m_maxSeqNo (maxSeqNo)
@@ -114,14 +114,20 @@
if (m_forwardingHint == Name ())
{
// invoke callback
- m_segmentCallback (m_deviceName, m_name, seqno, data);
+ if (!m_segmentCallback.empty ())
+ {
+ m_segmentCallback (m_deviceName, m_name, seqno, data);
+ }
// we don't have to tell FetchManager about this
}
else
{
try {
PcoPtr pco = make_shared<ParsedContentObject> (*data->contentPtr ());
- m_segmentCallback (m_deviceName, m_name, seqno, pco);
+ if (!m_segmentCallback.empty ())
+ {
+ m_segmentCallback (m_deviceName, m_name, seqno, pco);
+ }
}
catch (MisformedContentObjectException &e)
{
@@ -154,7 +160,11 @@
{
m_active = false;
// invoke callback
- m_finishCallback(m_deviceName, m_name);
+ if (!m_finishCallback.empty ())
+ {
+ m_finishCallback(m_deviceName, m_name);
+ }
+
// tell FetchManager that we have finish our job
m_onFetchComplete (*this);
}