blob: 1b791c96e4960539ad022dbd25fb2fd2f2d62e6b [file] [log] [blame]
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -08001#include "ccnx-wrapper.h"
Zhenkai Zhu43eb2732012-12-28 00:48:26 -08002extern "C" {
3#include <ccn/fetch.h>
4}
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -08005#include <poll.h>
6#include <boost/throw_exception.hpp>
7#include <boost/date_time/posix_time/posix_time.hpp>
8#include <boost/random.hpp>
9#include <sstream>
10
11typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
12typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
13
14using namespace std;
15using namespace boost;
16
17namespace Ccnx {
18
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080019void
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080020readRaw(Bytes &bytes, const unsigned char *src, size_t len)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080021{
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080022 if (len > 0)
23 {
24 bytes.resize(len);
25 memcpy(&bytes[0], src, len);
26 }
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080027}
28
Zhenkai Zhu974c5a62012-12-28 14:15:30 -080029const unsigned char *
30head(const Bytes &bytes)
31{
32 return &bytes[0];
33}
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080034
Zhenkai Zhu9bad2bf2012-12-28 15:31:46 -080035void
36split(const string &name, Comps &comps)
37{
38 stringstream ss(name);
39 string comp;
40 while(getline(ss, comp, '/'))
41 {
42 comps.push_back(comp);
43 }
44}
45
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080046CcnxWrapper::CcnxWrapper()
47 : m_handle (0)
48 , m_keyStore (0)
49 , m_keyLoactor (0)
50 , m_running (true)
51 , m_connected (false)
52{
53 connectCcnd();
54 initKeyStore ();
55 createKeyLocator ();
56 m_thread = thread (&CcnxWrapper::ccnLoop, this);
57}
58
59void
60CcnxWrapper::connectCcnd()
61{
62 recursive_mutex::scoped_lock lock (m_mutex);
63
64 if (m_handle != 0) {
65 ccn_disconnect (m_handle);
66 ccn_destroy (&m_handle);
67 }
68
69 m_handle = ccn_create ();
70 if (ccn_connect(m_handle, NULL) < 0)
71 {
72 BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("connection to ccnd failed"));
73 }
74 m_connected = true;
75
76 if (!m_registeredInterests.empty())
77 {
78 for (map<std::string, InterestCallback>::const_iterator it = m_registeredInterests.begin(); it != m_registeredInterests.end(); ++it)
79 {
80 // clearInterestFilter(it->first);
81 setInterestFilter(it->first, it->second);
82 }
83 }
84}
85
86CcnxWrapper::~CcnxWrapper()
87{
88 {
89 recursive_mutex::scoped_lock lock(m_mutex);
90 m_running = false;
91 }
92
93 m_thread.join ();
94 ccn_disconnect (m_handle);
95 ccn_destroy (&m_handle);
96 ccn_charbuf_destroy (&m_keyLoactor);
97 ccn_keystore_destroy (&m_keyStore);
98}
99
100void
101CcnxWrapper::createKeyLocator ()
102{
103 m_keyLoactor = ccn_charbuf_create();
104 ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_KeyLocator, CCN_DTAG);
105 ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_Key, CCN_DTAG);
106 int res = ccn_append_pubkey_blob (m_keyLoactor, ccn_keystore_public_key(m_keyStore));
107 if (res >= 0)
108 {
109 ccn_charbuf_append_closer (m_keyLoactor); /* </Key> */
110 ccn_charbuf_append_closer (m_keyLoactor); /* </KeyLocator> */
111 }
112}
113
114const ccn_pkey*
115CcnxWrapper::getPrivateKey ()
116{
117 return ccn_keystore_private_key (m_keyStore);
118}
119
120const unsigned char*
121CcnxWrapper::getPublicKeyDigest ()
122{
123 return ccn_keystore_public_key_digest(m_keyStore);
124}
125
126ssize_t
127CcnxWrapper::getPublicKeyDigestLength ()
128{
129 return ccn_keystore_public_key_digest_length(m_keyStore);
130}
131
132void
133CcnxWrapper::initKeyStore ()
134{
135 m_keyStore = ccn_keystore_create ();
136 string keyStoreFile = string(getenv("HOME")) + string("/.ccnx/.ccnx_keystore");
137 if (ccn_keystore_init (m_keyStore, (char *)keyStoreFile.c_str(), (char*)"Th1s1sn0t8g00dp8ssw0rd.") < 0)
138 BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str(keyStoreFile.c_str()));
139}
140
141void
142CcnxWrapper::ccnLoop ()
143{
144 static boost::mt19937 randomGenerator (static_cast<unsigned int> (std::time (0)));
145 static boost::variate_generator<boost::mt19937&, boost::uniform_int<> > rangeUniformRandom (randomGenerator, uniform_int<> (0,1000));
146
147 while (m_running)
148 {
149 try
150 {
151 int res = 0;
152 {
153 recursive_mutex::scoped_lock lock (m_mutex);
154 res = ccn_run (m_handle, 0);
155 }
156
157 if (!m_running) break;
158
159 if (res < 0)
160 BOOST_THROW_EXCEPTION (CcnxOperationException()
161 << errmsg_info_str("ccn_run returned error"));
162
163
164 pollfd pfds[1];
165 {
166 recursive_mutex::scoped_lock lock (m_mutex);
167
168 pfds[0].fd = ccn_get_connection_fd (m_handle);
169 pfds[0].events = POLLIN;
170 if (ccn_output_is_pending (m_handle))
171 pfds[0].events |= POLLOUT;
172 }
173
174 int ret = poll (pfds, 1, 1);
175 if (ret < 0)
176 {
177 BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("ccnd socket failed (probably ccnd got stopped)"));
178 }
179 }
180 catch (CcnxOperationException &e)
181 {
182 // do not try reconnect for now
183 throw e;
184 /*
185 m_connected = false;
186 // probably ccnd has been stopped
187 // try reconnect with sleep
188 int interval = 1;
189 int maxInterval = 32;
190 while (m_running)
191 {
192 try
193 {
194 this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
195
196 connectCcnd();
197 _LOG_DEBUG("reconnect to ccnd succeeded");
198 break;
199 }
200 catch (CcnxOperationException &e)
201 {
202 this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
203
204 // do exponential backup for reconnect interval
205 if (interval < maxInterval)
206 {
207 interval *= 2;
208 }
209 }
210 }
211 */
212 }
213 catch (const std::exception &exc)
214 {
215 // catch anything thrown within try block that derives from std::exception
216 std::cerr << exc.what();
217 }
218 catch (...)
219 {
220 cout << "UNKNOWN EXCEPTION !!!" << endl;
221 }
222 }
223}
224
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800225Bytes
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800226CcnxWrapper::createContentObject(const std::string &name, const unsigned char *buf, size_t len, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800227{
228 ccn_charbuf *pname = ccn_charbuf_create();
229 ccn_charbuf *signed_info = ccn_charbuf_create();
230 ccn_charbuf *content = ccn_charbuf_create();
231
232 ccn_name_from_uri(pname, name.c_str());
233 ccn_signed_info_create(signed_info,
234 getPublicKeyDigest(),
235 getPublicKeyDigestLength(),
236 NULL,
237 CCN_CONTENT_DATA,
238 freshness,
239 NULL,
240 m_keyLoactor);
241 if(ccn_encode_ContentObject(content, pname, signed_info,
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800242 buf, len,
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800243 NULL, getPrivateKey()) < 0)
244 {
245 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("encode content failed"));
246 }
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800247
248 Bytes bytes;
249 readRaw(bytes, content->buf, content->length);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800250
251 ccn_charbuf_destroy (&pname);
252 ccn_charbuf_destroy (&signed_info);
253 ccn_charbuf_destroy (&content);
254
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800255 return bytes;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800256}
257
258int
Zhenkai Zhubad089c2012-12-28 10:28:27 -0800259CcnxWrapper::putToCcnd (const Bytes &contentObject)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800260{
261 recursive_mutex::scoped_lock lock(m_mutex);
262 if (!m_running || !m_connected)
263 return -1;
264
265
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800266 if (ccn_put(m_handle, head(contentObject), contentObject.size()) < 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800267 {
268 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("ccnput failed"));
269 }
270
271 return 0;
272}
273
274int
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800275CcnxWrapper::publishData (const string &name, const unsigned char *buf, size_t len, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800276{
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800277 Bytes co = createContentObject(name, buf, len, freshness);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800278 return putToCcnd(co);
279}
280
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800281int
282CcnxWrapper::publishData (const string &name, const Bytes &content, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800283{
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800284 publishData(name, head(content), content.size(), freshness);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800285}
286
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800287string
Zhenkai Zhubad089c2012-12-28 10:28:27 -0800288CcnxWrapper::extractName(const unsigned char *data, const ccn_indexbuf *comps)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800289{
290 ostringstream name (ostringstream::out);
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800291 for (int i = 0; i < comps->n - 1; i++)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800292 {
293 char *comp;
294 size_t size;
295 name << "/";
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800296 ccn_name_comp_get(data, comps, i, (const unsigned char **)&comp, &size);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800297 string compStr(comp, size);
298 name << compStr;
299 }
300
301 return name.str();
302}
303
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800304
305static ccn_upcall_res
306incomingInterest(ccn_closure *selfp,
307 ccn_upcall_kind kind,
308 ccn_upcall_info *info)
309{
310 CcnxWrapper::InterestCallback *f = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
311
312 switch (kind)
313 {
314 case CCN_UPCALL_FINAL: // effective in unit tests
315 delete f;
316 delete selfp;
317 return CCN_UPCALL_RESULT_OK;
318
319 case CCN_UPCALL_INTEREST:
320 break;
321
322 default:
323 return CCN_UPCALL_RESULT_OK;
324 }
325
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800326 string interest = CcnxWrapper::extractName(info->interest_ccnb, info->interest_comps);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800327
328 (*f) (interest);
329 return CCN_UPCALL_RESULT_OK;
330}
331
332static ccn_upcall_res
333incomingData(ccn_closure *selfp,
334 ccn_upcall_kind kind,
335 ccn_upcall_info *info)
336{
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800337 Closure *cp = static_cast<Closure *> (selfp->data);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800338
339 switch (kind)
340 {
341 case CCN_UPCALL_FINAL: // effecitve in unit tests
342 delete cp;
343 cp = NULL;
344 delete selfp;
345 return CCN_UPCALL_RESULT_OK;
346
347 case CCN_UPCALL_CONTENT:
348 break;
349
350 case CCN_UPCALL_INTEREST_TIMED_OUT: {
351 if (cp != NULL && cp->getRetry() > 0) {
352 cp->decRetry();
353 return CCN_UPCALL_RESULT_REEXPRESS;
354 }
355
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800356 string interest = CcnxWrapper::extractName(info->interest_ccnb, info->interest_comps);
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800357 Closure::TimeoutCallbackReturnValue rv = cp->runTimeoutCallback(interest);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800358 switch(rv)
359 {
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800360 case Closure::RESULT_OK : return CCN_UPCALL_RESULT_OK;
361 case Closure::RESULT_REEXPRESS : return CCN_UPCALL_RESULT_REEXPRESS;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800362 default : break;
363 }
364 return CCN_UPCALL_RESULT_OK;
365 }
366
367 default:
368 return CCN_UPCALL_RESULT_OK;
369 }
370
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800371 const unsigned char *pcontent;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800372 size_t len;
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800373 if (ccn_content_get_value(info->content_ccnb, info->pco->offset[CCN_PCO_E], info->pco, &pcontent, &len) < 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800374 {
375 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("decode ContentObject failed"));
376 }
377
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800378 string name = CcnxWrapper::extractName(info->content_ccnb, info->content_comps);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800379
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800380 Bytes content;
381 // copy content and do processing on the copy
382 // otherwise the pointed memory may have been changed during the processing
383 readRaw(content, pcontent, len);
384
385 cp->runDataCallback(name, content);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800386
387 return CCN_UPCALL_RESULT_OK;
388}
389
Zhenkai Zhu9bad2bf2012-12-28 15:31:46 -0800390int CcnxWrapper::sendInterest (const Interest &interest, Closure *closure)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800391{
392 recursive_mutex::scoped_lock lock(m_mutex);
393 if (!m_running || !m_connected)
394 return -1;
395
396 ccn_charbuf *pname = ccn_charbuf_create();
397 ccn_closure *dataClosure = new ccn_closure;
398
Zhenkai Zhu9bad2bf2012-12-28 15:31:46 -0800399 ccn_name_from_uri (pname, interest.name().c_str());
Zhenkai Zhu974c5a62012-12-28 14:15:30 -0800400 dataClosure->data = (void *)closure;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800401
402 dataClosure->p = &incomingData;
403 if (ccn_express_interest (m_handle, pname, dataClosure, NULL) < 0)
404 {
405 }
406
407 ccn_charbuf_destroy (&pname);
408 return 0;
409}
410
411int CcnxWrapper::setInterestFilter (const string &prefix, const InterestCallback &interestCallback)
412{
413 recursive_mutex::scoped_lock lock(m_mutex);
414 if (!m_running || !m_connected)
415 return -1;
416
417 ccn_charbuf *pname = ccn_charbuf_create();
418 ccn_closure *interestClosure = new ccn_closure;
419
420 ccn_name_from_uri (pname, prefix.c_str());
421 interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
422 interestClosure->p = &incomingInterest;
423 int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
424 if (ret < 0)
425 {
426 }
427
428 m_registeredInterests.insert(pair<std::string, InterestCallback>(prefix, interestCallback));
429 ccn_charbuf_destroy(&pname);
430}
431
432void
433CcnxWrapper::clearInterestFilter (const std::string &prefix)
434{
435 recursive_mutex::scoped_lock lock(m_mutex);
436 if (!m_running || !m_connected)
437 return;
438
439 ccn_charbuf *pname = ccn_charbuf_create();
440
441 ccn_name_from_uri (pname, prefix.c_str());
442 int ret = ccn_set_interest_filter (m_handle, pname, 0);
443 if (ret < 0)
444 {
445 }
446
447 m_registeredInterests.erase(prefix);
448 ccn_charbuf_destroy(&pname);
449}
450
451string
452CcnxWrapper::getLocalPrefix ()
453{
454 struct ccn * tmp_handle = ccn_create ();
455 int res = ccn_connect (tmp_handle, NULL);
456 if (res < 0)
457 {
458 return "";
459 }
460
461 string retval = "";
462
463 struct ccn_charbuf *templ = ccn_charbuf_create();
464 ccn_charbuf_append_tt(templ, CCN_DTAG_Interest, CCN_DTAG);
465 ccn_charbuf_append_tt(templ, CCN_DTAG_Name, CCN_DTAG);
466 ccn_charbuf_append_closer(templ); /* </Name> */
467 // XXX - use pubid if possible
468 ccn_charbuf_append_tt(templ, CCN_DTAG_MaxSuffixComponents, CCN_DTAG);
469 ccnb_append_number(templ, 1);
470 ccn_charbuf_append_closer(templ); /* </MaxSuffixComponents> */
471 ccnb_tagged_putf(templ, CCN_DTAG_Scope, "%d", 2);
472 ccn_charbuf_append_closer(templ); /* </Interest> */
473
474 struct ccn_charbuf *name = ccn_charbuf_create ();
475 res = ccn_name_from_uri (name, "/local/ndn/prefix");
476 if (res < 0) {
477 }
478 else
479 {
480 struct ccn_fetch *fetch = ccn_fetch_new (tmp_handle);
481
482 struct ccn_fetch_stream *stream = ccn_fetch_open (fetch, name, "/local/ndn/prefix",
483 NULL, 4, CCN_V_HIGHEST, 0);
484 if (stream == NULL) {
485 }
486 else
487 {
488 ostringstream os;
489
490 int counter = 0;
491 char buf[256];
492 while (true) {
493 res = ccn_fetch_read (stream, buf, sizeof(buf));
494
495 if (res == 0) {
496 break;
497 }
498
499 if (res > 0) {
500 os << string(buf, res);
501 } else if (res == CCN_FETCH_READ_NONE) {
502 if (counter < 2)
503 {
504 ccn_run(tmp_handle, 1000);
505 counter ++;
506 }
507 else
508 {
509 break;
510 }
511 } else if (res == CCN_FETCH_READ_END) {
512 break;
513 } else if (res == CCN_FETCH_READ_TIMEOUT) {
514 break;
515 } else {
516 break;
517 }
518 }
519 retval = os.str ();
520 stream = ccn_fetch_close(stream);
521 }
522 fetch = ccn_fetch_destroy(fetch);
523 }
524
525 ccn_charbuf_destroy (&name);
526
527 ccn_disconnect (tmp_handle);
528 ccn_destroy (&tmp_handle);
529
530 return retval;
531}
532
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800533}