face+node: Improving async operations
Now expressInterest is executed strictly in io_service thread, which
should guarantee absence of resource contention, provided that
io_service has exactly one thread.
Instead of numerical IDs for expressed Interests and registered
prefixes, an opaque PendingInterestId and RegisteredPrefixId classed are
used. These classes are basically reinterpret_cast pointers to the
PendingInterest and RegisteredPrefix objects stored on the node.
refs #1142 (http://redmine.named-data.net/issues/1142)
Change-Id: I4b146ee34b98bfa6564935d5f2fe33056a402694
diff --git a/include/ndn-cpp-dev/node.hpp b/include/ndn-cpp-dev/node.hpp
index b5e4e97..7892075 100644
--- a/include/ndn-cpp-dev/node.hpp
+++ b/include/ndn-cpp-dev/node.hpp
@@ -14,9 +14,11 @@
#include "forwarding-flags.hpp"
#include "transport/transport.hpp"
-
namespace ndn {
+struct PendingInterestId;
+struct RegisteredPrefixId;
+
/**
* An OnData function object is used to pass a callback to expressInterest.
*/
@@ -31,7 +33,7 @@
* An OnInterest function object is used to pass a callback to registerPrefix.
*/
typedef func_lib::function<void
- (const ptr_lib::shared_ptr<const Name>&, const ptr_lib::shared_ptr<const Interest>&, Transport&, uint64_t)> OnInterest;
+ (const ptr_lib::shared_ptr<const Name>&, const ptr_lib::shared_ptr<const Interest>&)> OnInterest;
/**
* An OnRegisterFailed function object is used to report when registerPrefix fails.
@@ -75,7 +77,7 @@
* @param wireFormat A WireFormat object used to encode the message.
* @return The pending interest ID which can be used with removePendingInterest.
*/
- uint64_t
+ const PendingInterestId*
expressInterest(const Interest& interest, const OnData& onData, const OnTimeout& onTimeout);
/**
@@ -85,7 +87,7 @@
* @param pendingInterestId The ID returned from expressInterest.
*/
void
- removePendingInterest(uint64_t pendingInterestId);
+ removePendingInterest(const PendingInterestId *pendingInterestId);
/**
* Register prefix with the connected NDN hub and call onInterest when a matching interest is received.
@@ -98,7 +100,7 @@
* @param wireFormat A WireFormat object used to encode the message.
* @return The registered prefix ID which can be used with removeRegisteredPrefix.
*/
- uint64_t
+ const RegisteredPrefixId*
registerPrefix
(const Name& prefix, const OnInterest& onInterest, const OnRegisterFailed& onRegisterFailed, const ForwardingFlags& flags);
@@ -109,7 +111,7 @@
* @param registeredPrefixId The ID returned from registerPrefix.
*/
void
- removeRegisteredPrefix(uint64_t registeredPrefixId);
+ removeRegisteredPrefix(const RegisteredPrefixId *registeredPrefixId);
/**
* @brief Publish data packet
@@ -146,6 +148,13 @@
shutdown();
private:
+ void
+ asyncExpressInterest(const ptr_lib::shared_ptr<const Interest> &interest,
+ const OnData& onData, const OnTimeout& onTimeout);
+
+ void
+ asyncRemovePendingInterest(const PendingInterestId *pendingInterestId);
+
void
onReceiveElement(const Block &wire);
@@ -158,30 +167,14 @@
public:
/**
* Create a new PitEntry and set the timeoutTime_ based on the current time and the interest lifetime.
- * @param pendingInterestId A unique ID for this entry, which you should get with getNextPendingInteresId().
* @param interest A shared_ptr for the interest.
* @param onData A function object to call when a matching data packet is received.
* @param onTimeout A function object to call if the interest times out. If onTimeout is an empty OnTimeout(), this does not use it.
*/
PendingInterest
- (uint64_t pendingInterestId, const ptr_lib::shared_ptr<const Interest>& interest, const OnData& onData,
+ (const ptr_lib::shared_ptr<const Interest>& interest, const OnData& onData,
const OnTimeout& onTimeout);
- /**
- * Return the next unique pending interest ID.
- */
- static uint64_t
- getNextPendingInterestId()
- {
- return ++lastPendingInterestId_;
- }
-
- /**
- * Return the pendingInterestId given to the constructor.
- */
- uint64_t
- getPendingInterestId() { return pendingInterestId_; }
-
const ptr_lib::shared_ptr<const Interest>&
getInterest() { return interest_; }
@@ -206,9 +199,6 @@
callTimeout();
private:
- static uint64_t lastPendingInterestId_; /**< A class variable used to get the next unique ID. */
-
- uint64_t pendingInterestId_; /**< A unique identifier for this entry so it can be deleted */
ptr_lib::shared_ptr<const Interest> interest_;
const OnData onData_;
const OnTimeout onTimeout_;
@@ -216,58 +206,71 @@
MillisecondsSince1970 timeoutTimeMilliseconds_; /**< The time when the interest times out in milliseconds according to ndn_getNowMilliseconds, or -1 for no timeout. */
};
+ // Functor to match pending interests against PendingInterestId
+ struct MatchPendingInterestId
+ {
+ MatchPendingInterestId(const PendingInterestId *pendingInterestId)
+ : id_(pendingInterestId)
+ {
+ }
+
+ bool
+ operator()(const ptr_lib::shared_ptr<const PendingInterest> &pendingInterest) const
+ {
+ return (reinterpret_cast<const PendingInterestId *>(pendingInterest.get()) == id_);
+ }
+ private:
+ const PendingInterestId *id_;
+ };
+
+
class RegisteredPrefix {
public:
/**
* Create a new PrefixEntry.
- * @param registeredPrefixId A unique ID for this entry, which you should get with getNextRegisteredPrefixId().
* @param prefix A shared_ptr for the prefix.
* @param onInterest A function object to call when a matching data packet is received.
*/
- RegisteredPrefix(uint64_t registeredPrefixId, const ptr_lib::shared_ptr<const Name>& prefix, const OnInterest& onInterest)
- : registeredPrefixId_(registeredPrefixId)
- , prefix_(prefix)
+ RegisteredPrefix(const Name& prefix, const OnInterest& onInterest)
+ : prefix_(new Name(prefix))
, onInterest_(onInterest)
{
}
- /**
- * Return the next unique entry ID.
- */
- static uint64_t
- getNextRegisteredPrefixId()
+ const Name&
+ getPrefix() const
{
- return ++lastRegisteredPrefixId_;
- }
-
- /**
- * Return the registeredPrefixId given to the constructor.
- */
- uint64_t
- getRegisteredPrefixId()
- {
- return registeredPrefixId_;
- }
-
- const ptr_lib::shared_ptr<const Name>&
- getPrefix()
- {
- return prefix_;
+ return *prefix_;
}
const OnInterest&
- getOnInterest()
+ getOnInterest() const
{
return onInterest_;
}
private:
- static uint64_t lastRegisteredPrefixId_; /**< A class variable used to get the next unique ID. */
-
- uint64_t registeredPrefixId_; /**< A unique identifier for this entry so it can be deleted */
- ptr_lib::shared_ptr<const Name> prefix_;
+ ptr_lib::shared_ptr<Name> prefix_;
const OnInterest onInterest_;
};
+
+ // Functor to match pending interests against PendingInterestId
+ struct MatchRegisteredPrefixId
+ {
+ MatchRegisteredPrefixId(const RegisteredPrefixId *registeredPrefixId)
+ : id_(registeredPrefixId)
+ {
+ }
+
+ bool
+ operator()(const ptr_lib::shared_ptr<RegisteredPrefix> ®isteredPrefix) const
+ {
+ return (reinterpret_cast<const RegisteredPrefixId *>(registeredPrefix.get()) == id_);
+ }
+ private:
+ const RegisteredPrefixId *id_;
+ };
+
typedef std::vector<ptr_lib::shared_ptr<PendingInterest> > PendingInterestTable;
typedef std::vector<ptr_lib::shared_ptr<RegisteredPrefix> > RegisteredPrefixTable;
@@ -300,7 +303,7 @@
*/
void
registerPrefixHelper
- (uint64_t registeredPrefixId, const ptr_lib::shared_ptr<const Name>& prefix, const OnInterest& onInterest,
+ (const ptr_lib::shared_ptr<RegisteredPrefix> &prefixToRegister,
const OnRegisterFailed& onRegisterFailed, const ForwardingFlags& flags);
/**
@@ -309,9 +312,7 @@
* This method actually sets entry in a local interest filter table
*/
void
- registerPrefixFinal(uint64_t registeredPrefixId,
- const ptr_lib::shared_ptr<const Name>& prefix,
- const OnInterest& onInterest,
+ registerPrefixFinal(const ptr_lib::shared_ptr<RegisteredPrefix> &prefixToRegister,
const OnRegisterFailed& onRegisterFailed,
const ptr_lib::shared_ptr<const Interest>&, const ptr_lib::shared_ptr<Data>&);