blob: 0710c67ef045b13347e9fdbb85316329e4305d0e [file] [log] [blame]
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -08001#include "ccnx-wrapper.h"
Zhenkai Zhu43eb2732012-12-28 00:48:26 -08002extern "C" {
3#include <ccn/fetch.h>
4}
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -08005#include <poll.h>
6#include <boost/throw_exception.hpp>
7#include <boost/date_time/posix_time/posix_time.hpp>
8#include <boost/random.hpp>
9#include <sstream>
10
// Boost.Exception payload type used in this file to attach a textual message
// to a thrown CcnxOperationException.
typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
// Integer payload variant; it is declared with the same tag_errmsg tag as the
// string variant above.
typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
13
14using namespace std;
15using namespace boost;
16
17namespace Ccnx {
18
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080019void
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080020readRaw(Bytes &bytes, const unsigned char *src, size_t len)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080021{
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080022 if (len > 0)
23 {
24 bytes.resize(len);
25 memcpy(&bytes[0], src, len);
26 }
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080027}
28
Zhenkai Zhu974c5a62012-12-28 14:15:30 -080029const unsigned char *
30head(const Bytes &bytes)
31{
32 return &bytes[0];
33}
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080034
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080035CcnxWrapper::CcnxWrapper()
36 : m_handle (0)
37 , m_keyStore (0)
38 , m_keyLoactor (0)
39 , m_running (true)
40 , m_connected (false)
41{
42 connectCcnd();
43 initKeyStore ();
44 createKeyLocator ();
45 m_thread = thread (&CcnxWrapper::ccnLoop, this);
46}
47
48void
49CcnxWrapper::connectCcnd()
50{
51 recursive_mutex::scoped_lock lock (m_mutex);
52
53 if (m_handle != 0) {
54 ccn_disconnect (m_handle);
55 ccn_destroy (&m_handle);
56 }
57
58 m_handle = ccn_create ();
59 if (ccn_connect(m_handle, NULL) < 0)
60 {
61 BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("connection to ccnd failed"));
62 }
63 m_connected = true;
64
65 if (!m_registeredInterests.empty())
66 {
67 for (map<std::string, InterestCallback>::const_iterator it = m_registeredInterests.begin(); it != m_registeredInterests.end(); ++it)
68 {
69 // clearInterestFilter(it->first);
70 setInterestFilter(it->first, it->second);
71 }
72 }
73}
74
75CcnxWrapper::~CcnxWrapper()
76{
77 {
78 recursive_mutex::scoped_lock lock(m_mutex);
79 m_running = false;
80 }
81
82 m_thread.join ();
83 ccn_disconnect (m_handle);
84 ccn_destroy (&m_handle);
85 ccn_charbuf_destroy (&m_keyLoactor);
86 ccn_keystore_destroy (&m_keyStore);
87}
88
89void
90CcnxWrapper::createKeyLocator ()
91{
92 m_keyLoactor = ccn_charbuf_create();
93 ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_KeyLocator, CCN_DTAG);
94 ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_Key, CCN_DTAG);
95 int res = ccn_append_pubkey_blob (m_keyLoactor, ccn_keystore_public_key(m_keyStore));
96 if (res >= 0)
97 {
98 ccn_charbuf_append_closer (m_keyLoactor); /* </Key> */
99 ccn_charbuf_append_closer (m_keyLoactor); /* </KeyLocator> */
100 }
101}
102
103const ccn_pkey*
104CcnxWrapper::getPrivateKey ()
105{
106 return ccn_keystore_private_key (m_keyStore);
107}
108
109const unsigned char*
110CcnxWrapper::getPublicKeyDigest ()
111{
112 return ccn_keystore_public_key_digest(m_keyStore);
113}
114
115ssize_t
116CcnxWrapper::getPublicKeyDigestLength ()
117{
118 return ccn_keystore_public_key_digest_length(m_keyStore);
119}
120
121void
122CcnxWrapper::initKeyStore ()
123{
124 m_keyStore = ccn_keystore_create ();
125 string keyStoreFile = string(getenv("HOME")) + string("/.ccnx/.ccnx_keystore");
126 if (ccn_keystore_init (m_keyStore, (char *)keyStoreFile.c_str(), (char*)"Th1s1sn0t8g00dp8ssw0rd.") < 0)
127 BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str(keyStoreFile.c_str()));
128}
129
130void
131CcnxWrapper::ccnLoop ()
132{
133 static boost::mt19937 randomGenerator (static_cast<unsigned int> (std::time (0)));
134 static boost::variate_generator<boost::mt19937&, boost::uniform_int<> > rangeUniformRandom (randomGenerator, uniform_int<> (0,1000));
135
136 while (m_running)
137 {
138 try
139 {
140 int res = 0;
141 {
142 recursive_mutex::scoped_lock lock (m_mutex);
143 res = ccn_run (m_handle, 0);
144 }
145
146 if (!m_running) break;
147
148 if (res < 0)
149 BOOST_THROW_EXCEPTION (CcnxOperationException()
150 << errmsg_info_str("ccn_run returned error"));
151
152
153 pollfd pfds[1];
154 {
155 recursive_mutex::scoped_lock lock (m_mutex);
156
157 pfds[0].fd = ccn_get_connection_fd (m_handle);
158 pfds[0].events = POLLIN;
159 if (ccn_output_is_pending (m_handle))
160 pfds[0].events |= POLLOUT;
161 }
162
163 int ret = poll (pfds, 1, 1);
164 if (ret < 0)
165 {
166 BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("ccnd socket failed (probably ccnd got stopped)"));
167 }
168 }
169 catch (CcnxOperationException &e)
170 {
171 // do not try reconnect for now
172 throw e;
173 /*
174 m_connected = false;
175 // probably ccnd has been stopped
176 // try reconnect with sleep
177 int interval = 1;
178 int maxInterval = 32;
179 while (m_running)
180 {
181 try
182 {
183 this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
184
185 connectCcnd();
186 _LOG_DEBUG("reconnect to ccnd succeeded");
187 break;
188 }
189 catch (CcnxOperationException &e)
190 {
191 this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
192
193 // do exponential backup for reconnect interval
194 if (interval < maxInterval)
195 {
196 interval *= 2;
197 }
198 }
199 }
200 */
201 }
202 catch (const std::exception &exc)
203 {
204 // catch anything thrown within try block that derives from std::exception
205 std::cerr << exc.what();
206 }
207 catch (...)
208 {
209 cout << "UNKNOWN EXCEPTION !!!" << endl;
210 }
211 }
212}
213
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800214Bytes
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800215CcnxWrapper::createContentObject(const std::string &name, const unsigned char *buf, size_t len, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800216{
217 ccn_charbuf *pname = ccn_charbuf_create();
218 ccn_charbuf *signed_info = ccn_charbuf_create();
219 ccn_charbuf *content = ccn_charbuf_create();
220
221 ccn_name_from_uri(pname, name.c_str());
222 ccn_signed_info_create(signed_info,
223 getPublicKeyDigest(),
224 getPublicKeyDigestLength(),
225 NULL,
226 CCN_CONTENT_DATA,
227 freshness,
228 NULL,
229 m_keyLoactor);
230 if(ccn_encode_ContentObject(content, pname, signed_info,
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800231 buf, len,
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800232 NULL, getPrivateKey()) < 0)
233 {
234 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("encode content failed"));
235 }
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800236
237 Bytes bytes;
238 readRaw(bytes, content->buf, content->length);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800239
240 ccn_charbuf_destroy (&pname);
241 ccn_charbuf_destroy (&signed_info);
242 ccn_charbuf_destroy (&content);
243
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800244 return bytes;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800245}
246
247int
Zhenkai Zhubad089c2012-12-28 10:28:27 -0800248CcnxWrapper::putToCcnd (const Bytes &contentObject)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800249{
250 recursive_mutex::scoped_lock lock(m_mutex);
251 if (!m_running || !m_connected)
252 return -1;
253
254
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800255 if (ccn_put(m_handle, head(contentObject), contentObject.size()) < 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800256 {
257 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("ccnput failed"));
258 }
259
260 return 0;
261}
262
263int
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800264CcnxWrapper::publishData (const string &name, const unsigned char *buf, size_t len, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800265{
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800266 Bytes co = createContentObject(name, buf, len, freshness);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800267 return putToCcnd(co);
268}
269
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800270int
271CcnxWrapper::publishData (const string &name, const Bytes &content, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800272{
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800273 publishData(name, head(content), content.size(), freshness);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800274}
275
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800276string
Zhenkai Zhubad089c2012-12-28 10:28:27 -0800277CcnxWrapper::extractName(const unsigned char *data, const ccn_indexbuf *comps)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800278{
279 ostringstream name (ostringstream::out);
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800280 for (int i = 0; i < comps->n - 1; i++)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800281 {
282 char *comp;
283 size_t size;
284 name << "/";
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800285 ccn_name_comp_get(data, comps, i, (const unsigned char **)&comp, &size);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800286 string compStr(comp, size);
287 name << compStr;
288 }
289
290 return name.str();
291}
292
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800293
294static ccn_upcall_res
295incomingInterest(ccn_closure *selfp,
296 ccn_upcall_kind kind,
297 ccn_upcall_info *info)
298{
299 CcnxWrapper::InterestCallback *f = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
300
301 switch (kind)
302 {
303 case CCN_UPCALL_FINAL: // effective in unit tests
304 delete f;
305 delete selfp;
306 return CCN_UPCALL_RESULT_OK;
307
308 case CCN_UPCALL_INTEREST:
309 break;
310
311 default:
312 return CCN_UPCALL_RESULT_OK;
313 }
314
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800315 string interest = CcnxWrapper::extractName(info->interest_ccnb, info->interest_comps);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800316
317 (*f) (interest);
318 return CCN_UPCALL_RESULT_OK;
319}
320
321static ccn_upcall_res
322incomingData(ccn_closure *selfp,
323 ccn_upcall_kind kind,
324 ccn_upcall_info *info)
325{
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800326 Closure *cp = static_cast<Closure *> (selfp->data);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800327
328 switch (kind)
329 {
330 case CCN_UPCALL_FINAL: // effecitve in unit tests
331 delete cp;
332 cp = NULL;
333 delete selfp;
334 return CCN_UPCALL_RESULT_OK;
335
336 case CCN_UPCALL_CONTENT:
337 break;
338
339 case CCN_UPCALL_INTEREST_TIMED_OUT: {
340 if (cp != NULL && cp->getRetry() > 0) {
341 cp->decRetry();
342 return CCN_UPCALL_RESULT_REEXPRESS;
343 }
344
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800345 string interest = CcnxWrapper::extractName(info->interest_ccnb, info->interest_comps);
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800346 Closure::TimeoutCallbackReturnValue rv = cp->runTimeoutCallback(interest);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800347 switch(rv)
348 {
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800349 case Closure::RESULT_OK : return CCN_UPCALL_RESULT_OK;
350 case Closure::RESULT_REEXPRESS : return CCN_UPCALL_RESULT_REEXPRESS;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800351 default : break;
352 }
353 return CCN_UPCALL_RESULT_OK;
354 }
355
356 default:
357 return CCN_UPCALL_RESULT_OK;
358 }
359
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800360 const unsigned char *pcontent;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800361 size_t len;
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800362 if (ccn_content_get_value(info->content_ccnb, info->pco->offset[CCN_PCO_E], info->pco, &pcontent, &len) < 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800363 {
364 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("decode ContentObject failed"));
365 }
366
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800367 string name = CcnxWrapper::extractName(info->content_ccnb, info->content_comps);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800368
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800369 Bytes content;
370 // copy content and do processing on the copy
371 // otherwise the pointed memory may have been changed during the processing
372 readRaw(content, pcontent, len);
373
374 cp->runDataCallback(name, content);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800375
376 return CCN_UPCALL_RESULT_OK;
377}
378
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800379int CcnxWrapper::sendInterest (const string &strInterest, Closure *closure)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800380{
381 recursive_mutex::scoped_lock lock(m_mutex);
382 if (!m_running || !m_connected)
383 return -1;
384
385 ccn_charbuf *pname = ccn_charbuf_create();
386 ccn_closure *dataClosure = new ccn_closure;
387
388 ccn_name_from_uri (pname, strInterest.c_str());
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800389 dataClosure->data = (void *)closure;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800390
391 dataClosure->p = &incomingData;
392 if (ccn_express_interest (m_handle, pname, dataClosure, NULL) < 0)
393 {
394 }
395
396 ccn_charbuf_destroy (&pname);
397 return 0;
398}
399
400int CcnxWrapper::setInterestFilter (const string &prefix, const InterestCallback &interestCallback)
401{
402 recursive_mutex::scoped_lock lock(m_mutex);
403 if (!m_running || !m_connected)
404 return -1;
405
406 ccn_charbuf *pname = ccn_charbuf_create();
407 ccn_closure *interestClosure = new ccn_closure;
408
409 ccn_name_from_uri (pname, prefix.c_str());
410 interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
411 interestClosure->p = &incomingInterest;
412 int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
413 if (ret < 0)
414 {
415 }
416
417 m_registeredInterests.insert(pair<std::string, InterestCallback>(prefix, interestCallback));
418 ccn_charbuf_destroy(&pname);
419}
420
421void
422CcnxWrapper::clearInterestFilter (const std::string &prefix)
423{
424 recursive_mutex::scoped_lock lock(m_mutex);
425 if (!m_running || !m_connected)
426 return;
427
428 ccn_charbuf *pname = ccn_charbuf_create();
429
430 ccn_name_from_uri (pname, prefix.c_str());
431 int ret = ccn_set_interest_filter (m_handle, pname, 0);
432 if (ret < 0)
433 {
434 }
435
436 m_registeredInterests.erase(prefix);
437 ccn_charbuf_destroy(&pname);
438}
439
440string
441CcnxWrapper::getLocalPrefix ()
442{
443 struct ccn * tmp_handle = ccn_create ();
444 int res = ccn_connect (tmp_handle, NULL);
445 if (res < 0)
446 {
447 return "";
448 }
449
450 string retval = "";
451
452 struct ccn_charbuf *templ = ccn_charbuf_create();
453 ccn_charbuf_append_tt(templ, CCN_DTAG_Interest, CCN_DTAG);
454 ccn_charbuf_append_tt(templ, CCN_DTAG_Name, CCN_DTAG);
455 ccn_charbuf_append_closer(templ); /* </Name> */
456 // XXX - use pubid if possible
457 ccn_charbuf_append_tt(templ, CCN_DTAG_MaxSuffixComponents, CCN_DTAG);
458 ccnb_append_number(templ, 1);
459 ccn_charbuf_append_closer(templ); /* </MaxSuffixComponents> */
460 ccnb_tagged_putf(templ, CCN_DTAG_Scope, "%d", 2);
461 ccn_charbuf_append_closer(templ); /* </Interest> */
462
463 struct ccn_charbuf *name = ccn_charbuf_create ();
464 res = ccn_name_from_uri (name, "/local/ndn/prefix");
465 if (res < 0) {
466 }
467 else
468 {
469 struct ccn_fetch *fetch = ccn_fetch_new (tmp_handle);
470
471 struct ccn_fetch_stream *stream = ccn_fetch_open (fetch, name, "/local/ndn/prefix",
472 NULL, 4, CCN_V_HIGHEST, 0);
473 if (stream == NULL) {
474 }
475 else
476 {
477 ostringstream os;
478
479 int counter = 0;
480 char buf[256];
481 while (true) {
482 res = ccn_fetch_read (stream, buf, sizeof(buf));
483
484 if (res == 0) {
485 break;
486 }
487
488 if (res > 0) {
489 os << string(buf, res);
490 } else if (res == CCN_FETCH_READ_NONE) {
491 if (counter < 2)
492 {
493 ccn_run(tmp_handle, 1000);
494 counter ++;
495 }
496 else
497 {
498 break;
499 }
500 } else if (res == CCN_FETCH_READ_END) {
501 break;
502 } else if (res == CCN_FETCH_READ_TIMEOUT) {
503 break;
504 } else {
505 break;
506 }
507 }
508 retval = os.str ();
509 stream = ccn_fetch_close(stream);
510 }
511 fetch = ccn_fetch_destroy(fetch);
512 }
513
514 ccn_charbuf_destroy (&name);
515
516 ccn_disconnect (tmp_handle);
517 ccn_destroy (&tmp_handle);
518
519 return retval;
520}
521
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800522}