blob: c818cd2a32ddb9a58980e3422441f6bd3add0f7b [file] [log] [blame]
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -08001#include "ccnx-wrapper.h"
Zhenkai Zhu43eb2732012-12-28 00:48:26 -08002extern "C" {
3#include <ccn/fetch.h>
4}
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -08005#include <poll.h>
6#include <boost/throw_exception.hpp>
7#include <boost/date_time/posix_time/posix_time.hpp>
8#include <boost/random.hpp>
9#include <sstream>
10
11typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
12typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
13
14using namespace std;
15using namespace boost;
16
17namespace Ccnx {
18
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080019CcnxWrapper::CcnxWrapper()
20 : m_handle (0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080021 , m_running (true)
22 , m_connected (false)
23{
24 connectCcnd();
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080025 m_thread = thread (&CcnxWrapper::ccnLoop, this);
26}
27
28void
29CcnxWrapper::connectCcnd()
30{
Zhenkai Zhu3b82d432013-01-03 22:48:40 -080031 //if (m_handle != 0) {
32 //ccn_disconnect (m_handle);
33 //ccn_destroy (&m_handle);
34 //}
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -080035
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080036 m_handle = ccn_create ();
Zhenkai Zhud34106a2013-01-04 15:51:55 -080037 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080038 if (ccn_connect(m_handle, NULL) < 0)
39 {
40 BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("connection to ccnd failed"));
41 }
42 m_connected = true;
43
Zhenkai Zhu3b82d432013-01-03 22:48:40 -080044 //if (!m_registeredInterests.empty())
45 //{
46 // for (map<Name, InterestCallback>::const_iterator it = m_registeredInterests.begin(); it != m_registeredInterests.end(); ++it)
47 //{
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080048 // clearInterestFilter(it->first);
Zhenkai Zhu3b82d432013-01-03 22:48:40 -080049 // setInterestFilter(it->first, it->second);
50 //}
51 //}
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080052}
53
54CcnxWrapper::~CcnxWrapper()
55{
56 {
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -080057 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080058 m_running = false;
59 }
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -080060
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080061 m_thread.join ();
62 ccn_disconnect (m_handle);
Zhenkai Zhue29616f2013-01-14 15:40:57 -080063 //ccn_destroy (&m_handle);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080064}
65
66void
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080067CcnxWrapper::ccnLoop ()
68{
69 static boost::mt19937 randomGenerator (static_cast<unsigned int> (std::time (0)));
70 static boost::variate_generator<boost::mt19937&, boost::uniform_int<> > rangeUniformRandom (randomGenerator, uniform_int<> (0,1000));
71
72 while (m_running)
73 {
74 try
75 {
76 int res = 0;
77 {
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -080078 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080079 res = ccn_run (m_handle, 0);
80 }
81
82 if (!m_running) break;
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -080083
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080084 if (res < 0)
85 BOOST_THROW_EXCEPTION (CcnxOperationException()
86 << errmsg_info_str("ccn_run returned error"));
87
88
89 pollfd pfds[1];
90 {
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -080091 UniqueRecLock(m_mutex);
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -080092
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080093 pfds[0].fd = ccn_get_connection_fd (m_handle);
94 pfds[0].events = POLLIN;
95 if (ccn_output_is_pending (m_handle))
96 pfds[0].events |= POLLOUT;
97 }
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -080098
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080099 int ret = poll (pfds, 1, 1);
100 if (ret < 0)
101 {
102 BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("ccnd socket failed (probably ccnd got stopped)"));
103 }
104 }
105 catch (CcnxOperationException &e)
106 {
107 // do not try reconnect for now
108 throw e;
109 /*
110 m_connected = false;
111 // probably ccnd has been stopped
112 // try reconnect with sleep
113 int interval = 1;
114 int maxInterval = 32;
115 while (m_running)
116 {
117 try
118 {
119 this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
120
121 connectCcnd();
122 _LOG_DEBUG("reconnect to ccnd succeeded");
123 break;
124 }
125 catch (CcnxOperationException &e)
126 {
127 this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
128
129 // do exponential backup for reconnect interval
130 if (interval < maxInterval)
131 {
132 interval *= 2;
133 }
134 }
135 }
136 */
137 }
138 catch (const std::exception &exc)
139 {
140 // catch anything thrown within try block that derives from std::exception
141 std::cerr << exc.what();
142 }
143 catch (...)
144 {
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800145 cout << "UNKNOWN EXCEPTION !!!" << endl;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800146 }
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800147 }
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800148}
149
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800150Bytes
Alexander Afanasyevdcfa9632013-01-07 16:38:19 -0800151CcnxWrapper::createContentObject(const Name &name, const void *buf, size_t len, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800152{
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800153 CcnxCharbufPtr ptr = name.toCcnxCharbuf();
154 ccn_charbuf *pname = ptr->getBuf();
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800155 ccn_charbuf *content = ccn_charbuf_create();
156
Zhenkai Zhud9d03f62013-01-04 16:57:46 -0800157 struct ccn_signing_params sp = CCN_SIGNING_PARAMS_INIT;
158 sp.freshness = freshness;
159
160 if (ccn_sign_content(m_handle, content, pname, &sp, buf, len) != 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800161 {
Zhenkai Zhud9d03f62013-01-04 16:57:46 -0800162 BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("sign content failed"));
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800163 }
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800164
165 Bytes bytes;
166 readRaw(bytes, content->buf, content->length);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800167
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800168 ccn_charbuf_destroy (&content);
169
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800170 return bytes;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800171}
172
173int
Zhenkai Zhubad089c2012-12-28 10:28:27 -0800174CcnxWrapper::putToCcnd (const Bytes &contentObject)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800175{
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -0800176 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800177 if (!m_running || !m_connected)
178 return -1;
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800179
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800180
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800181 if (ccn_put(m_handle, head(contentObject), contentObject.size()) < 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800182 {
183 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("ccnput failed"));
184 }
185
186 return 0;
187}
188
189int
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800190CcnxWrapper::publishData (const Name &name, const unsigned char *buf, size_t len, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800191{
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800192 Bytes co = createContentObject(name, buf, len, freshness);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800193 return putToCcnd(co);
194}
195
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800196int
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800197CcnxWrapper::publishData (const Name &name, const Bytes &content, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800198{
Alexander Afanasyevdcfa9632013-01-07 16:38:19 -0800199 return publishData(name, head(content), content.size(), freshness);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800200}
201
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800202
203static ccn_upcall_res
204incomingInterest(ccn_closure *selfp,
205 ccn_upcall_kind kind,
206 ccn_upcall_info *info)
207{
208 CcnxWrapper::InterestCallback *f = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
209
210 switch (kind)
211 {
212 case CCN_UPCALL_FINAL: // effective in unit tests
213 delete f;
214 delete selfp;
215 return CCN_UPCALL_RESULT_OK;
216
217 case CCN_UPCALL_INTEREST:
218 break;
219
220 default:
221 return CCN_UPCALL_RESULT_OK;
222 }
223
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800224 Name interest(info->interest_ccnb, info->interest_comps);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800225
226 (*f) (interest);
227 return CCN_UPCALL_RESULT_OK;
228}
229
230static ccn_upcall_res
231incomingData(ccn_closure *selfp,
232 ccn_upcall_kind kind,
233 ccn_upcall_info *info)
234{
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800235 Closure *cp = static_cast<Closure *> (selfp->data);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800236
237 switch (kind)
238 {
239 case CCN_UPCALL_FINAL: // effecitve in unit tests
Zhenkai Zhu19f81de2013-01-04 22:27:47 -0800240 delete cp;
241 cp = NULL;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800242 delete selfp;
243 return CCN_UPCALL_RESULT_OK;
244
245 case CCN_UPCALL_CONTENT:
246 break;
247
248 case CCN_UPCALL_INTEREST_TIMED_OUT: {
249 if (cp != NULL && cp->getRetry() > 0) {
250 cp->decRetry();
251 return CCN_UPCALL_RESULT_REEXPRESS;
252 }
253
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800254 Name interest(info->interest_ccnb, info->interest_comps);
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800255 Closure::TimeoutCallbackReturnValue rv = cp->runTimeoutCallback(interest);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800256 switch(rv)
257 {
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800258 case Closure::RESULT_OK : return CCN_UPCALL_RESULT_OK;
259 case Closure::RESULT_REEXPRESS : return CCN_UPCALL_RESULT_REEXPRESS;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800260 default : break;
261 }
262 return CCN_UPCALL_RESULT_OK;
263 }
264
265 default:
266 return CCN_UPCALL_RESULT_OK;
267 }
268
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800269 const unsigned char *pcontent;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800270 size_t len;
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800271 if (ccn_content_get_value(info->content_ccnb, info->pco->offset[CCN_PCO_E], info->pco, &pcontent, &len) < 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800272 {
273 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("decode ContentObject failed"));
274 }
275
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800276 Name name(info->content_ccnb, info->content_comps);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800277
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800278 Bytes content;
279 // copy content and do processing on the copy
280 // otherwise the pointed memory may have been changed during the processing
281 readRaw(content, pcontent, len);
282
283 cp->runDataCallback(name, content);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800284
285 return CCN_UPCALL_RESULT_OK;
286}
287
Zhenkai Zhu19f81de2013-01-04 22:27:47 -0800288int CcnxWrapper::sendInterest (const Name &interest, const Closure *closure, const Selectors &selectors)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800289{
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -0800290 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800291 if (!m_running || !m_connected)
292 return -1;
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800293
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800294 CcnxCharbufPtr namePtr = interest.toCcnxCharbuf();
295 ccn_charbuf *pname = namePtr->getBuf();
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800296 ccn_closure *dataClosure = new ccn_closure;
297
Zhenkai Zhu19f81de2013-01-04 22:27:47 -0800298 Closure *myClosure = closure->dup();
299 dataClosure->data = (void *)myClosure;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800300
301 dataClosure->p = &incomingData;
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800302
303 CcnxCharbufPtr selectorsPtr = selectors.toCcnxCharbuf();
304 ccn_charbuf *templ = NULL;
305 if (selectorsPtr != CcnxCharbuf::Null)
306 {
307 templ = selectorsPtr->getBuf();
308 }
309
310 if (ccn_express_interest (m_handle, pname, dataClosure, templ) < 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800311 {
312 }
313
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800314 return 0;
315}
316
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800317int CcnxWrapper::setInterestFilter (const Name &prefix, const InterestCallback &interestCallback)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800318{
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -0800319 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800320 if (!m_running || !m_connected)
321 return -1;
322
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800323 CcnxCharbufPtr ptr = prefix.toCcnxCharbuf();
324 ccn_charbuf *pname = ptr->getBuf();
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800325 ccn_closure *interestClosure = new ccn_closure;
326
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800327 interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
328 interestClosure->p = &incomingInterest;
329 int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
330 if (ret < 0)
331 {
332 }
333
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800334 m_registeredInterests.insert(pair<Name, InterestCallback>(prefix, interestCallback));
Alexander Afanasyevdcfa9632013-01-07 16:38:19 -0800335
336 return ret;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800337}
338
339void
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800340CcnxWrapper::clearInterestFilter (const Name &prefix)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800341{
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -0800342 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800343 if (!m_running || !m_connected)
344 return;
345
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800346 CcnxCharbufPtr ptr = prefix.toCcnxCharbuf();
347 ccn_charbuf *pname = ptr->getBuf();
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800348
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800349 int ret = ccn_set_interest_filter (m_handle, pname, 0);
350 if (ret < 0)
351 {
352 }
353
354 m_registeredInterests.erase(prefix);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800355}
356
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800357Name
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800358CcnxWrapper::getLocalPrefix ()
359{
360 struct ccn * tmp_handle = ccn_create ();
361 int res = ccn_connect (tmp_handle, NULL);
362 if (res < 0)
363 {
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800364 return Name();
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800365 }
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800366
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800367 string retval = "";
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800368
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800369 struct ccn_charbuf *templ = ccn_charbuf_create();
370 ccn_charbuf_append_tt(templ, CCN_DTAG_Interest, CCN_DTAG);
371 ccn_charbuf_append_tt(templ, CCN_DTAG_Name, CCN_DTAG);
372 ccn_charbuf_append_closer(templ); /* </Name> */
373 // XXX - use pubid if possible
374 ccn_charbuf_append_tt(templ, CCN_DTAG_MaxSuffixComponents, CCN_DTAG);
375 ccnb_append_number(templ, 1);
376 ccn_charbuf_append_closer(templ); /* </MaxSuffixComponents> */
377 ccnb_tagged_putf(templ, CCN_DTAG_Scope, "%d", 2);
378 ccn_charbuf_append_closer(templ); /* </Interest> */
379
380 struct ccn_charbuf *name = ccn_charbuf_create ();
381 res = ccn_name_from_uri (name, "/local/ndn/prefix");
382 if (res < 0) {
383 }
384 else
385 {
386 struct ccn_fetch *fetch = ccn_fetch_new (tmp_handle);
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800387
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800388 struct ccn_fetch_stream *stream = ccn_fetch_open (fetch, name, "/local/ndn/prefix",
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800389 NULL, 4, CCN_V_HIGHEST, 0);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800390 if (stream == NULL) {
391 }
392 else
393 {
394 ostringstream os;
395
396 int counter = 0;
397 char buf[256];
398 while (true) {
399 res = ccn_fetch_read (stream, buf, sizeof(buf));
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800400
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800401 if (res == 0) {
402 break;
403 }
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800404
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800405 if (res > 0) {
406 os << string(buf, res);
407 } else if (res == CCN_FETCH_READ_NONE) {
408 if (counter < 2)
409 {
410 ccn_run(tmp_handle, 1000);
411 counter ++;
412 }
413 else
414 {
415 break;
416 }
417 } else if (res == CCN_FETCH_READ_END) {
418 break;
419 } else if (res == CCN_FETCH_READ_TIMEOUT) {
420 break;
421 } else {
422 break;
423 }
424 }
425 retval = os.str ();
426 stream = ccn_fetch_close(stream);
427 }
428 fetch = ccn_fetch_destroy(fetch);
429 }
430
431 ccn_charbuf_destroy (&name);
432
433 ccn_disconnect (tmp_handle);
434 ccn_destroy (&tmp_handle);
Zhenkai Zhu0d8f5d52012-12-30 12:54:07 -0800435
Zhenkai Zhucb2d0dd2013-01-03 14:10:48 -0800436 return Name(retval);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800437}
438
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800439}