blob: 041665ee2277bb635e98f8a58c832b5eeb4d7649 [file] [log] [blame]
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -08001#include "ccnx-wrapper.h"
Zhenkai Zhu43eb2732012-12-28 00:48:26 -08002extern "C" {
3#include <ccn/fetch.h>
4}
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -08005#include <poll.h>
6#include <boost/throw_exception.hpp>
7#include <boost/date_time/posix_time/posix_time.hpp>
8#include <boost/random.hpp>
9#include <sstream>
10
11typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
12typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
13
14using namespace std;
15using namespace boost;
16
17namespace Ccnx {
18
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080019void
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080020readRaw(Bytes &bytes, const unsigned char *src, size_t len)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080021{
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080022 if (len > 0)
23 {
24 bytes.resize(len);
25 memcpy(&bytes[0], src, len);
26 }
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080027}
28
Zhenkai Zhu974c5a62012-12-28 14:15:30 -080029const unsigned char *
30head(const Bytes &bytes)
31{
32 return &bytes[0];
33}
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080034
Zhenkai Zhu9bad2bf2012-12-28 15:31:46 -080035void
36split(const string &name, Comps &comps)
37{
38 stringstream ss(name);
39 string comp;
40 while(getline(ss, comp, '/'))
41 {
42 comps.push_back(comp);
43 }
44}
45
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080046CcnxWrapper::CcnxWrapper()
47 : m_handle (0)
48 , m_keyStore (0)
49 , m_keyLoactor (0)
50 , m_running (true)
51 , m_connected (false)
52{
53 connectCcnd();
54 initKeyStore ();
55 createKeyLocator ();
56 m_thread = thread (&CcnxWrapper::ccnLoop, this);
57}
58
59void
60CcnxWrapper::connectCcnd()
61{
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -080062 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080063 if (m_handle != 0) {
64 ccn_disconnect (m_handle);
65 ccn_destroy (&m_handle);
66 }
67
68 m_handle = ccn_create ();
69 if (ccn_connect(m_handle, NULL) < 0)
70 {
71 BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("connection to ccnd failed"));
72 }
73 m_connected = true;
74
75 if (!m_registeredInterests.empty())
76 {
77 for (map<std::string, InterestCallback>::const_iterator it = m_registeredInterests.begin(); it != m_registeredInterests.end(); ++it)
78 {
79 // clearInterestFilter(it->first);
80 setInterestFilter(it->first, it->second);
81 }
82 }
83}
84
85CcnxWrapper::~CcnxWrapper()
86{
87 {
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -080088 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080089 m_running = false;
90 }
91
92 m_thread.join ();
93 ccn_disconnect (m_handle);
94 ccn_destroy (&m_handle);
95 ccn_charbuf_destroy (&m_keyLoactor);
96 ccn_keystore_destroy (&m_keyStore);
97}
98
99void
100CcnxWrapper::createKeyLocator ()
101{
102 m_keyLoactor = ccn_charbuf_create();
103 ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_KeyLocator, CCN_DTAG);
104 ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_Key, CCN_DTAG);
105 int res = ccn_append_pubkey_blob (m_keyLoactor, ccn_keystore_public_key(m_keyStore));
106 if (res >= 0)
107 {
108 ccn_charbuf_append_closer (m_keyLoactor); /* </Key> */
109 ccn_charbuf_append_closer (m_keyLoactor); /* </KeyLocator> */
110 }
111}
112
113const ccn_pkey*
114CcnxWrapper::getPrivateKey ()
115{
116 return ccn_keystore_private_key (m_keyStore);
117}
118
119const unsigned char*
120CcnxWrapper::getPublicKeyDigest ()
121{
122 return ccn_keystore_public_key_digest(m_keyStore);
123}
124
125ssize_t
126CcnxWrapper::getPublicKeyDigestLength ()
127{
128 return ccn_keystore_public_key_digest_length(m_keyStore);
129}
130
131void
132CcnxWrapper::initKeyStore ()
133{
134 m_keyStore = ccn_keystore_create ();
135 string keyStoreFile = string(getenv("HOME")) + string("/.ccnx/.ccnx_keystore");
136 if (ccn_keystore_init (m_keyStore, (char *)keyStoreFile.c_str(), (char*)"Th1s1sn0t8g00dp8ssw0rd.") < 0)
137 BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str(keyStoreFile.c_str()));
138}
139
140void
141CcnxWrapper::ccnLoop ()
142{
143 static boost::mt19937 randomGenerator (static_cast<unsigned int> (std::time (0)));
144 static boost::variate_generator<boost::mt19937&, boost::uniform_int<> > rangeUniformRandom (randomGenerator, uniform_int<> (0,1000));
145
146 while (m_running)
147 {
148 try
149 {
150 int res = 0;
151 {
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -0800152 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800153 res = ccn_run (m_handle, 0);
154 }
155
156 if (!m_running) break;
157
158 if (res < 0)
159 BOOST_THROW_EXCEPTION (CcnxOperationException()
160 << errmsg_info_str("ccn_run returned error"));
161
162
163 pollfd pfds[1];
164 {
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -0800165 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800166
167 pfds[0].fd = ccn_get_connection_fd (m_handle);
168 pfds[0].events = POLLIN;
169 if (ccn_output_is_pending (m_handle))
170 pfds[0].events |= POLLOUT;
171 }
172
173 int ret = poll (pfds, 1, 1);
174 if (ret < 0)
175 {
176 BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("ccnd socket failed (probably ccnd got stopped)"));
177 }
178 }
179 catch (CcnxOperationException &e)
180 {
181 // do not try reconnect for now
182 throw e;
183 /*
184 m_connected = false;
185 // probably ccnd has been stopped
186 // try reconnect with sleep
187 int interval = 1;
188 int maxInterval = 32;
189 while (m_running)
190 {
191 try
192 {
193 this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
194
195 connectCcnd();
196 _LOG_DEBUG("reconnect to ccnd succeeded");
197 break;
198 }
199 catch (CcnxOperationException &e)
200 {
201 this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
202
203 // do exponential backup for reconnect interval
204 if (interval < maxInterval)
205 {
206 interval *= 2;
207 }
208 }
209 }
210 */
211 }
212 catch (const std::exception &exc)
213 {
214 // catch anything thrown within try block that derives from std::exception
215 std::cerr << exc.what();
216 }
217 catch (...)
218 {
219 cout << "UNKNOWN EXCEPTION !!!" << endl;
220 }
221 }
222}
223
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800224Bytes
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800225CcnxWrapper::createContentObject(const std::string &name, const unsigned char *buf, size_t len, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800226{
227 ccn_charbuf *pname = ccn_charbuf_create();
228 ccn_charbuf *signed_info = ccn_charbuf_create();
229 ccn_charbuf *content = ccn_charbuf_create();
230
231 ccn_name_from_uri(pname, name.c_str());
232 ccn_signed_info_create(signed_info,
233 getPublicKeyDigest(),
234 getPublicKeyDigestLength(),
235 NULL,
236 CCN_CONTENT_DATA,
237 freshness,
238 NULL,
239 m_keyLoactor);
240 if(ccn_encode_ContentObject(content, pname, signed_info,
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800241 buf, len,
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800242 NULL, getPrivateKey()) < 0)
243 {
244 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("encode content failed"));
245 }
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800246
247 Bytes bytes;
248 readRaw(bytes, content->buf, content->length);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800249
250 ccn_charbuf_destroy (&pname);
251 ccn_charbuf_destroy (&signed_info);
252 ccn_charbuf_destroy (&content);
253
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800254 return bytes;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800255}
256
257int
Zhenkai Zhubad089c2012-12-28 10:28:27 -0800258CcnxWrapper::putToCcnd (const Bytes &contentObject)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800259{
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -0800260 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800261 if (!m_running || !m_connected)
262 return -1;
263
264
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800265 if (ccn_put(m_handle, head(contentObject), contentObject.size()) < 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800266 {
267 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("ccnput failed"));
268 }
269
270 return 0;
271}
272
273int
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800274CcnxWrapper::publishData (const string &name, const unsigned char *buf, size_t len, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800275{
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800276 Bytes co = createContentObject(name, buf, len, freshness);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800277 return putToCcnd(co);
278}
279
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800280int
281CcnxWrapper::publishData (const string &name, const Bytes &content, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800282{
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800283 publishData(name, head(content), content.size(), freshness);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800284}
285
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800286string
Zhenkai Zhubad089c2012-12-28 10:28:27 -0800287CcnxWrapper::extractName(const unsigned char *data, const ccn_indexbuf *comps)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800288{
289 ostringstream name (ostringstream::out);
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800290 for (int i = 0; i < comps->n - 1; i++)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800291 {
292 char *comp;
293 size_t size;
294 name << "/";
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800295 ccn_name_comp_get(data, comps, i, (const unsigned char **)&comp, &size);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800296 string compStr(comp, size);
297 name << compStr;
298 }
299
300 return name.str();
301}
302
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800303
304static ccn_upcall_res
305incomingInterest(ccn_closure *selfp,
306 ccn_upcall_kind kind,
307 ccn_upcall_info *info)
308{
309 CcnxWrapper::InterestCallback *f = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
310
311 switch (kind)
312 {
313 case CCN_UPCALL_FINAL: // effective in unit tests
314 delete f;
315 delete selfp;
316 return CCN_UPCALL_RESULT_OK;
317
318 case CCN_UPCALL_INTEREST:
319 break;
320
321 default:
322 return CCN_UPCALL_RESULT_OK;
323 }
324
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800325 string interest = CcnxWrapper::extractName(info->interest_ccnb, info->interest_comps);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800326
327 (*f) (interest);
328 return CCN_UPCALL_RESULT_OK;
329}
330
331static ccn_upcall_res
332incomingData(ccn_closure *selfp,
333 ccn_upcall_kind kind,
334 ccn_upcall_info *info)
335{
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800336 Closure *cp = static_cast<Closure *> (selfp->data);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800337
338 switch (kind)
339 {
340 case CCN_UPCALL_FINAL: // effecitve in unit tests
341 delete cp;
342 cp = NULL;
343 delete selfp;
344 return CCN_UPCALL_RESULT_OK;
345
346 case CCN_UPCALL_CONTENT:
347 break;
348
349 case CCN_UPCALL_INTEREST_TIMED_OUT: {
350 if (cp != NULL && cp->getRetry() > 0) {
351 cp->decRetry();
352 return CCN_UPCALL_RESULT_REEXPRESS;
353 }
354
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800355 string interest = CcnxWrapper::extractName(info->interest_ccnb, info->interest_comps);
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800356 Closure::TimeoutCallbackReturnValue rv = cp->runTimeoutCallback(interest);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800357 switch(rv)
358 {
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800359 case Closure::RESULT_OK : return CCN_UPCALL_RESULT_OK;
360 case Closure::RESULT_REEXPRESS : return CCN_UPCALL_RESULT_REEXPRESS;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800361 default : break;
362 }
363 return CCN_UPCALL_RESULT_OK;
364 }
365
366 default:
367 return CCN_UPCALL_RESULT_OK;
368 }
369
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800370 const unsigned char *pcontent;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800371 size_t len;
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800372 if (ccn_content_get_value(info->content_ccnb, info->pco->offset[CCN_PCO_E], info->pco, &pcontent, &len) < 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800373 {
374 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("decode ContentObject failed"));
375 }
376
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800377 string name = CcnxWrapper::extractName(info->content_ccnb, info->content_comps);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800378
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800379 Bytes content;
380 // copy content and do processing on the copy
381 // otherwise the pointed memory may have been changed during the processing
382 readRaw(content, pcontent, len);
383
384 cp->runDataCallback(name, content);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800385
386 return CCN_UPCALL_RESULT_OK;
387}
388
Zhenkai Zhu9bad2bf2012-12-28 15:31:46 -0800389int CcnxWrapper::sendInterest (const Interest &interest, Closure *closure)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800390{
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -0800391 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800392 if (!m_running || !m_connected)
393 return -1;
394
395 ccn_charbuf *pname = ccn_charbuf_create();
396 ccn_closure *dataClosure = new ccn_closure;
397
Zhenkai Zhu9bad2bf2012-12-28 15:31:46 -0800398 ccn_name_from_uri (pname, interest.name().c_str());
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800399 dataClosure->data = (void *)closure;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800400
401 dataClosure->p = &incomingData;
402 if (ccn_express_interest (m_handle, pname, dataClosure, NULL) < 0)
403 {
404 }
405
406 ccn_charbuf_destroy (&pname);
407 return 0;
408}
409
410int CcnxWrapper::setInterestFilter (const string &prefix, const InterestCallback &interestCallback)
411{
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -0800412 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800413 if (!m_running || !m_connected)
414 return -1;
415
416 ccn_charbuf *pname = ccn_charbuf_create();
417 ccn_closure *interestClosure = new ccn_closure;
418
419 ccn_name_from_uri (pname, prefix.c_str());
420 interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
421 interestClosure->p = &incomingInterest;
422 int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
423 if (ret < 0)
424 {
425 }
426
427 m_registeredInterests.insert(pair<std::string, InterestCallback>(prefix, interestCallback));
428 ccn_charbuf_destroy(&pname);
429}
430
431void
432CcnxWrapper::clearInterestFilter (const std::string &prefix)
433{
Zhenkai Zhu7f4258e2012-12-29 19:55:13 -0800434 UniqueRecLock(m_mutex);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800435 if (!m_running || !m_connected)
436 return;
437
438 ccn_charbuf *pname = ccn_charbuf_create();
439
440 ccn_name_from_uri (pname, prefix.c_str());
441 int ret = ccn_set_interest_filter (m_handle, pname, 0);
442 if (ret < 0)
443 {
444 }
445
446 m_registeredInterests.erase(prefix);
447 ccn_charbuf_destroy(&pname);
448}
449
450string
451CcnxWrapper::getLocalPrefix ()
452{
453 struct ccn * tmp_handle = ccn_create ();
454 int res = ccn_connect (tmp_handle, NULL);
455 if (res < 0)
456 {
457 return "";
458 }
459
460 string retval = "";
461
462 struct ccn_charbuf *templ = ccn_charbuf_create();
463 ccn_charbuf_append_tt(templ, CCN_DTAG_Interest, CCN_DTAG);
464 ccn_charbuf_append_tt(templ, CCN_DTAG_Name, CCN_DTAG);
465 ccn_charbuf_append_closer(templ); /* </Name> */
466 // XXX - use pubid if possible
467 ccn_charbuf_append_tt(templ, CCN_DTAG_MaxSuffixComponents, CCN_DTAG);
468 ccnb_append_number(templ, 1);
469 ccn_charbuf_append_closer(templ); /* </MaxSuffixComponents> */
470 ccnb_tagged_putf(templ, CCN_DTAG_Scope, "%d", 2);
471 ccn_charbuf_append_closer(templ); /* </Interest> */
472
473 struct ccn_charbuf *name = ccn_charbuf_create ();
474 res = ccn_name_from_uri (name, "/local/ndn/prefix");
475 if (res < 0) {
476 }
477 else
478 {
479 struct ccn_fetch *fetch = ccn_fetch_new (tmp_handle);
480
481 struct ccn_fetch_stream *stream = ccn_fetch_open (fetch, name, "/local/ndn/prefix",
482 NULL, 4, CCN_V_HIGHEST, 0);
483 if (stream == NULL) {
484 }
485 else
486 {
487 ostringstream os;
488
489 int counter = 0;
490 char buf[256];
491 while (true) {
492 res = ccn_fetch_read (stream, buf, sizeof(buf));
493
494 if (res == 0) {
495 break;
496 }
497
498 if (res > 0) {
499 os << string(buf, res);
500 } else if (res == CCN_FETCH_READ_NONE) {
501 if (counter < 2)
502 {
503 ccn_run(tmp_handle, 1000);
504 counter ++;
505 }
506 else
507 {
508 break;
509 }
510 } else if (res == CCN_FETCH_READ_END) {
511 break;
512 } else if (res == CCN_FETCH_READ_TIMEOUT) {
513 break;
514 } else {
515 break;
516 }
517 }
518 retval = os.str ();
519 stream = ccn_fetch_close(stream);
520 }
521 fetch = ccn_fetch_destroy(fetch);
522 }
523
524 ccn_charbuf_destroy (&name);
525
526 ccn_disconnect (tmp_handle);
527 ccn_destroy (&tmp_handle);
528
529 return retval;
530}
531
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800532}