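Update FetchManager and Fetcher: Mapping functor replaces SyncLog, caller-supplied segment/finish callbacks, 64-bit sequence numbers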
diff --git a/src/fetch-manager.cc b/src/fetch-manager.cc
index 7df2687..84cee4c 100644
--- a/src/fetch-manager.cc
+++ b/src/fetch-manager.cc
@@ -31,9 +31,9 @@
// Disposer functor passed to m_fetchList.erase_and_dispose (): deletes the Fetcher it is handed
struct fetcher_disposer { void operator() (Fetcher *delete_this) { delete delete_this; } };
-FetchManager::FetchManager (CcnxWrapperPtr ccnx, SyncLogPtr sync, uint32_t parallelFetches/* = 3*/)
+FetchManager::FetchManager (CcnxWrapperPtr ccnx, const Mapping &mapping, uint64_t parallelFetches/* = 3*/)
: m_ccnx (ccnx)
- , m_sync (sync)
+ , m_mapping (mapping)
, m_maxParallelFetches (parallelFetches)
, m_currentParallelFetches (0)
@@ -51,22 +51,20 @@
}
void
-FetchManager::Enqueue (const Ccnx::Name &deviceName, uint32_t minSeqNo, uint32_t maxSeqNo, int priority/*=PRIORITY_NORMAL*/)
+FetchManager::Enqueue (const Ccnx::Name &deviceName, const Ccnx::Name &baseName,
+ const SegmentCallback &segmentCallback, const FinishCallback &finishCallback,
+ uint64_t minSeqNo, uint64_t maxSeqNo, int priority/*PRIORITY_NORMAL*/)
{
// we may need to guarantee that the mapping always gives an answer and does not throw an exception...
Name forwardingHint;
- try {
- forwardingHint = m_sync->LookupLocator (deviceName);
- }
- catch (Error::Db &exception) {
- // just ignore for now
- }
+ forwardingHint = m_mapping (deviceName);
Fetcher &fetcher = *(new Fetcher (m_ccnx,
- bind (&FetchManager::DidDataSegmentFetched, this, _1, _2, _3, _4, _5),
+ segmentCallback,
+ finishCallback,
bind (&FetchManager::DidFetchComplete, this, _1),
bind (&FetchManager::DidNoDataTimeout, this, _1),
- deviceName, minSeqNo, maxSeqNo
+ deviceName, baseName, minSeqNo, maxSeqNo
/* Alex: should or should not include hint initially?*/));
switch (priority)
@@ -102,13 +100,6 @@
}
void
-FetchManager::DidDataSegmentFetched (Fetcher &fetcher, uint32_t seqno, const Ccnx::Name &basename,
- const Ccnx::Name &name, Ccnx::PcoPtr data)
-{
- // do something
-}
-
-void
FetchManager::DidNoDataTimeout (Fetcher &fetcher)
{
fetcher.SetForwardingHint (Ccnx::Name ("/ndn/broadcast"));
@@ -130,5 +121,4 @@
m_fetchList.erase_and_dispose (FetchList::s_iterator_to (fetcher), fetcher_disposer ());
}
- // ? do something else
}
diff --git a/src/fetch-manager.h b/src/fetch-manager.h
index cfceef5..9e3ff40 100644
--- a/src/fetch-manager.h
+++ b/src/fetch-manager.h
@@ -24,13 +24,12 @@
#include <boost/exception/all.hpp>
#include <boost/shared_ptr.hpp>
+#include <boost/function.hpp>
#include <string>
#include <list>
#include <stdint.h>
#include "scheduler.h"
-
#include "ccnx-wrapper.h"
-#include "sync-log.h"
#include "fetcher.h"
@@ -43,22 +42,29 @@
};
public:
- FetchManager (Ccnx::CcnxWrapperPtr ccnx, SyncLogPtr sync, uint32_t parallelFetches = 3);
+ typedef boost::function<Ccnx::Name(const Ccnx::Name &)> Mapping; // called by Enqueue with the device name; the result is stored as the forwarding hint
+ typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName, uint64_t seq, Ccnx::PcoPtr pco)> SegmentCallback; // handed through Enqueue to the Fetcher, which calls it for each data segment it receives
+ typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName)> FinishCallback; // handed through Enqueue to the Fetcher, which calls it when the last in-order segment arrives. NOTE(review): both callbacks take the names by non-const reference -- confirm callees are meant to be able to modify them
+ FetchManager (Ccnx::CcnxWrapperPtr ccnx, const Mapping &mapping, uint64_t parallelFetches = 3);
virtual ~FetchManager ();
void
- Enqueue (const Ccnx::Name &deviceName,
- uint32_t minSeqNo, uint32_t maxSeqNo, int priority=PRIORITY_NORMAL);
+ Enqueue (const Ccnx::Name &deviceName, const Ccnx::Name &baseName,
+ const SegmentCallback &segmentCallback, const FinishCallback &finishCallback,
+ uint64_t minSeqNo, uint64_t maxSeqNo, int priority=PRIORITY_NORMAL);
+ // only for Fetcher
inline Ccnx::CcnxWrapperPtr
GetCcnx ();
+private:
+
inline SchedulerPtr
GetScheduler ();
// Fetch Events
void
- DidDataSegmentFetched (Fetcher &fetcher, uint32_t seqno, const Ccnx::Name &basename,
+ DidDataSegmentFetched (Fetcher &fetcher, uint64_t seqno, const Ccnx::Name &basename, // NOTE(review): this patch deletes the definition from fetch-manager.cc and no longer binds it in Enqueue -- confirm this declaration is still needed
const Ccnx::Name &name, Ccnx::PcoPtr data);
void
@@ -67,19 +73,16 @@
void
DidFetchComplete (Fetcher &fetcher);
-private:
void
ScheduleFetches ();
private:
-
-private:
Ccnx::CcnxWrapperPtr m_ccnx;
- SyncLogPtr m_sync; // to access forwarding hints
+ Mapping m_mapping;
SchedulerPtr m_scheduler;
- uint32_t m_maxParallelFetches;
- uint32_t m_currentParallelFetches;
+ uint64_t m_maxParallelFetches;
+ uint64_t m_currentParallelFetches;
boost::mutex m_parellelFetchMutex;
// optimized list structure for fetch queue
diff --git a/src/fetcher.cc b/src/fetcher.cc
index f009819..59a3b35 100644
--- a/src/fetcher.cc
+++ b/src/fetcher.cc
@@ -32,20 +32,23 @@
using namespace std;
using namespace Ccnx;
-Fetcher::Fetcher (CcnxWrapperPtr ccnx,
- OnDataSegmentCallback onDataSegment,
+Fetcher::Fetcher (Ccnx::CcnxWrapperPtr ccnx,
+ const SegmentCallback &segmentCallback,
+ const FinishCallback &finishCallback,
OnFetchCompleteCallback onFetchComplete, OnFetchFailedCallback onFetchFailed,
- const Ccnx::Name &name, int32_t minSeqNo, int32_t maxSeqNo,
+ const Ccnx::Name &deviceName, const Ccnx::Name &name, int64_t minSeqNo, int64_t maxSeqNo,
boost::posix_time::time_duration timeout/* = boost::posix_time::seconds (30)*/,
const Ccnx::Name &forwardingHint/* = Ccnx::Name ()*/)
: m_ccnx (ccnx)
- , m_onDataSegment (onDataSegment)
+ , m_segmentCallback (segmentCallback)
, m_onFetchComplete (onFetchComplete)
, m_onFetchFailed (onFetchFailed)
+ , m_finishCallback (finishCallback)
, m_active (false)
, m_name (name)
+ , m_deviceName (deviceName)
, m_forwardingHint (forwardingHint)
, m_maximumNoActivityPeriod (timeout)
@@ -99,15 +102,19 @@
}
void
-Fetcher::OnData (uint32_t seqno, const Ccnx::Name &name, PcoPtr data)
+Fetcher::OnData (uint64_t seqno, const Ccnx::Name &name, PcoPtr data)
{
if (m_forwardingHint == Name ())
- m_onDataSegment (*this, seqno, m_name, name, data);
+ {
+ // invoke callback
+ m_segmentCallback (m_deviceName, m_name, seqno, data);
+ // we don't have to tell FetchManager about this
+ }
else
{
try {
PcoPtr pco = make_shared<ParsedContentObject> (*data->contentPtr ());
- m_onDataSegment (*this, seqno, m_name, pco->name (), pco);
+ m_segmentCallback (m_deviceName, m_name, seqno, pco);
}
catch (MisformedContentObjectException &e)
{
@@ -122,7 +129,7 @@
////////////////////////////////////////////////////////////////////////////
m_outOfOrderRecvSeqNo.insert (seqno);
- set<int32_t>::iterator inOrderSeqNo = m_outOfOrderRecvSeqNo.begin ();
+ set<int64_t>::iterator inOrderSeqNo = m_outOfOrderRecvSeqNo.begin ();
for (; inOrderSeqNo != m_outOfOrderRecvSeqNo.end ();
inOrderSeqNo++)
{
@@ -139,6 +146,9 @@
if (m_maxInOrderRecvSeqNo == m_maxSeqNo)
{
m_active = false;
+ // invoke callback
+ m_finishCallback(m_deviceName, m_name);
+ // tell FetchManager that we have finished our job
m_onFetchComplete (*this);
}
else
@@ -148,7 +158,7 @@
}
Closure::TimeoutCallbackReturnValue
-Fetcher::OnTimeout (uint32_t seqno, const Ccnx::Name &name)
+Fetcher::OnTimeout (uint64_t seqno, const Ccnx::Name &name)
{
// cout << "Fetcher::OnTimeout: " << name << endl;
// cout << "Last: " << m_lastPositiveActivity << ", config: " << m_maximumNoActivityPeriod
diff --git a/src/fetcher.h b/src/fetcher.h
index 12ad67f..9fba688 100644
--- a/src/fetcher.h
+++ b/src/fetcher.h
@@ -34,15 +34,16 @@
class Fetcher
{
public:
- typedef boost::function<void (Fetcher &, uint32_t /*requested seqno*/, const Ccnx::Name & /*requested base name*/,
- const Ccnx::Name & /*actual name*/, Ccnx::PcoPtr /*content object*/)> OnDataSegmentCallback;
+ typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName, uint64_t seq, Ccnx::PcoPtr pco)> SegmentCallback; // called from OnData with (m_deviceName, m_name, seqno, content object)
+ typedef boost::function<void(Ccnx::Name &deviceName, Ccnx::Name &baseName)> FinishCallback; // called from OnData once m_maxInOrderRecvSeqNo == m_maxSeqNo, just before m_onFetchComplete
typedef boost::function<void (Fetcher &)> OnFetchCompleteCallback;
typedef boost::function<void (Fetcher &)> OnFetchFailedCallback;
Fetcher (Ccnx::CcnxWrapperPtr ccnx,
- OnDataSegmentCallback onDataSegment,
- OnFetchCompleteCallback onFetchComplete, OnFetchFailedCallback onFetchFailed,
- const Ccnx::Name &name, int32_t minSeqNo, int32_t maxSeqNo,
+ const SegmentCallback &segmentCallback, // callback passed by caller of FetchManager
+ const FinishCallback &finishCallback, // callback passed by caller of FetchManager
+ OnFetchCompleteCallback onFetchComplete, OnFetchFailedCallback onFetchFailed, // callbacks provided by FetchManager
+ const Ccnx::Name &deviceName, const Ccnx::Name &name, int64_t minSeqNo, int64_t maxSeqNo,
boost::posix_time::time_duration timeout = boost::posix_time::seconds (30), // this time is not precise, but sets min bound
// actual time depends on how fast Interests timeout
const Ccnx::Name &forwardingHint = Ccnx::Name ());
@@ -62,10 +63,10 @@
FillPipeline ();
void
- OnData (uint32_t seqno, const Ccnx::Name &name, Ccnx::PcoPtr data);
+ OnData (uint64_t seqno, const Ccnx::Name &name, Ccnx::PcoPtr data);
Ccnx::Closure::TimeoutCallbackReturnValue
- OnTimeout (uint32_t seqno, const Ccnx::Name &name);
+ OnTimeout (uint64_t seqno, const Ccnx::Name &name);
public:
boost::intrusive::list_member_hook<> m_managerListHook;
@@ -73,26 +74,29 @@
private:
Ccnx::CcnxWrapperPtr m_ccnx;
- OnDataSegmentCallback m_onDataSegment;
+ SegmentCallback m_segmentCallback;
OnFetchCompleteCallback m_onFetchComplete;
OnFetchFailedCallback m_onFetchFailed;
+ FinishCallback m_finishCallback;
+
bool m_active;
Ccnx::Name m_name;
+ Ccnx::Name m_deviceName;
Ccnx::Name m_forwardingHint;
boost::posix_time::time_duration m_maximumNoActivityPeriod;
- int32_t m_minSendSeqNo;
- int32_t m_maxInOrderRecvSeqNo;
- std::set<int32_t> m_outOfOrderRecvSeqNo;
+ int64_t m_minSendSeqNo;
+ int64_t m_maxInOrderRecvSeqNo;
+ std::set<int64_t> m_outOfOrderRecvSeqNo;
- int32_t m_minSeqNo;
- int32_t m_maxSeqNo;
+ int64_t m_minSeqNo;
+ int64_t m_maxSeqNo;
- uint32_t m_pipeline;
- uint32_t m_activePipeline;
+ uint64_t m_pipeline;
+ uint64_t m_activePipeline;
boost::posix_time::ptime m_lastPositiveActivity;