change CcnxWrapper to support fetching of raw data
diff --git a/ccnx/sync-app-socket.h b/ccnx/sync-app-socket.h
index d17857c..f5e5e0f 100644
--- a/ccnx/sync-app-socket.h
+++ b/ccnx/sync-app-socket.h
@@ -70,6 +70,8 @@
*/
void remove (const std::string &prefix) {m_syncLogic.remove(prefix);}
+ int getSeq();
+
private:
CcnxWrapperPtr m_appHandle;
diff --git a/ccnx/sync-ccnx-wrapper.cc b/ccnx/sync-ccnx-wrapper.cc
index ea0f876..85ffbd6 100644
--- a/ccnx/sync-ccnx-wrapper.cc
+++ b/ccnx/sync-ccnx-wrapper.cc
@@ -235,18 +235,28 @@
ccn_upcall_kind kind,
ccn_upcall_info *info)
{
- CcnxWrapper::DataCallback *f = static_cast<CcnxWrapper::DataCallback*> (selfp->data);
+ //CcnxWrapper::DataCallback *f = static_cast<CcnxWrapper::DataCallback*> (selfp->data);
+ ClosurePass *cp = static_cast<ClosurePass *> (selfp->data);
switch (kind)
{
case CCN_UPCALL_FINAL: // effecitve in unit tests
- delete f;
+ delete cp;
+ cp = NULL;
delete selfp;
return CCN_UPCALL_RESULT_OK;
case CCN_UPCALL_CONTENT:
break;
+ case CCN_UPCALL_INTEREST_TIMED_OUT: {
+ if (cp != NULL && cp->getRetry() > 0) {
+ cp->decRetry();
+ return CCN_UPCALL_RESULT_REEXPRESS;
+ }
+ return CCN_UPCALL_RESULT_OK;
+ }
+
default:
return CCN_UPCALL_RESULT_OK;
}
@@ -255,7 +265,6 @@
size_t len;
if (ccn_content_get_value(info->content_ccnb, info->pco->offset[CCN_PCO_E], info->pco, (const unsigned char **)&pcontent, &len) < 0)
BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("decode ContentObject failed"));
- string content(pcontent, len);
string name;
for (int i = 0; i < info->content_comps->n - 1; i++)
@@ -267,11 +276,25 @@
string compStr(comp, size);
name += compStr;
}
- (*f) (name, content);
+
+ cp->runCallback(name, pcontent, len);
+
return CCN_UPCALL_RESULT_OK;
}
-int CcnxWrapper::sendInterest (const string &strInterest, const DataCallback &dataCallback)
+int CcnxWrapper::sendInterest (const string &strInterest, const DataCallback &dataCallback, int retry)
+{
+ DataClosurePass * pass = new DataClosurePass(STRING_FORM, retry, dataCallback);
+ sendInterest(strInterest, pass);
+}
+
+int CcnxWrapper::sendInterestForRawData (const string &strInterest, const RawDataCallback &rawDataCallback, int retry)
+{
+ RawDataClosurePass * pass = new RawDataClosurePass(RAW_DATA, retry, rawDataCallback);
+ sendInterest(strInterest, pass);
+}
+
+int CcnxWrapper::sendInterest (const string &strInterest, void *dataPass)
{
recursive_mutex::scoped_lock lock(m_mutex);
if (!m_running)
@@ -282,7 +305,7 @@
ccn_closure *dataClosure = new ccn_closure;
ccn_name_from_uri (pname, strInterest.c_str());
- dataClosure->data = new DataCallback (dataCallback); // should be removed when closure is removed
+ dataClosure->data = dataPass;
dataClosure->p = &incomingData;
if (ccn_express_interest (m_handle, pname, dataClosure, NULL) < 0)
@@ -333,4 +356,44 @@
ccn_charbuf_destroy(&pname);
}
+DataClosurePass::DataClosurePass (CallbackType type, int retry, const CcnxWrapper::DataCallback &dataCallback): ClosurePass(type, retry), m_callback(NULL)
+{
+ m_callback = new CcnxWrapper::DataCallback (dataCallback);
+}
+
+DataClosurePass::~DataClosurePass ()
+{
+ delete m_callback;
+ m_callback = NULL;
+}
+
+void
+DataClosurePass::runCallback(std::string name, const char *data, size_t len)
+{
+ string content(data, len);
+ if (m_callback != NULL) {
+ (*m_callback)(name, content);
+ }
+}
+
+
+RawDataClosurePass::RawDataClosurePass (CallbackType type, int retry, const CcnxWrapper::RawDataCallback &rawDataCallback): ClosurePass(type, retry), m_callback(NULL)
+{
+ m_callback = new CcnxWrapper::RawDataCallback (rawDataCallback);
+}
+
+RawDataClosurePass::~RawDataClosurePass ()
+{
+ delete m_callback;
+ m_callback = NULL;
+}
+
+void
+RawDataClosurePass::runCallback(std::string name, const char *data, size_t len)
+{
+ if (m_callback != NULL) {
+ (*m_callback)(name, data, len);
+ }
+}
+
}
diff --git a/ccnx/sync-ccnx-wrapper.h b/ccnx/sync-ccnx-wrapper.h
index 0ccbec4..b8c3c93 100644
--- a/ccnx/sync-ccnx-wrapper.h
+++ b/ccnx/sync-ccnx-wrapper.h
@@ -54,6 +54,7 @@
class CcnxWrapper {
public:
typedef boost::function<void (std::string, std::string)> DataCallback;
+ typedef boost::function<void (std::string, const char *buf, size_t len)> RawDataCallback;
typedef boost::function<void (std::string)> InterestCallback;
/**
@@ -77,7 +78,10 @@
* @return the return code of ccn_express_interest
*/
int
- sendInterest (const std::string &strInterest, const DataCallback &dataCallback);
+ sendInterest (const std::string &strInterest, const DataCallback &dataCallback, int retry = 0);
+
+ int
+ sendInterestForRawData (const std::string &strInterest, const RawDataCallback &rawDataCallback, int retry = 0);
/**
* @brief set Interest filter (specify what interest you want to receive)
@@ -127,6 +131,9 @@
void
ccnLoop ();
+
+ int
+ sendInterest (const std::string &strInterest, void *dataPass);
/// @endcond
private:
ccn* m_handle;
@@ -140,6 +147,40 @@
typedef boost::shared_ptr<CcnxWrapper> CcnxWrapperPtr;
+enum CallbackType { STRING_FORM, RAW_DATA};
+
+class ClosurePass {
+public:
+ ClosurePass(CallbackType type, int retry): m_type(type), m_retry(retry) {}
+ int getRetry() {return m_retry;}
+ void decRetry() { m_retry--;}
+ CallbackType getCallbackType() {return m_type;}
+ virtual ~ClosurePass(){}
+ virtual void runCallback(std::string name, const char *data, size_t len) = 0;
+
+protected:
+ int m_retry;
+ CallbackType m_type;
+};
+
+class DataClosurePass: public ClosurePass {
+public:
+ DataClosurePass(CallbackType type, int retry, const CcnxWrapper::DataCallback &dataCallback);
+ virtual ~DataClosurePass();
+ virtual void runCallback(std::string name, const char *, size_t len);
+private:
+ CcnxWrapper::DataCallback * m_callback;
+};
+
+class RawDataClosurePass: public ClosurePass {
+public:
+ RawDataClosurePass(CallbackType type, int retry, const CcnxWrapper::RawDataCallback &RawDataCallback);
+ virtual ~RawDataClosurePass();
+ virtual void runCallback(std::string name, const char *, size_t len);
+private:
+ CcnxWrapper::RawDataCallback * m_callback;
+};
+
} // Sync
#endif // SYNC_CCNX_WRAPPER_H