blob: 4ef079e91aa92999e35e0339558a4983316de72a [file] [log] [blame]
Jeff Thompsonfa306642013-06-17 15:06:57 -07001/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil -*- */
2/*
3 * Copyright (c) 2013, Regents of the University of California
4 * Alexander Afanasyev
5 * Zhenkai Zhu
6 *
7 * BSD license, See the LICENSE file for more information
8 *
9 * Author: Zhenkai Zhu <zhenkai@cs.ucla.edu>
10 * Alexander Afanasyev <alexander.afanasyev@ucla.edu>
11 */
12
13#include "wrapper.h"
14
15extern "C" {
16#include <ccn/fetch.h>
17}
18#include <poll.h>
19#include <boost/throw_exception.hpp>
20#include <boost/random.hpp>
21#include <boost/make_shared.hpp>
22#include <boost/algorithm/string.hpp>
23
24#include <sstream>
25
26// #include "ndn.cxx/verifier.h"
27#include "executor/executor.h"
28
29#include "logging.h"
30#include "ndn.cxx/wire/ccnb.h"
31
32
33INIT_LOGGER ("ndn.Wrapper");
34
35typedef boost::error_info<struct tag_errmsg, std::string> errmsg_info_str;
36typedef boost::error_info<struct tag_errmsg, int> errmsg_info_int;
37
38using namespace std;
39using namespace boost;
40
41namespace ndn {
42
// Sizes of the placeholder fields used to fake signatures on unsigned content.

// Length of the fake signature blob.  16 is the minimum length of the
// signature field, as defined in ccn_buf_decoder.c:728.
const int DEFAULT_SIGNATURE_SIZE = 16;

// Length of the fake publisher key.  ccn_buf_decoder.c:745 defines a minimum
// of 16, but something else is checking as well and ccnd only accepts a
// 32-byte fake value.
const int PUBLISHER_KEY_SIZE = 32;
49
50static int
51ccn_encode_garbage_Signature(struct ccn_charbuf *buf)
52{
53 int res = 0;
54
55 res |= ccn_charbuf_append_tt(buf, CCN_DTAG_Signature, CCN_DTAG);
56
57 // Let's cheat more. Default signing algorithm in ccnd is SHA256, so we just need add 32 bytes of garbage
58 static char garbage [DEFAULT_SIGNATURE_SIZE];
59
60 // digest and witness fields are optional, so use default ones
61
62 res |= ccn_charbuf_append_tt(buf, CCN_DTAG_SignatureBits, CCN_DTAG);
63 res |= ccn_charbuf_append_tt(buf, DEFAULT_SIGNATURE_SIZE, CCN_BLOB);
64 res |= ccn_charbuf_append(buf, garbage, DEFAULT_SIGNATURE_SIZE);
65 res |= ccn_charbuf_append_closer(buf);
66
67 res |= ccn_charbuf_append_closer(buf);
68
69 return(res == 0 ? 0 : -1);
70}
71
72static int
73ccn_pack_unsigned_ContentObject(struct ccn_charbuf *buf,
74 const struct ccn_charbuf *Name,
75 const struct ccn_charbuf *SignedInfo,
76 const void *data,
77 size_t size)
78{
79 int res = 0;
80 struct ccn_charbuf *content_header;
81 size_t closer_start;
82
83 content_header = ccn_charbuf_create();
84 res |= ccn_charbuf_append_tt(content_header, CCN_DTAG_Content, CCN_DTAG);
85 if (size != 0)
86 res |= ccn_charbuf_append_tt(content_header, size, CCN_BLOB);
87 closer_start = content_header->length;
88 res |= ccn_charbuf_append_closer(content_header);
89 if (res < 0)
90 return(-1);
91
92 res |= ccn_charbuf_append_tt(buf, CCN_DTAG_ContentObject, CCN_DTAG);
93
94 res |= ccn_encode_garbage_Signature(buf);
95
96 res |= ccn_charbuf_append_charbuf(buf, Name);
97 res |= ccn_charbuf_append_charbuf(buf, SignedInfo);
98 res |= ccnb_append_tagged_blob(buf, CCN_DTAG_Content, data, size);
99 res |= ccn_charbuf_append_closer(buf);
100
101 ccn_charbuf_destroy(&content_header);
102 return(res == 0 ? 0 : -1);
103}
104
105Wrapper::Wrapper()
106 : m_handle (0)
107 , m_running (true)
108 , m_connected (false)
109 , m_executor (new Executor(1))
110 // , m_verifier(new Verifier(this))
111{
112 start ();
113}
114
115void
116Wrapper::connectCcnd()
117{
118 if (m_handle != 0) {
119 ccn_disconnect (m_handle);
120 //ccn_destroy (&m_handle);
121 }
122 else
123 {
124 m_handle = ccn_create ();
125 }
126
127 UniqueRecLock lock(m_mutex);
128 if (ccn_connect(m_handle, NULL) < 0)
129 {
130 BOOST_THROW_EXCEPTION (Error::ndnOperation() << errmsg_info_str("connection to ccnd failed"));
131 }
132 m_connected = true;
133
134 if (!m_registeredInterests.empty())
135 {
136 for (map<Name, InterestCallback>::const_iterator it = m_registeredInterests.begin(); it != m_registeredInterests.end(); ++it)
137 {
138 clearInterestFilter(it->first, false);
139 setInterestFilter(it->first, it->second, false);
140 }
141 }
142}
143
144Wrapper::~Wrapper()
145{
146 shutdown ();
147 // if (m_verifier != 0)
148 // {
149 // delete m_verifier;
150 // m_verifier = 0;
151 // }
152}
153
154void
155Wrapper::start () // called automatically in constructor
156{
157 connectCcnd();
158 m_thread = thread (&Wrapper::ccnLoop, this);
159 m_executor->start();
160}
161
162void
163Wrapper::shutdown () // called in destructor, but can called manually
164{
165 m_executor->shutdown();
166
167 {
168 UniqueRecLock lock(m_mutex);
169 m_running = false;
170 }
171
172 _LOG_DEBUG ("+++++++++SHUTDOWN+++++++");
173 if (m_connected)
174 {
175 m_thread.join ();
176
177 ccn_disconnect (m_handle);
178 //ccn_destroy (&m_handle);
179 m_connected = false;
180 }
181}
182
183void
184Wrapper::ccnLoop ()
185{
186 static boost::mt19937 randomGenerator (static_cast<unsigned int> (std::time (0)));
187 static boost::variate_generator<boost::mt19937&, boost::uniform_int<> > rangeUniformRandom (randomGenerator, uniform_int<> (0,1000));
188
189 while (m_running)
190 {
191 try
192 {
193 int res = 0;
194 {
195 UniqueRecLock lock(m_mutex);
196 res = ccn_run (m_handle, 0);
197 }
198
199 if (!m_running) break;
200
201 if (res < 0) {
202 _LOG_ERROR ("ccn_run returned negative status: " << res);
203
204 BOOST_THROW_EXCEPTION (Error::ndnOperation()
205 << errmsg_info_str("ccn_run returned error"));
206 }
207
208
209 pollfd pfds[1];
210 {
211 UniqueRecLock lock(m_mutex);
212
213 pfds[0].fd = ccn_get_connection_fd (m_handle);
214 pfds[0].events = POLLIN;
215 if (ccn_output_is_pending (m_handle))
216 pfds[0].events |= POLLOUT;
217 }
218
219 int ret = poll (pfds, 1, 1);
220 if (ret < 0)
221 {
222 BOOST_THROW_EXCEPTION (Error::ndnOperation() << errmsg_info_str("ccnd socket failed (probably ccnd got stopped)"));
223 }
224 }
225 catch (Error::ndnOperation &e)
226 {
227 m_connected = false;
228 // probably ccnd has been stopped
229 // try reconnect with sleep
230 int interval = 1;
231 int maxInterval = 32;
232 while (m_running)
233 {
234 try
235 {
236 this_thread::sleep (boost::get_system_time () + time::Seconds (interval) + time::Milliseconds (rangeUniformRandom ()));
237
238 connectCcnd ();
239 _LOG_DEBUG("reconnect to ccnd succeeded");
240 break;
241 }
242 catch (Error::ndnOperation &e)
243 {
244 this_thread::sleep (boost::get_system_time () + time::Seconds (interval) + time::Milliseconds (rangeUniformRandom ()));
245
246 // do exponential backup for reconnect interval
247 if (interval < maxInterval)
248 {
249 interval *= 2;
250 }
251 }
252 }
253 }
254 catch (const std::exception &exc)
255 {
256 // catch anything thrown within try block that derives from std::exception
257 std::cerr << exc.what();
258 }
259 catch (...)
260 {
261 cout << "UNKNOWN EXCEPTION !!!" << endl;
262 }
263 }
264}
265
266Bytes
267Wrapper::createContentObject(const Name &name, const void *buf, size_t len, int freshness, const Name &keyNameParam)
268{
269 {
270 UniqueRecLock lock(m_mutex);
271 if (!m_running || !m_connected)
272 {
273 _LOG_TRACE ("<< not running or connected");
274 return Bytes ();
275 }
276 }
277
278 ccn_charbuf *content = ccn_charbuf_create();
279
280 struct ccn_signing_params sp = CCN_SIGNING_PARAMS_INIT;
281 sp.freshness = freshness;
282
283 Name keyName;
284
285 if (keyNameParam.size() == 0)
286 {
287 // use default key name
288 CharbufPtr defaultKeyNamePtr = boost::make_shared<Charbuf>();
289 ccn_get_public_key_and_name(m_handle, &sp, NULL, NULL, defaultKeyNamePtr->getBuf());
290 keyName = Name(*defaultKeyNamePtr);
291
292 _LOG_DEBUG ("DEFAULT KEY NAME: " << keyName);
293 }
294 else
295 {
296 keyName = keyNameParam;
297 }
298
299 if (sp.template_ccnb == NULL)
300 {
301 sp.template_ccnb = ccn_charbuf_create();
302 ccn_charbuf_append_tt(sp.template_ccnb, CCN_DTAG_SignedInfo, CCN_DTAG);
303 }
304 // no idea what the following 3 lines do, but it was there
305 else if (sp.template_ccnb->length > 0) {
306 sp.template_ccnb->length--;
307 }
308 ccn_charbuf_append_tt(sp.template_ccnb, CCN_DTAG_KeyLocator, CCN_DTAG);
309 ccn_charbuf_append_tt(sp.template_ccnb, CCN_DTAG_KeyName, CCN_DTAG);
310
311 charbuf_stream keyStream;
312 wire::Ccnb::appendName (keyStream, keyName);
313
314 ccn_charbuf_append(sp.template_ccnb, keyStream.buf ().getBuf ()->buf, keyStream.buf ().getBuf ()->length);
315 ccn_charbuf_append_closer(sp.template_ccnb); // </KeyName>
316 ccn_charbuf_append_closer(sp.template_ccnb); // </KeyLocator>
317 sp.sp_flags |= CCN_SP_TEMPL_KEY_LOCATOR;
318 ccn_charbuf_append_closer(sp.template_ccnb); // </SignedInfo>
319
320 charbuf_stream nameStream;
321 wire::Ccnb::appendName (nameStream, name);
322
323 if (ccn_sign_content(m_handle, content, nameStream.buf ().getBuf (), &sp, buf, len) != 0)
324 {
325 BOOST_THROW_EXCEPTION(Error::ndnOperation() << errmsg_info_str("sign content failed"));
326 }
327
328 Bytes bytes;
329 readRaw(bytes, content->buf, content->length);
330
331 ccn_charbuf_destroy (&content);
332 if (sp.template_ccnb != NULL)
333 {
334 ccn_charbuf_destroy (&sp.template_ccnb);
335 }
336
337 return bytes;
338}
339
340int
341Wrapper::putToCcnd (const Bytes &contentObject)
342{
343 _LOG_TRACE (">> putToCcnd");
344 UniqueRecLock lock(m_mutex);
345 if (!m_running || !m_connected)
346 {
347 _LOG_TRACE ("<< not running or connected");
348 return -1;
349 }
350
351
352 if (ccn_put(m_handle, head(contentObject), contentObject.size()) < 0)
353 {
354 _LOG_ERROR ("ccn_put failed");
355 // BOOST_THROW_EXCEPTION(Error::ndnOperation() << errmsg_info_str("ccnput failed"));
356 }
357 else
358 {
359 _LOG_DEBUG ("<< putToCcnd");
360 }
361
362 return 0;
363}
364
365int
366Wrapper::publishData (const Name &name, const unsigned char *buf, size_t len, int freshness, const Name &keyName)
367{
368 _LOG_TRACE ("publishData: " << name);
369 Bytes co = createContentObject(name, buf, len, freshness, keyName);
370 return putToCcnd(co);
371}
372
373int
374Wrapper::publishUnsignedData(const Name &name, const unsigned char *buf, size_t len, int freshness)
375{
376 _LOG_TRACE ("publishUnsignedData: " << name);
377 {
378 UniqueRecLock lock(m_mutex);
379 if (!m_running || !m_connected)
380 {
381 _LOG_TRACE ("<< not running or connected");
382 return -1;
383 }
384 }
385
386 ccn_charbuf *content = ccn_charbuf_create();
387 ccn_charbuf *signed_info = ccn_charbuf_create();
388
389 static char fakeKey[PUBLISHER_KEY_SIZE];
390
391 int res = ccn_signed_info_create(signed_info,
392 fakeKey, PUBLISHER_KEY_SIZE,
393 NULL,
394 CCN_CONTENT_DATA,
395 freshness,
396 NULL,
397 NULL // ccnd is happy with absent key locator and key itself... ha ha
398 );
399
400 charbuf_stream nameStream;
401 wire::Ccnb::appendName (nameStream, name);
402
403 ccn_pack_unsigned_ContentObject(content, nameStream.buf ().getBuf (), signed_info, buf, len);
404
405 Bytes bytes;
406 readRaw(bytes, content->buf, content->length);
407
408 ccn_charbuf_destroy (&content);
409 ccn_charbuf_destroy (&signed_info);
410
411 return putToCcnd (bytes);
412}
413
414
415static void
416deleterInInterestTuple (tuple<Wrapper::InterestCallback *, ExecutorPtr> *tuple)
417{
418 delete tuple->get<0> ();
419 delete tuple;
420}
421
422static ccn_upcall_res
423incomingInterest(ccn_closure *selfp,
424 ccn_upcall_kind kind,
425 ccn_upcall_info *info)
426{
427 Wrapper::InterestCallback *f;
428 ExecutorPtr executor;
429 tuple<Wrapper::InterestCallback *, ExecutorPtr> *realData = reinterpret_cast< tuple<Wrapper::InterestCallback *, ExecutorPtr>* > (selfp->data);
430 tie (f, executor) = *realData;
431
432 switch (kind)
433 {
434 case CCN_UPCALL_FINAL: // effective in unit tests
435 // delete closure;
436 executor->execute (bind (deleterInInterestTuple, realData));
437
438 delete selfp;
439 _LOG_TRACE ("<< incomingInterest with CCN_UPCALL_FINAL");
440 return CCN_UPCALL_RESULT_OK;
441
442 case CCN_UPCALL_INTEREST:
443 _LOG_TRACE (">> incomingInterest upcall: " << Name(info->interest_ccnb, info->interest_comps));
444 break;
445
446 default:
447 _LOG_TRACE ("<< incomingInterest with CCN_UPCALL_RESULT_OK: " << Name(info->interest_ccnb, info->interest_comps));
448 return CCN_UPCALL_RESULT_OK;
449 }
450
451 InterestPtr interest = make_shared<Interest> (info->pi);
452 interest->setName (Name (info->interest_ccnb, info->interest_comps));
453
454 executor->execute (bind (*f, interest));
455 // this will be run in executor
456 // (*f) (interest);
457 // closure->runInterestCallback(interest);
458
459 return CCN_UPCALL_RESULT_OK;
460}
461
462static void
463deleterInDataTuple (tuple<Closure *, ExecutorPtr, InterestPtr> *tuple)
464{
465 delete tuple->get<0> ();
466 delete tuple;
467}
468
469static ccn_upcall_res
470incomingData(ccn_closure *selfp,
471 ccn_upcall_kind kind,
472 ccn_upcall_info *info)
473{
474 // Closure *cp = static_cast<Closure *> (selfp->data);
475 Closure *cp;
476 ExecutorPtr executor;
477 InterestPtr interest;
478 tuple<Closure *, ExecutorPtr, InterestPtr> *realData = reinterpret_cast< tuple<Closure*, ExecutorPtr, InterestPtr>* > (selfp->data);
479 tie (cp, executor, interest) = *realData;
480
481 switch (kind)
482 {
483 case CCN_UPCALL_FINAL: // effecitve in unit tests
484 executor->execute (bind (deleterInDataTuple, realData));
485
486 cp = NULL;
487 delete selfp;
488 _LOG_TRACE ("<< incomingData with CCN_UPCALL_FINAL");
489 return CCN_UPCALL_RESULT_OK;
490
491 case CCN_UPCALL_CONTENT:
492 _LOG_TRACE (">> incomingData content upcall: " << Name (info->content_ccnb, info->content_comps));
493 break;
494
495 // this is the case where the intentionally unsigned packets coming (in Encapsulation case)
496 case CCN_UPCALL_CONTENT_BAD:
497 _LOG_TRACE (">> incomingData content bad upcall: " << Name (info->content_ccnb, info->content_comps));
498 break;
499
500 // always ask ccnd to try to fetch the key
501 case CCN_UPCALL_CONTENT_UNVERIFIED:
502 _LOG_TRACE (">> incomingData content unverified upcall: " << Name (info->content_ccnb, info->content_comps));
503 break;
504
505 case CCN_UPCALL_INTEREST_TIMED_OUT: {
506 if (cp != NULL)
507 {
508 Name interestName (info->interest_ccnb, info->interest_comps);
509 _LOG_TRACE ("<< incomingData timeout: " << Name (info->interest_ccnb, info->interest_comps));
510 executor->execute (bind (&Closure::runTimeoutCallback, cp, interestName, *cp, interest));
511 }
512 else
513 {
514 _LOG_TRACE ("<< incomingData timeout, but callback is not set...: " << Name (info->interest_ccnb, info->interest_comps));
515 }
516 return CCN_UPCALL_RESULT_OK;
517 }
518
519 default:
520 _LOG_TRACE(">> unknown upcall type");
521 return CCN_UPCALL_RESULT_OK;
522 }
523
524 PcoPtr pco = make_shared<ParsedContentObject> (info->content_ccnb, info->pco->offset[CCN_PCO_E]);
525
526 // this will be run in executor
527 executor->execute (bind (&Closure::runDataCallback, cp, pco->name (), pco));
528 _LOG_TRACE (">> incomingData");
529
530 return CCN_UPCALL_RESULT_OK;
531}
532
533int Wrapper::sendInterest (const Interest &interest, const Closure &closure)
534{
535 _LOG_TRACE (">> sendInterest: " << interest.getName ());
536 {
537 UniqueRecLock lock(m_mutex);
538 if (!m_running || !m_connected)
539 {
540 _LOG_ERROR ("<< sendInterest: not running or connected");
541 return -1;
542 }
543 }
544
545 ccn_closure *dataClosure = new ccn_closure;
546
547 // Closure *myClosure = new ExecutorClosure(closure, m_executor);
548 Closure *myClosure = closure.dup ();
549 dataClosure->data = new tuple<Closure*, ExecutorPtr, InterestPtr> (myClosure, m_executor, make_shared<Interest> (interest));
550
551 dataClosure->p = &incomingData;
552
553 UniqueRecLock lock(m_mutex);
554
555 charbuf_stream nameStream;
556 wire::Ccnb::appendName (nameStream, interest.getName ());
557
558 charbuf_stream interestStream;
559 wire::Ccnb::appendInterest (interestStream, interest);
560
561 if (ccn_express_interest (m_handle, nameStream.buf ().getBuf (),
562 dataClosure,
563 interestStream.buf ().getBuf ()
564 ) < 0)
565 {
566 _LOG_ERROR ("<< sendInterest: ccn_express_interest FAILED!!!");
567 }
568
569 return 0;
570}
571
572int Wrapper::setInterestFilter (const Name &prefix, const InterestCallback &interestCallback, bool record/* = true*/)
573{
574 _LOG_TRACE (">> setInterestFilter");
575 UniqueRecLock lock(m_mutex);
576 if (!m_running || !m_connected)
577 {
578 return -1;
579 }
580
581 ccn_closure *interestClosure = new ccn_closure;
582
583 // interestClosure->data = new ExecutorInterestClosure(interestCallback, m_executor);
584
585 interestClosure->data = new tuple<Wrapper::InterestCallback *, ExecutorPtr> (new InterestCallback (interestCallback), m_executor); // should be removed when closure is removed
586 interestClosure->p = &incomingInterest;
587
588 charbuf_stream prefixStream;
589 wire::Ccnb::appendName (prefixStream, prefix);
590
591 int ret = ccn_set_interest_filter (m_handle, prefixStream.buf ().getBuf (), interestClosure);
592 if (ret < 0)
593 {
594 _LOG_ERROR ("<< setInterestFilter: ccn_set_interest_filter FAILED");
595 }
596
597 if (record)
598 {
599 m_registeredInterests.insert(pair<Name, InterestCallback>(prefix, interestCallback));
600 }
601
602 _LOG_TRACE ("<< setInterestFilter");
603
604 return ret;
605}
606
607void
608Wrapper::clearInterestFilter (const Name &prefix, bool record/* = true*/)
609{
610 _LOG_TRACE (">> clearInterestFilter");
611 UniqueRecLock lock(m_mutex);
612 if (!m_running || !m_connected)
613 return;
614
615 charbuf_stream prefixStream;
616 wire::Ccnb::appendName (prefixStream, prefix);
617
618 int ret = ccn_set_interest_filter (m_handle, prefixStream.buf ().getBuf (), 0);
619 if (ret < 0)
620 {
621 }
622
623 if (record)
624 {
625 m_registeredInterests.erase(prefix);
626 }
627
628 _LOG_TRACE ("<< clearInterestFilter");
629}
630
631Name
632Wrapper::getLocalPrefix ()
633{
634 struct ccn * tmp_handle = ccn_create ();
635 int res = ccn_connect (tmp_handle, NULL);
636 if (res < 0)
637 {
638 return Name();
639 }
640
641 string retval = "";
642
643 struct ccn_charbuf *templ = ccn_charbuf_create();
644 ccn_charbuf_append_tt(templ, CCN_DTAG_Interest, CCN_DTAG);
645 ccn_charbuf_append_tt(templ, CCN_DTAG_Name, CCN_DTAG);
646 ccn_charbuf_append_closer(templ); /* </Name> */
647 // XXX - use pubid if possible
648 ccn_charbuf_append_tt(templ, CCN_DTAG_MaxSuffixComponents, CCN_DTAG);
649 ccnb_append_number(templ, 1);
650 ccn_charbuf_append_closer(templ); /* </MaxSuffixComponents> */
651 ccnb_tagged_putf(templ, CCN_DTAG_Scope, "%d", 2);
652 ccn_charbuf_append_closer(templ); /* </Interest> */
653
654 struct ccn_charbuf *name = ccn_charbuf_create ();
655 res = ccn_name_from_uri (name, "/local/ndn/prefix");
656 if (res < 0) {
657 }
658 else
659 {
660 struct ccn_fetch *fetch = ccn_fetch_new (tmp_handle);
661
662 struct ccn_fetch_stream *stream = ccn_fetch_open (fetch, name, "/local/ndn/prefix",
663 NULL, 4, CCN_V_HIGHEST, 0);
664 if (stream == NULL) {
665 }
666 else
667 {
668 ostringstream os;
669
670 int counter = 0;
671 char buf[256];
672 while (true) {
673 res = ccn_fetch_read (stream, buf, sizeof(buf));
674
675 if (res == 0) {
676 break;
677 }
678
679 if (res > 0) {
680 os << string(buf, res);
681 } else if (res == CCN_FETCH_READ_NONE) {
682 if (counter < 2)
683 {
684 ccn_run(tmp_handle, 1000);
685 counter ++;
686 }
687 else
688 {
689 break;
690 }
691 } else if (res == CCN_FETCH_READ_END) {
692 break;
693 } else if (res == CCN_FETCH_READ_TIMEOUT) {
694 break;
695 } else {
696 break;
697 }
698 }
699 retval = os.str ();
700 stream = ccn_fetch_close(stream);
701 }
702 fetch = ccn_fetch_destroy(fetch);
703 }
704
705 ccn_charbuf_destroy (&name);
706
707 ccn_disconnect (tmp_handle);
708 ccn_destroy (&tmp_handle);
709
710 boost::algorithm::trim(retval);
711 return Name(retval);
712}
713
714bool
715Wrapper::verify(PcoPtr &pco, double maxWait)
716{
717 return true; // totally fake
718 // return m_verifier->verify(pco, maxWait);
719}
720
721/// @cond include_hidden
722// This is needed just for get function implementation
723struct GetState
724{
725 GetState (double maxWait)
726 {
727 double intPart, fraction;
728 fraction = modf (std::abs(maxWait), &intPart);
729
730 m_maxWait = time::Now ()
731 + time::Seconds (intPart)
732 + time::Microseconds (fraction * 1000000);
733 }
734
735 PcoPtr
736 WaitForResult ()
737 {
738 //_LOG_TRACE("GetState::WaitForResult start");
739 boost::unique_lock<boost::mutex> lock (m_mutex);
740 m_cond.timed_wait (lock, m_maxWait);
741 //_LOG_TRACE("GetState::WaitForResult finish");
742
743 return m_retval;
744 }
745
746 void
747 DataCallback (Name name, PcoPtr pco)
748 {
749 //_LOG_TRACE("GetState::DataCallback, Name [" << name << "]");
750 boost::unique_lock<boost::mutex> lock (m_mutex);
751 m_retval = pco;
752 m_cond.notify_one ();
753 }
754
755 void
756 TimeoutCallback (Name name)
757 {
758 boost::unique_lock<boost::mutex> lock (m_mutex);
759 m_cond.notify_one ();
760 }
761
762private:
763 Time m_maxWait;
764
765 boost::mutex m_mutex;
766 boost::condition_variable m_cond;
767
768 PcoPtr m_retval;
769};
770/// @endcond
771
772PcoPtr
773Wrapper::get(const Interest &interest, double maxWait/* = 4.0*/)
774{
775 _LOG_TRACE (">> get: " << interest.getName ());
776 {
777 UniqueRecLock lock(m_mutex);
778 if (!m_running || !m_connected)
779 {
780 _LOG_ERROR ("<< get: not running or connected");
781 return PcoPtr ();
782 }
783 }
784
785 GetState state (maxWait);
786 this->sendInterest (interest, Closure (boost::bind (&GetState::DataCallback, &state, _1, _2),
787 boost::bind (&GetState::TimeoutCallback, &state, _1)));
788 return state.WaitForResult ();
789}
790
791}