blob: 58a2b7c2c73fb9fc00c9ac1863c92f977172c9f3 [file] [log] [blame]
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -08001#include "ccnx-wrapper.h"
Zhenkai Zhu43eb2732012-12-28 00:48:26 -08002extern "C" {
3#include <ccn/fetch.h>
4}
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -08005#include <poll.h>
6#include <boost/throw_exception.hpp>
7#include <boost/date_time/posix_time/posix_time.hpp>
8#include <boost/random.hpp>
9#include <sstream>
10
11typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
12typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
13
14using namespace std;
15using namespace boost;
16
17namespace Ccnx {
18
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080019void
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080020readRaw(Bytes &bytes, const unsigned char *src, size_t len)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080021{
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080022 if (len > 0)
23 {
24 bytes.resize(len);
25 memcpy(&bytes[0], src, len);
26 }
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080027}
28
Zhenkai Zhu43eb2732012-12-28 00:48:26 -080029
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -080030CcnxWrapper::CcnxWrapper()
31 : m_handle (0)
32 , m_keyStore (0)
33 , m_keyLoactor (0)
34 , m_running (true)
35 , m_connected (false)
36{
37 connectCcnd();
38 initKeyStore ();
39 createKeyLocator ();
40 m_thread = thread (&CcnxWrapper::ccnLoop, this);
41}
42
43void
44CcnxWrapper::connectCcnd()
45{
46 recursive_mutex::scoped_lock lock (m_mutex);
47
48 if (m_handle != 0) {
49 ccn_disconnect (m_handle);
50 ccn_destroy (&m_handle);
51 }
52
53 m_handle = ccn_create ();
54 if (ccn_connect(m_handle, NULL) < 0)
55 {
56 BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("connection to ccnd failed"));
57 }
58 m_connected = true;
59
60 if (!m_registeredInterests.empty())
61 {
62 for (map<std::string, InterestCallback>::const_iterator it = m_registeredInterests.begin(); it != m_registeredInterests.end(); ++it)
63 {
64 // clearInterestFilter(it->first);
65 setInterestFilter(it->first, it->second);
66 }
67 }
68}
69
70CcnxWrapper::~CcnxWrapper()
71{
72 {
73 recursive_mutex::scoped_lock lock(m_mutex);
74 m_running = false;
75 }
76
77 m_thread.join ();
78 ccn_disconnect (m_handle);
79 ccn_destroy (&m_handle);
80 ccn_charbuf_destroy (&m_keyLoactor);
81 ccn_keystore_destroy (&m_keyStore);
82}
83
84void
85CcnxWrapper::createKeyLocator ()
86{
87 m_keyLoactor = ccn_charbuf_create();
88 ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_KeyLocator, CCN_DTAG);
89 ccn_charbuf_append_tt (m_keyLoactor, CCN_DTAG_Key, CCN_DTAG);
90 int res = ccn_append_pubkey_blob (m_keyLoactor, ccn_keystore_public_key(m_keyStore));
91 if (res >= 0)
92 {
93 ccn_charbuf_append_closer (m_keyLoactor); /* </Key> */
94 ccn_charbuf_append_closer (m_keyLoactor); /* </KeyLocator> */
95 }
96}
97
98const ccn_pkey*
99CcnxWrapper::getPrivateKey ()
100{
101 return ccn_keystore_private_key (m_keyStore);
102}
103
104const unsigned char*
105CcnxWrapper::getPublicKeyDigest ()
106{
107 return ccn_keystore_public_key_digest(m_keyStore);
108}
109
110ssize_t
111CcnxWrapper::getPublicKeyDigestLength ()
112{
113 return ccn_keystore_public_key_digest_length(m_keyStore);
114}
115
116void
117CcnxWrapper::initKeyStore ()
118{
119 m_keyStore = ccn_keystore_create ();
120 string keyStoreFile = string(getenv("HOME")) + string("/.ccnx/.ccnx_keystore");
121 if (ccn_keystore_init (m_keyStore, (char *)keyStoreFile.c_str(), (char*)"Th1s1sn0t8g00dp8ssw0rd.") < 0)
122 BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str(keyStoreFile.c_str()));
123}
124
125void
126CcnxWrapper::ccnLoop ()
127{
128 static boost::mt19937 randomGenerator (static_cast<unsigned int> (std::time (0)));
129 static boost::variate_generator<boost::mt19937&, boost::uniform_int<> > rangeUniformRandom (randomGenerator, uniform_int<> (0,1000));
130
131 while (m_running)
132 {
133 try
134 {
135 int res = 0;
136 {
137 recursive_mutex::scoped_lock lock (m_mutex);
138 res = ccn_run (m_handle, 0);
139 }
140
141 if (!m_running) break;
142
143 if (res < 0)
144 BOOST_THROW_EXCEPTION (CcnxOperationException()
145 << errmsg_info_str("ccn_run returned error"));
146
147
148 pollfd pfds[1];
149 {
150 recursive_mutex::scoped_lock lock (m_mutex);
151
152 pfds[0].fd = ccn_get_connection_fd (m_handle);
153 pfds[0].events = POLLIN;
154 if (ccn_output_is_pending (m_handle))
155 pfds[0].events |= POLLOUT;
156 }
157
158 int ret = poll (pfds, 1, 1);
159 if (ret < 0)
160 {
161 BOOST_THROW_EXCEPTION (CcnxOperationException() << errmsg_info_str("ccnd socket failed (probably ccnd got stopped)"));
162 }
163 }
164 catch (CcnxOperationException &e)
165 {
166 // do not try reconnect for now
167 throw e;
168 /*
169 m_connected = false;
170 // probably ccnd has been stopped
171 // try reconnect with sleep
172 int interval = 1;
173 int maxInterval = 32;
174 while (m_running)
175 {
176 try
177 {
178 this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
179
180 connectCcnd();
181 _LOG_DEBUG("reconnect to ccnd succeeded");
182 break;
183 }
184 catch (CcnxOperationException &e)
185 {
186 this_thread::sleep (boost::get_system_time () + TIME_SECONDS(interval) + TIME_MILLISECONDS (rangeUniformRandom ()));
187
188 // do exponential backup for reconnect interval
189 if (interval < maxInterval)
190 {
191 interval *= 2;
192 }
193 }
194 }
195 */
196 }
197 catch (const std::exception &exc)
198 {
199 // catch anything thrown within try block that derives from std::exception
200 std::cerr << exc.what();
201 }
202 catch (...)
203 {
204 cout << "UNKNOWN EXCEPTION !!!" << endl;
205 }
206 }
207}
208
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800209Bytes
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800210CcnxWrapper::createContentObject(const std::string &name, const char *buf, size_t len, int freshness)
211{
212 ccn_charbuf *pname = ccn_charbuf_create();
213 ccn_charbuf *signed_info = ccn_charbuf_create();
214 ccn_charbuf *content = ccn_charbuf_create();
215
216 ccn_name_from_uri(pname, name.c_str());
217 ccn_signed_info_create(signed_info,
218 getPublicKeyDigest(),
219 getPublicKeyDigestLength(),
220 NULL,
221 CCN_CONTENT_DATA,
222 freshness,
223 NULL,
224 m_keyLoactor);
225 if(ccn_encode_ContentObject(content, pname, signed_info,
226 (const unsigned char *)buf, len,
227 NULL, getPrivateKey()) < 0)
228 {
229 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("encode content failed"));
230 }
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800231
232 Bytes bytes;
233 readRaw(bytes, content->buf, content->length);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800234
235 ccn_charbuf_destroy (&pname);
236 ccn_charbuf_destroy (&signed_info);
237 ccn_charbuf_destroy (&content);
238
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800239 return bytes;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800240}
241
242int
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800243CcnxWrapper::putToCcnd (Bytes &contentObject)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800244{
245 recursive_mutex::scoped_lock lock(m_mutex);
246 if (!m_running || !m_connected)
247 return -1;
248
249
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800250 if (ccn_put(m_handle, (const unsigned char *)contentObject[0], contentObject.size()) < 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800251 {
252 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("ccnput failed"));
253 }
254
255 return 0;
256}
257
258int
259CcnxWrapper::publishData (const string &name, const char *buf, size_t len, int freshness)
260{
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800261 Bytes co = createContentObject(name, buf, len, freshness);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800262 return putToCcnd(co);
263}
264
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800265int
266CcnxWrapper::publishData (const string &name, const Bytes &content, int freshness)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800267{
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800268 publishData(name, (const char*)content[0], content.size(), freshness);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800269}
270
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800271string
272CcnxWrapper::extractName(const unsigned char *data, ccn_indexbuf *comps)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800273{
274 ostringstream name (ostringstream::out);
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800275 for (int i = 0; i < comps->n - 1; i++)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800276 {
277 char *comp;
278 size_t size;
279 name << "/";
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800280 ccn_name_comp_get(data, comps, i, (const unsigned char **)&comp, &size);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800281 string compStr(comp, size);
282 name << compStr;
283 }
284
285 return name.str();
286}
287
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800288
289static ccn_upcall_res
290incomingInterest(ccn_closure *selfp,
291 ccn_upcall_kind kind,
292 ccn_upcall_info *info)
293{
294 CcnxWrapper::InterestCallback *f = static_cast<CcnxWrapper::InterestCallback*> (selfp->data);
295
296 switch (kind)
297 {
298 case CCN_UPCALL_FINAL: // effective in unit tests
299 delete f;
300 delete selfp;
301 return CCN_UPCALL_RESULT_OK;
302
303 case CCN_UPCALL_INTEREST:
304 break;
305
306 default:
307 return CCN_UPCALL_RESULT_OK;
308 }
309
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800310 string interest = CcnxWrapper::extractName(info->interest_ccnb, info->interest_comps);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800311
312 (*f) (interest);
313 return CCN_UPCALL_RESULT_OK;
314}
315
316static ccn_upcall_res
317incomingData(ccn_closure *selfp,
318 ccn_upcall_kind kind,
319 ccn_upcall_info *info)
320{
321 ClosurePass *cp = static_cast<ClosurePass *> (selfp->data);
322
323 switch (kind)
324 {
325 case CCN_UPCALL_FINAL: // effecitve in unit tests
326 delete cp;
327 cp = NULL;
328 delete selfp;
329 return CCN_UPCALL_RESULT_OK;
330
331 case CCN_UPCALL_CONTENT:
332 break;
333
334 case CCN_UPCALL_INTEREST_TIMED_OUT: {
335 if (cp != NULL && cp->getRetry() > 0) {
336 cp->decRetry();
337 return CCN_UPCALL_RESULT_REEXPRESS;
338 }
339
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800340 string interest = CcnxWrapper::extractName(info->interest_ccnb, info->interest_comps);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800341 CcnxWrapper::TimeoutCallbackReturnValue rv = cp->runTimeoutCallback(interest);
342 switch(rv)
343 {
344 case CcnxWrapper::RESULT_OK : return CCN_UPCALL_RESULT_OK;
345 case CcnxWrapper::RESULT_REEXPRESS : return CCN_UPCALL_RESULT_REEXPRESS;
346 default : break;
347 }
348 return CCN_UPCALL_RESULT_OK;
349 }
350
351 default:
352 return CCN_UPCALL_RESULT_OK;
353 }
354
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800355 const unsigned char *pcontent;
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800356 size_t len;
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800357 if (ccn_content_get_value(info->content_ccnb, info->pco->offset[CCN_PCO_E], info->pco, &pcontent, &len) < 0)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800358 {
359 // BOOST_THROW_EXCEPTION(CcnxOperationException() << errmsg_info_str("decode ContentObject failed"));
360 }
361
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800362 string name = CcnxWrapper::extractName(info->content_ccnb, info->content_comps);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800363
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800364 Bytes content;
365 // copy content and do processing on the copy
366 // otherwise the pointed memory may have been changed during the processing
367 readRaw(content, pcontent, len);
368
369 cp->runDataCallback(name, content);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800370
371 return CCN_UPCALL_RESULT_OK;
372}
373
374
375int CcnxWrapper::sendInterest (const string &strInterest, const DataCallback &dataCallback, int retry, const TimeoutCallback &timeoutCallback)
376{
377 ClosurePass * pass = new ClosurePass(retry, dataCallback, timeoutCallback);
378 sendInterest(strInterest, pass);
379}
380
381int CcnxWrapper::sendInterest (const string &strInterest, void *dataPass)
382{
383 recursive_mutex::scoped_lock lock(m_mutex);
384 if (!m_running || !m_connected)
385 return -1;
386
387 ccn_charbuf *pname = ccn_charbuf_create();
388 ccn_closure *dataClosure = new ccn_closure;
389
390 ccn_name_from_uri (pname, strInterest.c_str());
391 dataClosure->data = dataPass;
392
393 dataClosure->p = &incomingData;
394 if (ccn_express_interest (m_handle, pname, dataClosure, NULL) < 0)
395 {
396 }
397
398 ccn_charbuf_destroy (&pname);
399 return 0;
400}
401
402int CcnxWrapper::setInterestFilter (const string &prefix, const InterestCallback &interestCallback)
403{
404 recursive_mutex::scoped_lock lock(m_mutex);
405 if (!m_running || !m_connected)
406 return -1;
407
408 ccn_charbuf *pname = ccn_charbuf_create();
409 ccn_closure *interestClosure = new ccn_closure;
410
411 ccn_name_from_uri (pname, prefix.c_str());
412 interestClosure->data = new InterestCallback (interestCallback); // should be removed when closure is removed
413 interestClosure->p = &incomingInterest;
414 int ret = ccn_set_interest_filter (m_handle, pname, interestClosure);
415 if (ret < 0)
416 {
417 }
418
419 m_registeredInterests.insert(pair<std::string, InterestCallback>(prefix, interestCallback));
420 ccn_charbuf_destroy(&pname);
421}
422
423void
424CcnxWrapper::clearInterestFilter (const std::string &prefix)
425{
426 recursive_mutex::scoped_lock lock(m_mutex);
427 if (!m_running || !m_connected)
428 return;
429
430 ccn_charbuf *pname = ccn_charbuf_create();
431
432 ccn_name_from_uri (pname, prefix.c_str());
433 int ret = ccn_set_interest_filter (m_handle, pname, 0);
434 if (ret < 0)
435 {
436 }
437
438 m_registeredInterests.erase(prefix);
439 ccn_charbuf_destroy(&pname);
440}
441
442string
443CcnxWrapper::getLocalPrefix ()
444{
445 struct ccn * tmp_handle = ccn_create ();
446 int res = ccn_connect (tmp_handle, NULL);
447 if (res < 0)
448 {
449 return "";
450 }
451
452 string retval = "";
453
454 struct ccn_charbuf *templ = ccn_charbuf_create();
455 ccn_charbuf_append_tt(templ, CCN_DTAG_Interest, CCN_DTAG);
456 ccn_charbuf_append_tt(templ, CCN_DTAG_Name, CCN_DTAG);
457 ccn_charbuf_append_closer(templ); /* </Name> */
458 // XXX - use pubid if possible
459 ccn_charbuf_append_tt(templ, CCN_DTAG_MaxSuffixComponents, CCN_DTAG);
460 ccnb_append_number(templ, 1);
461 ccn_charbuf_append_closer(templ); /* </MaxSuffixComponents> */
462 ccnb_tagged_putf(templ, CCN_DTAG_Scope, "%d", 2);
463 ccn_charbuf_append_closer(templ); /* </Interest> */
464
465 struct ccn_charbuf *name = ccn_charbuf_create ();
466 res = ccn_name_from_uri (name, "/local/ndn/prefix");
467 if (res < 0) {
468 }
469 else
470 {
471 struct ccn_fetch *fetch = ccn_fetch_new (tmp_handle);
472
473 struct ccn_fetch_stream *stream = ccn_fetch_open (fetch, name, "/local/ndn/prefix",
474 NULL, 4, CCN_V_HIGHEST, 0);
475 if (stream == NULL) {
476 }
477 else
478 {
479 ostringstream os;
480
481 int counter = 0;
482 char buf[256];
483 while (true) {
484 res = ccn_fetch_read (stream, buf, sizeof(buf));
485
486 if (res == 0) {
487 break;
488 }
489
490 if (res > 0) {
491 os << string(buf, res);
492 } else if (res == CCN_FETCH_READ_NONE) {
493 if (counter < 2)
494 {
495 ccn_run(tmp_handle, 1000);
496 counter ++;
497 }
498 else
499 {
500 break;
501 }
502 } else if (res == CCN_FETCH_READ_END) {
503 break;
504 } else if (res == CCN_FETCH_READ_TIMEOUT) {
505 break;
506 } else {
507 break;
508 }
509 }
510 retval = os.str ();
511 stream = ccn_fetch_close(stream);
512 }
513 fetch = ccn_fetch_destroy(fetch);
514 }
515
516 ccn_charbuf_destroy (&name);
517
518 ccn_disconnect (tmp_handle);
519 ccn_destroy (&tmp_handle);
520
521 return retval;
522}
523
524
525ClosurePass::ClosurePass(int retry, const CcnxWrapper::DataCallback &dataCallback, const CcnxWrapper::TimeoutCallback &timeoutCallback)
526 : m_retry(retry), m_timeoutCallback(NULL)
527{
528 m_timeoutCallback = new CcnxWrapper::TimeoutCallback (timeoutCallback);
529 m_dataCallback = new CcnxWrapper::DataCallback (dataCallback);
530}
531
532ClosurePass::~ClosurePass ()
533{
534 delete m_dataCallback;
535 delete m_timeoutCallback;
536 m_dataCallback = NULL;
537 m_timeoutCallback = NULL;
538}
539
540CcnxWrapper::TimeoutCallbackReturnValue
541ClosurePass::runTimeoutCallback(std::string interest)
542{
543 if ((*m_timeoutCallback).empty())
544 {
545 return CcnxWrapper::RESULT_OK;
546 }
547
548 return (*m_timeoutCallback)(interest);
549}
550
551
552void
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800553ClosurePass::runDataCallback(std::string name, const Bytes &content)
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800554{
Zhenkai Zhu43eb2732012-12-28 00:48:26 -0800555 if (m_dataCallback != NULL) {
556 (*m_dataCallback)(name, content);
Zhenkai Zhu1ddeb6f2012-12-27 14:04:18 -0800557 }
558}
559
560}