API change for closure. Return ParsedContentObject, not just content
Fixed bug in test/test-sync-core.cc: there should have been two independent schedulers
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index cb94992..7df2687 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -103,7 +103,7 @@
void
FetchManager::DidDataSegmentFetched (Fetcher &fetcher, uint32_t seqno, const Ccnx::Name &basename,
- const Ccnx::Name &name, const Bytes &data)
+ const Ccnx::Name &name, Ccnx::PcoPtr data)
{
// do something
}
diff --git a/src/fetch-manager.h b/src/fetch-manager.h
index 496d0fd..cfceef5 100644
--- a/src/fetch-manager.h
+++ b/src/fetch-manager.h
@@ -30,7 +30,6 @@
#include "scheduler.h"
#include "ccnx-wrapper.h"
-#include "ccnx-tunnel.h"
#include "sync-log.h"
#include "fetcher.h"
@@ -60,7 +59,7 @@
// Fetch Events
void
DidDataSegmentFetched (Fetcher &fetcher, uint32_t seqno, const Ccnx::Name &basename,
- const Ccnx::Name &name, const Ccnx::Bytes &data);
+ const Ccnx::Name &name, Ccnx::PcoPtr data);
void
DidNoDataTimeout (Fetcher &fetcher);
diff --git a/src/fetcher.cc b/src/fetcher.cc
index edbabb4..f009819 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -99,15 +99,15 @@
}
void
-Fetcher::OnData (uint32_t seqno, const Ccnx::Name &name, const Ccnx::Bytes &content)
+Fetcher::OnData (uint32_t seqno, const Ccnx::Name &name, PcoPtr data)
{
if (m_forwardingHint == Name ())
- m_onDataSegment (*this, seqno, m_name, name, content);
+ m_onDataSegment (*this, seqno, m_name, name, data);
else
{
try {
- ParsedContentObject pco (content);
- m_onDataSegment (*this, seqno, m_name, pco.name (), pco.content ());
+ PcoPtr pco = make_shared<ParsedContentObject> (*data->contentPtr ());
+ m_onDataSegment (*this, seqno, m_name, pco->name (), pco);
}
catch (MisformedContentObjectException &e)
{
diff --git a/src/fetcher.h b/src/fetcher.h
index 271db52..12ad67f 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -35,7 +35,7 @@
{
public:
typedef boost::function<void (Fetcher &, uint32_t /*requested seqno*/, const Ccnx::Name & /*requested base name*/,
- const Ccnx::Name & /*actual name*/, const Ccnx::Bytes &)> OnDataSegmentCallback;
+ const Ccnx::Name & /*actual name*/, Ccnx::PcoPtr /*content object*/)> OnDataSegmentCallback;
typedef boost::function<void (Fetcher &)> OnFetchCompleteCallback;
typedef boost::function<void (Fetcher &)> OnFetchFailedCallback;
@@ -62,7 +62,7 @@
FillPipeline ();
void
- OnData (uint32_t seqno, const Ccnx::Name &name, const Ccnx::Bytes &);
+ OnData (uint32_t seqno, const Ccnx::Name &name, Ccnx::PcoPtr data);
Ccnx::Closure::TimeoutCallbackReturnValue
OnTimeout (uint32_t seqno, const Ccnx::Name &name);
diff --git a/src/sync-core.cc b/src/sync-core.cc
index 14f4110..9861c44 100644
--- a/src/sync-core.cc
+++ b/src/sync-core.cc
@@ -32,7 +32,7 @@
static void
printMsg(SyncStateMsgPtr &msg)
{
- cout << " ===== start Msg ======" << endl;
+ cerr << " ===== start Msg ======" << endl;
int size = msg->state_size();
if (size > 0)
{
@@ -43,17 +43,17 @@
string strName = state.name();
string strLocator = state.locator();
sqlite3_int64 seq = state.seq();
- cout << "Name: " << Name((const unsigned char *)strName.c_str(), strName.size());
- cout << ", Locator: " << Name((const unsigned char *)strLocator.c_str(), strLocator.size());
- cout << ", seq: " << seq << endl;
+ cerr << "Name: " << Name((const unsigned char *)strName.c_str(), strName.size());
+ cerr << ", Locator: " << Name((const unsigned char *)strLocator.c_str(), strLocator.size());
+ cerr << ", seq: " << seq << endl;
index ++;
}
}
else
{
- cout << "Msg size 0" << endl;
+ cerr << "Msg size 0" << endl;
}
- cout << " ++++++++ end Msg ++++++++ \n\n" << endl;
+ cerr << " ++++++++ end Msg ++++++++ \n\n" << endl;
}
SyncCore::SyncCore(SyncLogPtr syncLog, const Name &userName, const Name &localPrefix, const Name &syncPrefix, const StateMsgCallback &callback, const CcnxWrapperPtr &handle, const SchedulerPtr &scheduler)
@@ -128,7 +128,7 @@
Bytes syncData;
msgToBytes(msg, syncData);
m_handle->publishData(syncName, syncData, FRESHNESS);
- cout << m_log->GetLocalName () << " publishes: " << *oldHash << endl;
+ cerr << m_log->GetLocalName () << " publishes: " << *oldHash << endl;
printMsg(msg);
// no hurry in sending out new Sync Interest; if others send the new Sync Interest first, no problem, we know the new root hash already;
@@ -172,7 +172,7 @@
assert(msg->state_size() > 0);
int size = msg->state_size();
int index = 0;
- cout << "Reply recover interest with: " << endl;
+ cerr << "Reply recover interest with: " << endl;
while (index < size)
{
SyncState state = msg->state(index);
@@ -194,7 +194,7 @@
Bytes syncData;
msgToBytes(msg, syncData);
m_handle->publishData(name, syncData, FRESHNESS);
- cout << m_log->GetLocalName () << " publishes " << hash << endl;
+ cerr << m_log->GetLocalName () << " publishes " << hash << endl;
printMsg(msg);
}
else
@@ -211,29 +211,29 @@
if (*hash == *m_rootHash)
{
// we have the same hash; nothing needs to be done
- cout << "same as root hash: " << *hash << endl;
+ cerr << "same as root hash: " << *hash << endl;
return;
}
else if (m_log->LookupSyncLog(*hash) > 0)
{
// we know something more
- cout << "found hash in sync log" << endl;
+ cerr << "found hash in sync log" << endl;
SyncStateMsgPtr msg = m_log->FindStateDifferences(*hash, *m_rootHash);
Bytes syncData;
msgToBytes(msg, syncData);
m_handle->publishData(name, syncData, FRESHNESS);
- cout << m_log->GetLocalName () << " publishes: " << *hash << endl;
+ cerr << m_log->GetLocalName () << " publishes: " << *hash << endl;
printMsg(msg);
}
else
{
// we don't recognize the hash, send recover Interest if still don't know the hash after a randomized wait period
ostringstream ss;
- ss << *hash;
+ ss << "r-" << *hash;
double wait = m_recoverWaitGenerator->nextInterval();
- cout << m_log->GetLocalName () << ", rootHash: " << *m_rootHash << ", hash: " << *hash << endl;
- cout << "recover task scheduled after wait: " << wait << endl;
+ cerr << m_log->GetLocalName () << ", rootHash: " << *m_rootHash << ", hash: " << *hash << endl;
+ cerr << "recover task scheduled after wait: " << wait << endl;
TaskPtr task(new OneTimeTask(boost::bind(&SyncCore::recover, this, hash), ss.str(), m_scheduler, wait));
m_scheduler->addTask(task);
@@ -259,18 +259,18 @@
}
void
-SyncCore::handleRecoverData(const Name &name, const Bytes &content)
+SyncCore::handleRecoverData(const Name &name, PcoPtr content)
{
//cout << "handle recover data" << endl;
- handleStateData(content);
+ handleStateData(*content->contentPtr ());
sendSyncInterest();
}
void
-SyncCore::handleSyncData(const Name &name, const Bytes &content)
+SyncCore::handleSyncData(const Name &name, PcoPtr content)
{
// suppress recover in interest - data out of order case
- handleStateData(content);
+ handleStateData(*content->contentPtr ());
// resume outstanding sync interest
sendSyncInterest();
@@ -288,7 +288,7 @@
return;
}
- cout << m_log->GetLocalName () << " receives Msg " << endl;
+ cerr << m_log->GetLocalName () << " receives Msg " << endl;
printMsg (msg);
int size = msg->state_size();
int index = 0;
@@ -311,12 +311,12 @@
m_log->UpdateLocator(deviceName, locatorName);
// WriteLock lock(m_ypMutex);
// m_yp[deviceName] = locatorName;
- cout << "self: " << m_log->GetLocalName () << ", device: " << deviceName << " < == > " << locatorName << endl;
+ cerr << "self: " << m_log->GetLocalName () << ", device: " << deviceName << " < == > " << locatorName << endl;
}
}
else
{
- cout << "nani" << endl;
+ cerr << "nani" << endl;
deregister(deviceName);
}
index++;
@@ -339,7 +339,7 @@
{
Name syncInterest = constructSyncName(m_rootHash);
m_handle->sendInterest(syncInterest, m_syncClosure);
- cout << m_log->GetLocalName () << " send SYNC interest: " << *m_rootHash << endl;
+ cerr << m_log->GetLocalName () << " send SYNC interest: " << *m_rootHash << endl;
}
void
@@ -347,7 +347,7 @@
{
if (!(*hash == *m_rootHash) && m_log->LookupSyncLog(*hash) <= 0)
{
- cout << m_log->GetLocalName () << ", Recover for: " << *hash << endl;
+ cerr << m_log->GetLocalName () << ", Recover for: " << *hash << endl;
// unfortunately we still don't recognize this hash
Bytes bytes;
readRaw(bytes, (const unsigned char *)hash->GetHash(), hash->GetHashBytes());
@@ -356,7 +356,7 @@
// append the unknown hash
recoverInterest.appendComp(bytes);
m_handle->sendInterest(recoverInterest, m_recoverClosure);
- cout << m_log->GetLocalName () << " send RECOVER Interest: " << *hash << endl;
+ cerr << m_log->GetLocalName () << " send RECOVER Interest: " << *hash << endl;
}
else
{
diff --git a/src/sync-core.h b/src/sync-core.h
index 7d46ccb..f77b133 100644
--- a/src/sync-core.h
+++ b/src/sync-core.h
@@ -72,10 +72,10 @@
handleInterest(const Name &name);
void
- handleSyncData(const Name &name, const Bytes &content);
+ handleSyncData(const Name &name, Ccnx::PcoPtr content);
void
- handleRecoverData(const Name &name, const Bytes &content);
+ handleRecoverData(const Name &name, Ccnx::PcoPtr content);
Closure::TimeoutCallbackReturnValue
handleSyncInterestTimeout(const Name &name);