update FetchManager and Fetcher
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index 7df2687..84cee4c 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -31,9 +31,9 @@
//The disposer object function
struct fetcher_disposer { void operator() (Fetcher *delete_this) { delete delete_this; } };
-FetchManager::FetchManager (CcnxWrapperPtr ccnx, SyncLogPtr sync, uint32_t parallelFetches/* = 3*/)
+FetchManager::FetchManager (CcnxWrapperPtr ccnx, const Mapping &mapping, uint64_t parallelFetches/* = 3*/)
: m_ccnx (ccnx)
- , m_sync (sync)
+ , m_mapping (mapping)
, m_maxParallelFetches (parallelFetches)
, m_currentParallelFetches (0)
@@ -51,22 +51,20 @@
}
void
-FetchManager::Enqueue (const Ccnx::Name &deviceName, uint32_t minSeqNo, uint32_t maxSeqNo, int priority/*=PRIORITY_NORMAL*/)
+FetchManager::Enqueue (const Ccnx::Name &deviceName, const Ccnx::Name &baseName,
+ const SegmentCallback &segmentCallback, const FinishCallback &finishCallback,
+ uint64_t minSeqNo, uint64_t maxSeqNo, int priority/*PRIORITY_NORMAL*/)
{
// we may need to guarantee that LookupLocator will gives an answer and not throw exception...
Name forwardingHint;
- try {
- forwardingHint = m_sync->LookupLocator (deviceName);
- }
- catch (Error::Db &exception) {
- // just ignore for now
- }
+ forwardingHint = m_mapping (deviceName);
Fetcher &fetcher = *(new Fetcher (m_ccnx,
- bind (&FetchManager::DidDataSegmentFetched, this, _1, _2, _3, _4, _5),
+ segmentCallback,
+ finishCallback,
bind (&FetchManager::DidFetchComplete, this, _1),
bind (&FetchManager::DidNoDataTimeout, this, _1),
- deviceName, minSeqNo, maxSeqNo
+ deviceName, baseName, minSeqNo, maxSeqNo
/* Alex: should or should not include hint initially?*/));
switch (priority)
@@ -102,13 +100,6 @@
}
void
-FetchManager::DidDataSegmentFetched (Fetcher &fetcher, uint32_t seqno, const Ccnx::Name &basename,
- const Ccnx::Name &name, Ccnx::PcoPtr data)
-{
- // do something
-}
-
-void
FetchManager::DidNoDataTimeout (Fetcher &fetcher)
{
fetcher.SetForwardingHint (Ccnx::Name ("/ndn/broadcast"));
@@ -130,5 +121,4 @@
m_fetchList.erase_and_dispose (FetchList::s_iterator_to (fetcher), fetcher_disposer ());
}
- // ? do something else
}
diff --git a/src/fetch-manager.h b/src/fetch-manager.h
index cfceef5..9e3ff40 100644
--- a/src/fetch-manager.h
+++ b/src/fetch-manager.h
@@ -24,13 +24,12 @@
#include <boost/exception/all.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
#include <string>
#include <list>
#include <stdint.h>
#include "scheduler.h"
-
#include "ccnx-wrapper.h"
-#include "sync-log.h"
#include "fetcher.h"
@@ -43,22 +42,29 @@
};
public:
- FetchManager (Ccnx::CcnxWrapperPtr ccnx, SyncLogPtr sync, uint32_t parallelFetches = 3);
+ typedef boost::function<Ccnx::Name(const Ccnx::Name &)> Mapping;
+ typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName, uint64_t seq, Ccnx::PcoPtr pco)> SegmentCallback;
+ typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName)> FinishCallback;
+ FetchManager (Ccnx::CcnxWrapperPtr ccnx, const Mapping &mapping, uint64_t parallelFetches = 3);
virtual ~FetchManager ();
void
- Enqueue (const Ccnx::Name &deviceName,
- uint32_t minSeqNo, uint32_t maxSeqNo, int priority=PRIORITY_NORMAL);
+ Enqueue (const Ccnx::Name &deviceName, const Ccnx::Name &baseName,
+ const SegmentCallback &segmentCallback, const FinishCallback &finishCallback,
+ uint64_t minSeqNo, uint64_t maxSeqNo, int priority=PRIORITY_NORMAL);
+ // only for Fetcher
inline Ccnx::CcnxWrapperPtr
GetCcnx ();
+private:
+
inline SchedulerPtr
GetScheduler ();
// Fetch Events
void
- DidDataSegmentFetched (Fetcher &fetcher, uint32_t seqno, const Ccnx::Name &basename,
+ DidDataSegmentFetched (Fetcher &fetcher, uint64_t seqno, const Ccnx::Name &basename,
const Ccnx::Name &name, Ccnx::PcoPtr data);
void
@@ -67,19 +73,16 @@
void
DidFetchComplete (Fetcher &fetcher);
-private:
void
ScheduleFetches ();
private:
-
-private:
Ccnx::CcnxWrapperPtr m_ccnx;
- SyncLogPtr m_sync; // to access forwarding hints
+ Mapping m_mapping;
SchedulerPtr m_scheduler;
- uint32_t m_maxParallelFetches;
- uint32_t m_currentParallelFetches;
+ uint64_t m_maxParallelFetches;
+ uint64_t m_currentParallelFetches;
boost::mutex m_parellelFetchMutex;
// optimized list structure for fetch queue
diff --git a/src/fetcher.cc b/src/fetcher.cc
index f009819..59a3b35 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -32,20 +32,23 @@
using namespace std;
using namespace Ccnx;
-Fetcher::Fetcher (CcnxWrapperPtr ccnx,
- OnDataSegmentCallback onDataSegment,
+Fetcher::Fetcher (Ccnx::CcnxWrapperPtr ccnx,
+ const SegmentCallback &segmentCallback,
+ const FinishCallback &finishCallback,
OnFetchCompleteCallback onFetchComplete, OnFetchFailedCallback onFetchFailed,
- const Ccnx::Name &name, int32_t minSeqNo, int32_t maxSeqNo,
+ const Ccnx::Name &deviceName, const Ccnx::Name &name, int64_t minSeqNo, int64_t maxSeqNo,
boost::posix_time::time_duration timeout/* = boost::posix_time::seconds (30)*/,
const Ccnx::Name &forwardingHint/* = Ccnx::Name ()*/)
: m_ccnx (ccnx)
- , m_onDataSegment (onDataSegment)
+ , m_segmentCallback (segmentCallback)
, m_onFetchComplete (onFetchComplete)
, m_onFetchFailed (onFetchFailed)
+ , m_finishCallback (finishCallback)
, m_active (false)
, m_name (name)
+ , m_deviceName (deviceName)
, m_forwardingHint (forwardingHint)
, m_maximumNoActivityPeriod (timeout)
@@ -99,15 +102,19 @@
}
void
-Fetcher::OnData (uint32_t seqno, const Ccnx::Name &name, PcoPtr data)
+Fetcher::OnData (uint64_t seqno, const Ccnx::Name &name, PcoPtr data)
{
if (m_forwardingHint == Name ())
- m_onDataSegment (*this, seqno, m_name, name, data);
+ {
+ // invoke callback
+ m_segmentCallback (m_deviceName, m_name, seqno, data);
+ // we don't have to tell FetchManager about this
+ }
else
{
try {
PcoPtr pco = make_shared<ParsedContentObject> (*data->contentPtr ());
- m_onDataSegment (*this, seqno, m_name, pco->name (), pco);
+ m_segmentCallback (m_deviceName, m_name, seqno, pco);
}
catch (MisformedContentObjectException &e)
{
@@ -122,7 +129,7 @@
////////////////////////////////////////////////////////////////////////////
m_outOfOrderRecvSeqNo.insert (seqno);
- set<int32_t>::iterator inOrderSeqNo = m_outOfOrderRecvSeqNo.begin ();
+ set<int64_t>::iterator inOrderSeqNo = m_outOfOrderRecvSeqNo.begin ();
for (; inOrderSeqNo != m_outOfOrderRecvSeqNo.end ();
inOrderSeqNo++)
{
@@ -139,6 +146,9 @@
if (m_maxInOrderRecvSeqNo == m_maxSeqNo)
{
m_active = false;
+ // invoke callback
+ m_finishCallback(m_deviceName, m_name);
+ // tell FetchManager that we have finish our job
m_onFetchComplete (*this);
}
else
@@ -148,7 +158,7 @@
}
Closure::TimeoutCallbackReturnValue
-Fetcher::OnTimeout (uint32_t seqno, const Ccnx::Name &name)
+Fetcher::OnTimeout (uint64_t seqno, const Ccnx::Name &name)
{
// cout << "Fetcher::OnTimeout: " << name << endl;
// cout << "Last: " << m_lastPositiveActivity << ", config: " << m_maximumNoActivityPeriod
diff --git a/src/fetcher.h b/src/fetcher.h
index 12ad67f..9fba688 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -34,15 +34,16 @@
class Fetcher
{
public:
- typedef boost::function<void (Fetcher &, uint32_t /*requested seqno*/, const Ccnx::Name & /*requested base name*/,
- const Ccnx::Name & /*actual name*/, Ccnx::PcoPtr /*content object*/)> OnDataSegmentCallback;
+ typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName, uint64_t seq, Ccnx::PcoPtr pco)> SegmentCallback;
+ typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName)> FinishCallback;
typedef boost::function<void (Fetcher &)> OnFetchCompleteCallback;
typedef boost::function<void (Fetcher &)> OnFetchFailedCallback;
Fetcher (Ccnx::CcnxWrapperPtr ccnx,
- OnDataSegmentCallback onDataSegment,
- OnFetchCompleteCallback onFetchComplete, OnFetchFailedCallback onFetchFailed,
- const Ccnx::Name &name, int32_t minSeqNo, int32_t maxSeqNo,
+ const SegmentCallback &segmentCallback, // callback passed by caller of FetchManager
+ const FinishCallback &finishCallback, // callback passed by caller of FetchManager
+ OnFetchCompleteCallback onFetchComplete, OnFetchFailedCallback onFetchFailed, // callbacks provided by FetchManager
+ const Ccnx::Name &deviceName, const Ccnx::Name &name, int64_t minSeqNo, int64_t maxSeqNo,
boost::posix_time::time_duration timeout = boost::posix_time::seconds (30), // this time is not precise, but sets min bound
// actual time depends on how fast Interests timeout
const Ccnx::Name &forwardingHint = Ccnx::Name ());
@@ -62,10 +63,10 @@
FillPipeline ();
void
- OnData (uint32_t seqno, const Ccnx::Name &name, Ccnx::PcoPtr data);
+ OnData (uint64_t seqno, const Ccnx::Name &name, Ccnx::PcoPtr data);
Ccnx::Closure::TimeoutCallbackReturnValue
- OnTimeout (uint32_t seqno, const Ccnx::Name &name);
+ OnTimeout (uint64_t seqno, const Ccnx::Name &name);
public:
boost::intrusive::list_member_hook<> m_managerListHook;
@@ -73,26 +74,29 @@
private:
Ccnx::CcnxWrapperPtr m_ccnx;
- OnDataSegmentCallback m_onDataSegment;
+ SegmentCallback m_segmentCallback;
OnFetchCompleteCallback m_onFetchComplete;
OnFetchFailedCallback m_onFetchFailed;
+ FinishCallback m_finishCallback;
+
bool m_active;
Ccnx::Name m_name;
+ Ccnx::Name m_deviceName;
Ccnx::Name m_forwardingHint;
boost::posix_time::time_duration m_maximumNoActivityPeriod;
- int32_t m_minSendSeqNo;
- int32_t m_maxInOrderRecvSeqNo;
- std::set<int32_t> m_outOfOrderRecvSeqNo;
+ int64_t m_minSendSeqNo;
+ int64_t m_maxInOrderRecvSeqNo;
+ std::set<int64_t> m_outOfOrderRecvSeqNo;
- int32_t m_minSeqNo;
- int32_t m_maxSeqNo;
+ int64_t m_minSeqNo;
+ int64_t m_maxSeqNo;
- uint32_t m_pipeline;
- uint32_t m_activePipeline;
+ uint64_t m_pipeline;
+ uint64_t m_activePipeline;
boost::posix_time::ptime m_lastPositiveActivity;
diff --git a/test/test-fetch-manager.cc b/test/test-fetch-manager.cc
index 276fe0c..a526c10 100644
--- a/test/test-fetch-manager.cc
+++ b/test/test-fetch-manager.cc
@@ -33,8 +33,8 @@
struct FetcherTestData
{
- set<uint32_t> recvData;
- set<uint32_t> recvContent;
+ set<uint64_t> recvData;
+ set<uint64_t> recvContent;
set<Name> differentNames;
set<Name> segmentNames;
@@ -49,11 +49,12 @@
}
void
- onData (Fetcher &fetcher, uint32_t seqno, const Ccnx::Name &basename,
- const Ccnx::Name &name, Ccnx::PcoPtr pco)
+ onData (const Ccnx::Name &deviceName, const Ccnx::Name &basename, uint64_t seqno, Ccnx::PcoPtr pco)
{
recvData.insert (seqno);
differentNames.insert (basename);
+ Name name = basename;
+ name.appendComp(seqno);
segmentNames.insert (name);
BytesPtr data = pco->contentPtr ();
@@ -67,6 +68,11 @@
}
void
+ finish(const Ccnx::Name &deviceName, const Ccnx::Name &baseName)
+ {
+ }
+
+ void
onComplete (Fetcher &fetcher)
{
m_done = true;
@@ -87,6 +93,7 @@
CcnxWrapperPtr ccnx = make_shared<CcnxWrapper> ();
Name baseName ("/base");
+ Name deviceName ("/device");
/* publish seqnos: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, <gap 5>, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, <gap 1>, 26 */
// this will allow us to test our pipeline of 6
for (int i = 0; i < 10; i++)
@@ -105,10 +112,11 @@
FetcherTestData data;
Fetcher fetcher (ccnx,
- bind (&FetcherTestData::onData, &data, _1, _2, _3, _4, _5),
+ bind (&FetcherTestData::onData, &data, _1, _2, _3, _4),
+ bind (&FetcherTestData::finish, &data, _1, _2),
bind (&FetcherTestData::onComplete, &data, _1),
bind (&FetcherTestData::onFail, &data, _1),
- Name ("/base"), 0, 26,
+ deviceName, Name ("/base"), 0, 26,
boost::posix_time::seconds (5)); // this time is not precise
BOOST_CHECK_EQUAL (fetcher.IsActive (), false);
@@ -124,11 +132,11 @@
{
ostringstream recvData;
- for (set<uint32_t>::iterator i = data.recvData.begin (); i != data.recvData.end (); i++)
+ for (set<uint64_t>::iterator i = data.recvData.begin (); i != data.recvData.end (); i++)
recvData << *i << ", ";
ostringstream recvContent;
- for (set<uint32_t>::iterator i = data.recvContent.begin (); i != data.recvContent.end (); i++)
+ for (set<uint64_t>::iterator i = data.recvContent.begin (); i != data.recvContent.end (); i++)
recvContent << *i << ", ";
BOOST_CHECK_EQUAL (recvData.str (), "0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, ");
@@ -156,11 +164,11 @@
{
ostringstream recvData;
- for (set<uint32_t>::iterator i = data.recvData.begin (); i != data.recvData.end (); i++)
+ for (set<uint64_t>::iterator i = data.recvData.begin (); i != data.recvData.end (); i++)
recvData << *i << ", ";
ostringstream recvContent;
- for (set<uint32_t>::iterator i = data.recvContent.begin (); i != data.recvContent.end (); i++)
+ for (set<uint64_t>::iterator i = data.recvContent.begin (); i != data.recvContent.end (); i++)
recvContent << *i << ", ";
BOOST_CHECK_EQUAL (recvData.str (), "0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, ");